下面是Reactor Pattern中多Reactor多线程模型的简单示例,可以简单响应客户端的请求,但是没有处理TCP/IP传输数据包的粘包/分包相关问题。
直接运行此类的main()方法,里面有一个简单的同步阻塞的客户端实现。
多Reactor多线程模型示例代码
package com.example.nio.reactor;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.BufferOverflowException;import java.nio.ByteBuffer;import java.nio.channels.*;import java.time.LocalDateTime;import java.util.*;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;/** * 多Reactor多线程模型 * * <pre> * 与单Reactor多线程模型相比,是将Reactor分成两部分: * 1. MainReactor负责监听server socket,用来处理新连接的接收,将建立的socketChannel指定注册给SubReactor。 * 2. SubReactor维护自己的selector,基于mainReactor注册的socketChannel多路分离IO读写事件,读写网络数据, * 将接收到的数据发给业务线程池worker处理,并在SubReactor线程池中返回业务处理的结果。 * 3. 业务逻辑代码通常比较耗时,不要在reactor线程处理。 * </pre> */public class TimeServerMultipleReactor { private static final Logger LOGGER = LoggerFactory.getLogger(TimeServerMultipleReactor.class); private static final int POOL_SIZE = 4; private static final AtomicInteger WORKER_NUMBER = new AtomicInteger(1); private static final AtomicInteger SUB_REACTOR_NUMBER = new AtomicInteger(1); private static final AtomicInteger MAIN_REACTOR_LOOPS = new AtomicInteger(); private final List<Queue<SocketChannel>> socketChannelQueues = new ArrayList<>(POOL_SIZE); private final Selector[] selectors = new Selector[POOL_SIZE]; private final ExecutorService workerPool = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE * 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("WORKER-" + WORKER_NUMBER.getAndIncrement()); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy()); private final ExecutorService subReactorPool = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("SUB-REACTOR-" + SUB_REACTOR_NUMBER.getAndIncrement()); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy()); public static void 
main(String[] args) { TimeServerMultipleReactor server = new TimeServerMultipleReactor(); server.start(); // Test Client Requests ExecutorService testExecutorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 6; i++) { testExecutorService.submit(new TimeServerMultipleReactorClient()); } testExecutorService.shutdown(); } public void start() { // 接受 TCP 连接的主线程 Thread mainReactorThread = new Thread(new MainReactor()); mainReactorThread.setName("MAIN-REACTOR"); mainReactorThread.start(); // 初始化并启动 SubReactor 线程 for (int i = 0; i < POOL_SIZE; i++) { socketChannelQueues.add(new ConcurrentLinkedQueue<>()); subReactorPool.submit(new SubReactor(i)); } subReactorPool.shutdown(); } private class MainReactor implements Runnable { public void run() { try { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); SelectionKey selectionKey = serverSocketChannel.register(selector, serverSocketChannel.validOps()); selectionKey.attach(new Acceptor()); LOGGER.info("server start at port : {}", 8080); // 这个是分配 socketChannel 到不同的 SubReactor 中去的索引 int index = 0; while (true) { MAIN_REACTOR_LOOPS.getAndIncrement(); int select = selector.select(1000); int counter = MAIN_REACTOR_LOOPS.get(); if ((counter & 0x3F) == 0) { LOGGER.info("MAIN_REACTOR_LOOPS : {} for main selector {}", counter, select); } if (select != 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey sk = iterator.next(); if (sk.isAcceptable()) { Acceptor acceptor = (Acceptor) sk.attachment(); acceptor.accept(index++, selectionKey); if (index >= POOL_SIZE) { index = 0; } } } selectionKeys.clear(); } } } catch (Exception e) { e.printStackTrace(); } } } private class Acceptor { public synchronized void accept(int index, SelectionKey selectionKey) { try { 
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); if (Objects.nonNull(socketChannel)) { socketChannel.configureBlocking(false); // 使用这个共享队列,而不是使用共享sub Selector,避免在Selector及其publicKeys上产生锁竞争 boolean offer = socketChannelQueues.get(index).offer(socketChannel); // wakeup 对应的用路复用器 selectors[index].wakeup(); if (!offer) { LOGGER.warn("offer socketChannel failure : {}", socketChannel); } else { LOGGER.info("offer socketChannel success : {}", socketChannel); } } } catch (Exception e) { LOGGER.error("acceptor exception", e); } } } private class SubReactor implements Runnable { private int index; SubReactor(int index) { this.index = index; } public void run() { try { final Selector selector = Selector.open(); LOGGER.info("add sub reactor : {}", selector); selectors[index] = selector; // 每个子线程有一个I/O完成的消息发送队列,避免并发操作产生问题 Queue<SelectionKey> queue = new LinkedBlockingQueue<>(); // 每个子线程有一个channelSocket队列,均匀分布请求数 Queue<SocketChannel> socketChannels = socketChannelQueues.get(index); int counter = 0; while (!Thread.interrupted()) { counter++; int select = selector.select(10_000); if (select != 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { dispatch(iterator.next(), queue); } selectionKeys.clear(); } if ((counter & 0x7F) == 0) { LOGGER.info("sub reactor select counter : {} and select : {}", counter, select); } SocketChannel socketChannel = socketChannels.poll(); do { if (Objects.nonNull(socketChannel)) { // register new socketChannel to this selector try { LOGGER.info("register socketChannel to selector : {}", selector); socketChannel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } } while (Objects.nonNull(socketChannel = socketChannels.poll())); } } catch (IOException e) { e.printStackTrace(); } } } private void 
dispatch(SelectionKey selectionKey, final Queue<SelectionKey> queue) { SelectableChannel selectableChannel = selectionKey.channel(); if (selectionKey.isReadable()) { SocketChannel channel = (SocketChannel) selectableChannel; if (!channel.isConnected()) { LOGGER.info("channel is closed : {}", channel); return; } try { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int read = channel.read(byteBuffer); if (read == -1) { LOGGER.info("close channel"); channel.close(); } else { if (read > 0) { byteBuffer.flip(); int remaining = byteBuffer.remaining(); byte[] bytes = new byte[remaining]; byteBuffer.get(bytes); String request = new String(bytes); // 多线程负责I/O处理 workerPool.submit(new Processor(request, selectionKey, queue)); } else { LOGGER.info("read 0 bytes for channel : {}", channel); } } } catch (IOException e) { try { LOGGER.error("close channel for selectionKey : {}", selectionKey, e); selectionKey.channel(); channel.close(); } catch (IOException ex) { ex.printStackTrace(); } } } else if (selectionKey.isWritable()) { send(selectionKey, queue); } } private class Processor implements Runnable { private String request; private SelectionKey selectionKey; private Queue<SelectionKey> queue; Processor(String request, SelectionKey selectionKey, Queue<SelectionKey> queue) { this.request = request; this.selectionKey = selectionKey; this.queue = queue; } public void run() { try { // 业务处理时间 TimeUnit.MILLISECONDS.sleep(10L); LOGGER.info("request : {}", request); boolean offer = queue.offer(selectionKey); if (!offer) { LOGGER.info("add queue failure for selectionKey : {}", selectionKey); } else { String attachment = request + " : " + LocalDateTime.now(); // add interest OP_WRITE selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE); // 并添加附件 selectionKey.attach(attachment); // wakeup被select()阻塞的selector selectionKey.selector().wakeup(); } } catch (InterruptedException e) { e.printStackTrace(); } } } private void send(SelectionKey selectionKey, 
Queue<SelectionKey> queue) { if (queue.isEmpty()) { // queue should not be empty return; } ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int size = queue.size(); boolean removed = queue.remove(selectionKey); LOGGER.info("queue size : {}, removed selectionKey : {}", size, removed); SocketChannel channel = (SocketChannel) selectionKey.channel(); String body = (String) selectionKey.attachment(); if (Objects.isNull(body)) { return; } try { LOGGER.info("send response : {}", body); byteBuffer.clear(); try { byteBuffer.put(body.getBytes()); } catch (BufferOverflowException b) { byteBuffer.put("ERROR".getBytes()); } byteBuffer.flip(); channel.write(byteBuffer); // remove interest OP_WRITE and attachment selectionKey.attach(null); selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE); } catch (Exception e) { LOGGER.error("byteBuffer or channel write error : {}", selectionKey, e); try { channel.close(); } catch (IOException ex) { ex.printStackTrace(); } } } private static class TimeServerMultipleReactorClient implements Callable<Boolean> { private static final Logger LOGGER = LoggerFactory.getLogger(TimeServerMultipleReactorClient.class); public Boolean call() throws IOException, InterruptedException { ByteBuffer byteBuffer = ByteBuffer.allocateDirect(64); SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8080)); // 同步请求 // socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true); while (!socketChannel.finishConnect()) { LOGGER.info("connecting ..."); TimeUnit.MILLISECONDS.sleep(5L); } for (int i = 1; i <= 20; i++) { byteBuffer.put(String.format("%d : TIME", i).getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); TimeUnit.MILLISECONDS.sleep(300L); byteBuffer.clear(); if (socketChannel.read(byteBuffer) > 0) { byteBuffer.flip(); byte[] response = new byte[byteBuffer.remaining()]; byteBuffer.get(response); LOGGER.info("{} : receive response : {}", i, new String(response)); 
byteBuffer.clear(); } } socketChannel.shutdownOutput(); // 服务端会收到FIN包 TimeUnit.MILLISECONDS.sleep(300L); socketChannel.close(); return true; } }} |
logback.xml
为了方便查看线程运行情况,加入logback依赖及其配置文件,内容如下:
<!-- Console-only logback configuration; the pattern includes the thread name and source line. -->
<configuration>
    <property name="CONSOLE_LOG_PATTERN"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(-%5p) --- [%15.15thread] %cyan(%-50.50logger{49} [%5line] :) %m%n%ex"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <root level="DEBUG">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
Selector 在不同线程中对象锁竞争问题
多路复用器 selector 的 register(...) 和 select(...) 操作之间存在锁冲突。在 reactor pattern skeleton example 一文中已经简单说明:多路复用器上有很多同步操作,锁竞争很严重,并可能阻塞线程。为了避免在 selector 及其属性对象上产生同步操作,本示例将 MainReactor 新接入的 socketChannel 放入对应 SubReactor 线程各自的队列里;另外,IO 请求处理完的结果也用队列来维护,以减少并发问题。详见源码注释。