下面这个示例代码基本都是从 Doug Lea 的 PPT《Scalable IO in Java》中复制的,只做了简单修改,以便这个 Server 能够初步运行起来。
以下为这个示例的源代码:
ReactorMain.java
package com.example.nio.reactor.pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Entry point of the demo: creates the main {@link Reactor} on port 8080 and runs it on a
 * dedicated thread named {@code MAIN-REACTOR}.
 */
public class ReactorMain {

    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorMain.class);

    /**
     * Boots the reactor thread and logs that the server was started.
     *
     * @param args unused command-line arguments
     * @throws IOException if the {@link Reactor} constructor fails
     */
    public static void main(String[] args) throws IOException {
        // The Reactor is constructed first (it may throw), then handed to its own named thread.
        Thread mainReactorThread = new Thread(new Reactor(8080), "MAIN-REACTOR");
        mainReactorThread.start();
        LOGGER.info("reactor start on port : 8080");
    }
}
Handler.java
package com.example.nio.reactor.pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Per-connection handler attached to a {@link SelectionKey}.
 *
 * <p>The handler is a small state machine: {@code READING -> PROCESSING -> SENDING -> CLOSED}.
 * {@link #run()} is invoked whenever the key is reported ready; reading and sending happen on the
 * calling thread while {@link #process()} is handed off to {@code IO_WORKER_POOL}.
 *
 * <p>All state transitions happen while holding this object's monitor, and every transition
 * re-checks the current state, so invoking {@link #run()} several times for the same readiness
 * event is harmless.
 */
public final class Handler implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(Handler.class);

    /** Sequence used to name the worker threads {@code IO-WORKER-1}, {@code IO-WORKER-2}, ... */
    private static final AtomicInteger IO_WORKER_NUMBER = new AtomicInteger(1);

    /**
     * Pool that runs {@link #process()} off the selector/handler threads.
     * NOTE(review): the work queue is unbounded, so the pool presumably never grows beyond its
     * core size of 2 - confirm against the ThreadPoolExecutor documentation.
     */
    private static final ExecutorService IO_WORKER_POOL = new ThreadPoolExecutor(
            2, 4, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("IO-WORKER-" + IO_WORKER_NUMBER.getAndIncrement());
                    return thread;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy());

    private static final int MAX_IN = 1024;
    private static final int MAX_OUT = 1024;

    private static final int READING = 0;
    private static final int SENDING = 1;
    private static final int PROCESSING = 3;
    /** Terminal state: the key is cancelled and the channel is closed. */
    private static final int CLOSED = 4;

    private final ByteBuffer input = ByteBuffer.allocate(MAX_IN);
    private final ByteBuffer output = ByteBuffer.allocate(MAX_OUT);

    /** Written only under this object's monitor; volatile so {@link #run()} can peek without it. */
    private volatile int state = READING;

    private final SocketChannel socketChannel;
    private final SelectionKey selectionKey;

    /**
     * Switches the channel to non-blocking mode and registers it with {@code selector} for
     * {@code OP_READ}, attaching this handler to the resulting key.
     *
     * @param selector      the sub-reactor selector that will watch this connection
     * @param socketChannel the freshly accepted connection
     * @throws IOException if the channel cannot be configured or registered
     */
    public Handler(Selector selector, SocketChannel socketChannel) throws IOException {
        this.socketChannel = socketChannel;
        socketChannel.configureBlocking(false);
        // register() can block while another thread is inside select() on the same selector,
        // so interrupt that select() before registering instead of only afterwards.
        selector.wakeup();
        // Register with an empty interest set first so the key cannot fire before it has
        // an attachment.
        selectionKey = this.socketChannel.register(selector, 0);
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        // Make the selector pick up the new key without waiting for its select timeout.
        selector.wakeup();
    }

    /** @return whether a full request has been read; this demo treats any read as complete */
    boolean inputIsComplete() {
        return true;
    }

    /** @return whether the whole response has been written; this demo always answers yes */
    boolean outputIsComplete() {
        return true;
    }

    /** Placeholder for the business logic; only logs. */
    private void process() {
        LOGGER.info("process ...");
    }

    /**
     * Performs the I/O step that matches the current state. Does nothing while the request is
     * being processed or after the connection was closed. An {@link IOException} is logged and
     * closes the connection.
     */
    @Override
    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) {
            LOGGER.error("handler I/O failure on {}, closing connection", socketChannel, ex);
            close();
        }
    }

    /**
     * Reads from the channel and, once the input is complete, hands processing to the worker pool.
     * Closes the connection when the peer has reached end-of-stream.
     *
     * @throws IOException if reading from the channel fails
     */
    synchronized void read() throws IOException {
        if (state != READING) {
            // A concurrent or duplicate dispatch already moved the state machine forward.
            return;
        }
        int bytesRead = socketChannel.read(input);
        if (bytesRead < 0) {
            // End-of-stream: the peer closed the connection, nothing left to process.
            close();
            return;
        }
        if (inputIsComplete()) {
            // Single-threaded variant: call process() right here, set state = SENDING and
            // switch the key's interest to OP_WRITE (normally also attempting the first write).
            state = PROCESSING;
            IO_WORKER_POOL.execute(new Processor());
        }
    }

    /**
     * Runs {@link #process()} and then switches the key to {@code OP_WRITE} so the response is
     * sent on the next selection.
     */
    synchronized void processAndHandOff() {
        if (state != PROCESSING) {
            return;
        }
        process();
        state = SENDING;
        // or rebind attachment including data for response
        selectionKey.interestOps(SelectionKey.OP_WRITE);
        // An interest-set change is only seen by the next selection operation; wake the selector
        // so it does not sit out the rest of its current select() timeout.
        selectionKey.selector().wakeup();
    }

    /** Task executed on {@code IO_WORKER_POOL}; delegates to {@link #processAndHandOff()}. */
    class Processor implements Runnable {
        @Override
        public void run() {
            processAndHandOff();
        }
    }

    /**
     * Writes the output buffer and closes the connection once the output is complete.
     *
     * @throws IOException if writing to the channel fails
     */
    synchronized void send() throws IOException {
        if (state != SENDING) {
            return;
        }
        socketChannel.write(output);
        if (outputIsComplete()) {
            close();
        }
    }

    /**
     * Cancels the key and closes the channel. Cancelling the key alone would leave the socket
     * open with nothing left to service it.
     */
    private synchronized void close() {
        state = CLOSED;
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (IOException e) {
            LOGGER.warn("failed to close {}", socketChannel, e);
        }
    }
}
Reactor.java
package com.example.nio.reactor.pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Main reactor of the demo server.
 *
 * <p>The main reactor (this {@link Runnable}) owns the listening channel and only accepts
 * connections. Each accepted connection is registered, round-robin, with one of {@code SIZE}
 * sub-reactor selectors. Every sub-reactor runs its own select loop on
 * {@code SUB_REACTOR_THREAD_POOL} and passes ready handlers to {@code HANDLER_THREAD_POOL}.
 */
public class Reactor implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);

    /** Number of sub-reactors (and of threads in each of the two pools below). */
    private static final int SIZE = 2;

    private final Selector[] selectors = new Selector[SIZE];

    /** Index of the sub-reactor selector that receives the next accepted connection. */
    private int next = 0;

    private final ServerSocketChannel serverSocketChannel;

    private static final AtomicInteger SUB_REACTOR_THREAD_NUMBER = new AtomicInteger(1);

    /** Runs one select loop per sub-reactor; threads are named {@code SUB-REACTOR-n}. */
    private static final ExecutorService SUB_REACTOR_THREAD_POOL = new ThreadPoolExecutor(
            SIZE, SIZE, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("SUB-REACTOR-" + SUB_REACTOR_THREAD_NUMBER.getAndIncrement());
                    return thread;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy());

    private static final AtomicInteger HANDLER_THREAD_NUMBER = new AtomicInteger(1);

    /** Runs the attachments of ready keys; threads are named {@code HANDLER-n}. */
    private static final ExecutorService HANDLER_THREAD_POOL = new ThreadPoolExecutor(
            SIZE, SIZE, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("HANDLER-" + HANDLER_THREAD_NUMBER.getAndIncrement());
                    return thread;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy());

    /** Selector of the main reactor; it only watches the listening channel for OP_ACCEPT. */
    private final Selector selector;

    /**
     * Opens the listening channel on {@code port}, registers it for {@code OP_ACCEPT} with an
     * {@link Acceptor} attachment and starts the sub-reactor loops.
     *
     * @param port TCP port to listen on
     * @throws IOException if a selector or the server channel cannot be opened, bound or registered
     */
    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        selectionKey.attach(new Acceptor());
        startSubReactors();
    }

    /**
     * Opens one selector per sub-reactor and starts its select loop on the sub-reactor pool.
     *
     * @throws IOException if a selector cannot be opened
     */
    private void startSubReactors() throws IOException {
        for (int i = 0; i < selectors.length; i++) {
            selectors[i] = Selector.open();
            final Selector subSelector = selectors[i];
            // execute() rather than submit(): nobody reads the Future, and submit() would hide
            // any unchecked exception that ends the loop.
            SUB_REACTOR_THREAD_POOL.execute(() -> runSubReactor(subSelector));
        }
    }

    /** Select loop of one sub-reactor; runs until the executing thread is interrupted. */
    private void runSubReactor(Selector subSelector) {
        while (!Thread.interrupted()) {
            try {
                int select = subSelector.select(1000L);
                if (select == 0) {
                    synchronized (subSelector) {
                        // Nothing is ready. select() has returned, so pause briefly before the
                        // next select() call; this gives a thread blocked in register() on this
                        // selector (the Handler constructor) a window to complete registration.
                        subSelector.wait(10L);
                    }
                } else {
                    Set<SelectionKey> selectionKeys = subSelector.selectedKeys();
                    for (SelectionKey selectionKey : selectionKeys) {
                        handle(selectionKey);
                    }
                    selectionKeys.clear();
                }
            } catch (IOException e) {
                LOGGER.error("sub reactor select failed", e);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition observes it and terminates.
                Thread.currentThread().interrupt();
                LOGGER.info("sub reactor selector wait interrupted", e);
            }
        }
    }

    /**
     * Hands the attachment of a ready key to the handler pool.
     *
     * <p>The key stays ready until the handler thread has serviced the channel, so the same
     * attachment may be dispatched more than once for a single readiness event.
     */
    private void handle(SelectionKey selectionKey) {
        Runnable attachment = (Runnable) selectionKey.attachment();
        if (attachment != null) {
            // attachment is Handler
            // execute() so an unchecked exception from the handler is reported, not swallowed.
            HANDLER_THREAD_POOL.execute(attachment);
        }
    }

    /**
     * Main reactor loop: blocks in {@code select()} and dispatches every selected key until the
     * thread is interrupted or the selector fails with an {@link IOException}.
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                for (SelectionKey key : selected) {
                    dispatch(key);
                }
                selected.clear();
            }
        } catch (IOException e) {
            LOGGER.error("main reactor exception", e);
        }
    }

    /**
     * Runs the key's attachment synchronously on the calling thread.
     *
     * @param selectionKey a key selected by the main reactor
     */
    public void dispatch(SelectionKey selectionKey) {
        Runnable attachment = (Runnable) (selectionKey.attachment());
        if (attachment != null) {
            // attachment is Reactor$Acceptor
            LOGGER.info("{} accept {} new socket connection", selectionKey, selectionKey.channel());
            attachment.run();
        }
    }

    /** Closes a channel that could not be handed to a handler; a close failure is only logged. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            LOGGER.warn("failed to close socket channel {}", channel, e);
        }
    }

    /**
     * run in main reactor thread
     *
     * <p>Accepts one pending connection and binds it to the next sub-reactor selector.
     */
    class Acceptor implements Runnable {
        @Override
        public synchronized void run() {
            SocketChannel socketChannel = null;
            try {
                socketChannel = serverSocketChannel.accept();
                if (socketChannel == null) {
                    // Non-blocking accept found no pending connection; keep the round-robin slot.
                    return;
                }
                Selector target = selectors[next];
                next = (next + 1) % selectors.length;
                new Handler(target, socketChannel);
            } catch (IOException e) {
                LOGGER.error("acceptor exception", e);
                // Do not leak a connection that was accepted but could not be registered.
                closeQuietly(socketChannel);
            }
        }
    }
}
logback.xml
为了方便查看线程运行情况,加入logback依赖及其配置文件,内容如下:
<!-- Logback configuration: every event at DEBUG level or above goes to the console. -->
<configuration>
    <!-- Line layout: timestamp, level, thread name, logger name, source line, message, exception. -->
    <property name="CONSOLE_LOG_PATTERN"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(-%5p) --- [%15.15thread] %cyan(%-40.40logger{39} [%5line] :) %m%n%ex"/>

    <!-- Console appender using the pattern defined above. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <root level="DEBUG">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
selector.publicKeys 对象锁竞争问题
线程在运行 selector.select(1000) 时会一直持有 selector.publicKeys 对象锁,Handler 中注册 socketChannel 时因获取不到 selector.publicKeys 对象锁而被一直阻塞。因此,需要让阻塞在 selector.select(1000) 上的多路复用器线程在两次 select 之间释放一下持有的 selector.publicKeys 对象锁,才能让 Handler 构造方法中的 socketChannel 注册完成。
/**
 * <pre>
 * ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 * // SelectorImpl.register() synchronizes on the selector.publicKeys field, i.e. the object returned by
 * // selector.keys(). When the Acceptor registers a socketChannel with a SubReactor, the SubReactor thread
 * // holds the publicKeys monitor and keeps looping in while(true), so socketChannel.register() in the
 * // Acceptor is BLOCKED. The thread stack looks like this:
 * ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 * // java.lang.Thread.State: BLOCKED (on object monitor)
 * // at sun.nio.ch.SelectorImpl.register(SelectorImpl.java:132)
 * // - waiting to lock <0x000000076e8c7390> (a java.util.Collections$UnmodifiableSet)
 * ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 * // The code in SelectorImpl that synchronizes on the publicKeys object is shown below
 * ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 * // protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
 * // if (!(var1 instanceof SelChImpl)) {
 * // throw new IllegalSelectorException();
 * // } else {
 * // SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
 * // var4.attach(var3);
 * // synchronized(this.publicKeys) {
 * // this.implRegister(var4);
 * // }
 * //
 * // var4.interestOps(var2);
 * // return var4;
 * // }
 * // }
 * ///////////////////////////////////////////////////////////////////////////////////////////////////////////
 * </pre>
 */
int select = subSelector.select(1000L);
if (select == 0) {
    synchronized (subSelector) {
        // release selector.publicKeys object monitor: select() has returned, so pausing here for
        // up to 10 ms before the next select() gives a blocked register() call (see the note
        // above) a chance to acquire the monitor
        subSelector.wait(10L);
    }
} |