Netty线程模型
Netty首当其冲被提起的肯定是支持它承受高并发的线程模型,说到线程模型就不得不提到NioEventLoopGroup这个线程池
线程模型
首先来看一段Netty的使用示例
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public final class SimpleServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new SimpleServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded");
}
}
}
下面将分析第一、二行代码,看下NioEventLoopGroup类的构造函数干了些什么。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
从代码中可以看到这里使用了两个线程池bossGroup和workerGroup,那么为什么需要定义两个线程池呢?这就要说到Netty的线程模型了。
Netty的线程模型被称为Reactor模型,具体如图所示,图上的mainReactor指的就是bossGroup,这个线程池处理客户端的连接请求,并将accept的连接注册到subReactor的其中一个线程上;图上的subReactor当然指的就是workerGroup,负责处理已建立的客户端通道上的数据读写;图上还有一块ThreadPool是具体的处理业务逻辑的线程池,一般情况下可以复用subReactor,比我的项目中就是这种用法,但官方建议处理一些较为耗时的业务时还是要使用单独的ThreadPool。
NioEventLoopGroup构造函数
NioEventLoopGroup的构造函数的代码如下
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
NioEventLoopGroup类中的构造函数最终都是调用的父类MultithreadEventLoopGroup如下的构造函数:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
从上面的构造函数可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()来创建对象,即不指定线程个数,则netty给我们使用默认的线程个数,如果指定则用我们指定的线程个数。
默认线程个数相关的代码如下:
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
而SystemPropertyUtil.getInt函数的功能为:得到系统属性中指定key(这里:key=”io.netty.eventLoopThreads”)所对应的value,如果获取不到获取失败则返回默认值,这里的默认值为:cpu的核数的2倍。
结论:如果没有设置程序启动参数(或者说没有指定key=”io.netty.eventLoopThreads”的属性值),那么默认情况下线程的个数为cpu的核数乘以2。
继续看,由于MultithreadEventLoopGroup的构造函数是调用的是其父类MultithreadEventExecutorGroup的构造函数,因此,看下此类的构造函数
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
//根据线程个数是否为2的幂次方,采用不同策略初始化chooser
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
//产生nTreads个NioEventLoop对象保存在children数组中
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//如果newChild方法执行失败,则对前面执行new成功的几个NioEventLoop进行shutdown处理
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
}