type
Post
status
Published
date
Aug 17, 2022
slug
reactor-pattern-implement-java-nio
summary
读完 Reactor 模式的论文,如果我们能够自己编码实现一遍,对这个模式的理解会更进一步,同时对这个模式的印象也就更加深刻,真正实现学以致用
tags
Java
Design Pattern
NIO
Reactor
category
技术分享
icon
password
Property
Sep 21, 2022 08:22 AM
前期准备
需求
因为我们将要开始的项目是对 Reactor 论文的实现,所以整个项目的需求也来源于该论文的示例。具体如下图所示
系统为分布式日志服务系统,其中本文实现的部分为 Logging Server ,它能够接受多个客户端的连接请求和日志记录请求,并将日志记录显示在控制终端上。
分析一下系统的功能点,有利于更好的开展工作
- 系统必须能够同时响应多个客户端的连接请求,并成功建立连接
- 系统必须能够同时接收多个客户端的日志记录,并打印到终端上
- 每个客户端发送的数据必须正确的被接收和打印
- 能够容忍部分客户端无故中断通信
Java New I/O 和 Reactor 模式
Java New IO
作为一门以面向对象编程为主范式的编程语言,Java 很早就提供了对计算机 IO 设备和接口的良好封装,屏蔽了他们在各个操作系统或者平台中的差异,给开发者提供了统一的抽象,让开发者写起代码来更顺畅,早期提供的库代码主要集中在包
java.io
中。Java New I/O (下文简称 NIO)指的是 JDK 1.4 及之后版本引入的一系列库,它给广大的 Java 开发者带了比前期版本更高性能、更易操作和更易扩展的 IO 操作库,代码主要集中在包
java.nio
中。本文需要掌握 NIO 中的
java.nio.channels.Selector
、 java.nio.channels.SelectionKey
、java.nio.channels.Channel
等核心模型以及典型用法。Reactor 模式
详细的内容可以参考前面的论文翻译,这里简要的回顾一下它的主要结构和交互
有些概念需要简略介绍一下
- Handle:代表一个事件源,比如 Socket
- Event:Handle 所发生的状态变化值
- Demultiplexer:多路分离器,对多个 Handle 的事件进行翻译和分拣
- Dispatcher:调用 Handler 来处理各个 Handle 的事件
- Handler:被用来处理各个 Handle 事件
两者结合
Logging Server 中我们需要处理多个客户端的连接请求和日志记录请求,简而言之,客户端同时通过 Socket 接口跟 Server 进行通信,我们需要对多个 Socket 上面的状态变化进行分离和挑拣,并分派给特定的 Handler 去处理。NIO 带来的全新抽象模型能够更好的跟实际中的系统 IO 模型交互,这也就为我们实现基于 Reactor 模式的 Logging Server 打好了基础,我们可以利用 NIO 提供的 Readness Selection 机制方便的实现 Demultiplexer 和对 Handle 的通知。
用 NIO 实现 Reactor 模式时部分组件和 NIO 模型对应的关系如下
- Handle —>
SelectionKey
- Event —>
SelectionKey.OP_READ
等
- Demultiplexer —>
Selector
动手开始
📢 所有代码均不可直接用于生产,演示代码不保证质量
整体代码结构如图
定义 Dispatcher 接口,用来分发不同的 Handle 事件到 Handler 中
import java.nio.channels.SelectableChannel; /** * @author wynn5a * @date 2021/2/17 */ public interface Dispatcher { void handleEvents(); void registerHandler(SelectableChannel channel, Class<? extends EventHandler> clazz, int event); void removeHandler(SelectableChannel channel); }
实现核心的 Dispatcher 的过程,是一个 Select loop
import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @author wynn5a * @date 2021/2/17 */ public class InitiationDispatcher implements Dispatcher { private final Selector selector; private final ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private final Map<SelectionKey, Class<? extends EventHandler>> handlers = new ConcurrentHashMap<>(); private final ReadWriteLock selectorLock = new ReentrantReadWriteLock(); public InitiationDispatcher() { try { this.selector = Selector.open(); } catch (IOException e) { throw new RuntimeException(e); } } @Override public void handleEvents() { while (true) { try { checkBeforeSelect(); selector.select(); var i = selector.selectedKeys().iterator(); while (i.hasNext()) { SelectionKey handle = i.next(); Class<? extends EventHandler> handlerClass = handlers.get(handle); if (handlerClass != null) { EventHandler handler = handlerClass.getDeclaredConstructor().newInstance(); handler.setHandle(handle); Future<Boolean> handled = pool.submit(handler); if (handled.get()) { i.remove(); } } } } catch (Exception e) { throw new RuntimeException(e); } } } /** * make sure none of handler thread is updating selector */ private void checkBeforeSelect() { selectorLock.writeLock().lock(); selectorLock.writeLock().unlock(); } @Override public void registerHandler(SelectableChannel channel, Class<? extends EventHandler> handlerClass, int event) { try { lockBeforeRegister(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, event); key.attach(this); handlers.put(key, handlerClass); } catch (Exception e) { throw new RuntimeException(e); } finally { unlockAfterRegister(); } } private void unlockAfterRegister() { selectorLock.readLock().unlock(); } private void lockBeforeRegister() { selectorLock.readLock().lock(); selector.wakeup(); } @Override public void removeHandler(SelectableChannel channel) { boolean registered = channel.isRegistered(); if (registered) { SelectionKey selectionKey = channel.keyFor(selector); selectionKey.cancel(); handlers.remove(selectionKey); } } }
定义 EventHandler 接口,主要用来处理 Handle 中发生的事件,也就是处理
SelectionKey
的状态变化import java.nio.channels.SelectionKey; import java.util.concurrent.Callable; /** * @author wynn5a * @date 2021/2/17 */ public interface EventHandler extends Callable<Boolean> { void handleEvent(); SelectionKey getHandle(); void setHandle(SelectionKey key); }
用来处理日志请求的 EventHandler
import io.github.wynn5a.reactor.nio.EventHandler; import io.github.wynn5a.reactor.nio.InitiationDispatcher; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; /** * @author wynn5a * @date 2021/2/17 */ public class LoggingHandler implements EventHandler { private SelectionKey handle; private final ByteBuffer buffer = ByteBuffer.allocate(1024); @Override public void handleEvent() { SocketChannel channel = (SocketChannel) handle.channel(); if (channel.isOpen() && handle.isReadable()) { try { buffer.clear(); int read = channel.read(buffer); if (read < 0) { System.out.println("Client connection refused"); InitiationDispatcher dispatcher = (InitiationDispatcher) handle.attachment(); dispatcher.removeHandler(channel); channel.close(); return; } buffer.flip(); slowdown(); System.out.print("LOG: " + new String(buffer.array(), UTF_8)); // handleLogContent(); buffer.clear(); } catch (IOException e) { throw new RuntimeException(e); } } } //if dont block in Dispatcher, then have to handle event more than 1 time private void handleLogContent() { if (buffer.remaining() > 0) { byte[] data = new byte[buffer.remaining()]; buffer.get(data); for (int i = 0; i < data.length; i++) { byte b = data[i]; if (b == '\r' || b == '\n') { byte[] temp = new byte[i + 1]; System.arraycopy(data, 0, temp, 0, i + 1); slowdown(); System.out.print("LOG: " + new String(temp, UTF_8)); } } } } private static void slowdown() { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public SelectionKey getHandle() { return handle; } @Override public void setHandle(SelectionKey key) { this.handle = key; } @Override public Boolean call() { handleEvent(); return true; } }
用来处理连接请求的 EventHandler
import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.charset.StandardCharsets.UTF_8; import io.github.wynn5a.reactor.nio.Dispatcher; import io.github.wynn5a.reactor.nio.EventHandler; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; /** * @author wynn5a * @date 2021/2/17 */ public class LoggingAcceptor implements EventHandler { private SelectionKey handle; @Override public void handleEvent() { ServerSocketChannel channel = (ServerSocketChannel) handle.channel(); if (channel.isOpen() && handle.isAcceptable()) { try { SocketChannel accept = channel.accept(); ByteBuffer buffer = ByteBuffer.wrap("Connect Successfully with Logging Server! \n".getBytes(UTF_8)); accept.write(buffer); Dispatcher dispatcher = (Dispatcher) handle.attachment(); dispatcher.registerHandler(accept, LoggingHandler.class, OP_READ); } catch (IOException e) { throw new RuntimeException(e); } } } @Override public SelectionKey getHandle() { return handle; } @Override public void setHandle(SelectionKey key) { this.handle = key; } @Override public Boolean call() { handleEvent(); return true; } }
最后是 Logging Server APP 的启动类
import static java.nio.channels.SelectionKey.OP_ACCEPT; import io.github.wynn5a.reactor.nio.Dispatcher; import io.github.wynn5a.reactor.nio.InitiationDispatcher; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; /** * @author wynn5a * @date 2021/2/17 */ public class LoggingServer { static Dispatcher dispatcher = new InitiationDispatcher(); public static void main(String[] args) { try { ServerSocketChannel channel = ServerSocketChannel.open(); channel.configureBlocking(false); channel.bind(new InetSocketAddress("127.0.0.1", 1234)); dispatcher.registerHandler(channel, LoggingAcceptor.class, OP_ACCEPT); dispatcher.handleEvents(); } catch (IOException e) { throw new RuntimeException(e); } } }
效果验证
上面的代码实现了一个监听了本地 1234 端口的 Logging Server,它可以接收来自多个客户端的请求,我们可以使用多个终端模拟客户端来验证效果