netty系列之:NIO和netty详解

netty系列之:NIO和netty详解,第1张

netty为什么快呢?这是因为netty底层使用了JAVA的NIO技术,并在其基础上进行了性能的优化,虽然netty不是单纯的JAVA nio,但是netty的底层还是基于的是nio技术。

nio是JDK1.4中引入的,用于区别于传统的IO,所以nio也可以称之为new io。

nio的三大核心是Selector,channel和Buffer,本文我们将会深入探究NIO和netty之间的关系。

在讲解netty中的NIO实现之前,我们先来回顾一下JDK中NIO的selector,channel是怎么工作的。对于NIO来说selector主要用来接受客户端的连接,所以一般用在server端。我们以一个NIO的服务器端和客户端聊天室为例来讲解NIO在JDK中是怎么使用的。

因为是一个简单的聊天室,我们选择Socket协议为基础的ServerSocketChannel,首先就是open这个Server channel:

然后向server channel中注册selector:

虽然是NIO,但是对于Selector来说,它的select方法是阻塞方法,只有找到匹配的channel之后才会返回,为了多次进行select操作,我们需要在一个while循环里面进行selector的select操作:

selector中会有一些SelectionKey,SelectionKey中有一些表示操作状态的OP Status,根据这个OP Status的不同,selectionKey可以有四种状态,分别是isReadable,isWritable,isConnectable和isAcceptable。

当SelectionKey处于isAcceptable状态的时候,表示ServerSocketChannel可以接受连接了,我们需要调用register方法将serverSocketChannel accept生成的socketChannel注册到selector中,以监听它的OP READ状态,后续可以从中读取数据:

当selectionKey处于isReadable状态的时候,表示可以从socketChannel中读取数据然后进行处理:

上面的serverResponse方法中,从selectionKey中拿到对应的SocketChannel,然后调用SocketChannel的read方法,将channel中的数据读取到byteBuffer中,要想回复消息到channel中,还是使用同一个socketChannel,然后调用write方法回写消息给client端,到这里一个简单的回写客户端消息的server端就完成了。

接下来就是对应的NIO客户端,在NIO客户端需要使用SocketChannel,首先建立和服务器的连接:

然后就可以使用这个channel来发送和接受消息了:

向channel中写入消息可以使用write方法,从channel中读取消息可以使用read方法。

这样一个NIO的客户端就完成了。

虽然以上是NIO的server和client的基本使用,但是基本上涵盖了NIO的所有要点。接下来我们来详细了解一下netty中NIO到底是怎么使用的。

以netty的ServerBootstrap为例,启动的时候需要指定它的group,先来看一下ServerBootstrap的group方法:

ServerBootstrap可以接受一个EventLoopGroup或者两个EventLoopGroup,EventLoopGroup被用来处理所有的event和IO,对于ServerBootstrap来说,可以有两个EventLoopGroup,对于Bootstrap来说只有一个EventLoopGroup。两个EventLoopGroup表示acceptor group和worker group。

EventLoopGroup只是一个接口,我们常用的一个实现就是NioEventLoopGroup,如下所示是一个常用的netty服务器端代码:

这里和NIO相关的有两个类,分别是NioEventLoopGroup和NioServerSocketChannel,事实上在他们的底层还有两个类似的类分别叫做NioEventLoop和NioSocketChannel,接下来我们分别讲解一些他们的底层实现和逻辑关系。

NioEventLoopGroup和DefaultEventLoopGroup一样都是继承自MultithreadEventLoopGroup:

他们的不同之处在于newChild方法的不同,newChild用来构建Group中的实际对象,NioEventLoopGroup来说,newChild返回的是一个NioEventLoop对象,先来看下NioEventLoopGroup的newChild方法:

这个newChild方法除了固定的executor参数之外,还可以根据NioEventLoopGroup的构造函数传入的参数来实现更多的功能。

这里参数中传入了SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler、taskQueueFactory和tailTaskQueueFactory这几个参数,其中后面的两个EventLoopTaskQueueFactory并不是必须的。

最后所有的参数都会传递给NioEventLoop的构造函数用来构造出一个新的NioEventLoop。

在详细讲解NioEventLoop之前,我们来研读一下传入的这几个参数类型的实际作用。

SelectorProvider是JDK中的类,它提供了一个静态的provider()方法可以从Property或者ServiceLoader中加载对应的SelectorProvider类并实例化。

另外还提供了openDatagramChannel、openPipe、openSelector、openServerSocketChannel和openSocketChannel等实用的NIO操作方法。

SelectStrategyFactory是一个接口,里面只定义了一个方法,用来返回SelectStrategy:

什么是SelectStrategy呢?

先看下SelectStrategy中定义了哪些Strategy:

SelectStrategy中定义了3个strategy,分别是SELECT、CONTINUE和BUSY_WAIT。

我们知道一般情况下,在NIO中select操作本身是一个阻塞操作,也就是block操作,这个操作对应的strategy是SELECT,也就是select block状态。

如果我们想跳过这个block,重新进入下一个event loop,那么对应的strategy就是CONTINUE。

BUSY_WAIT是一个特殊的strategy,是指IO 循环轮询新事件而不阻塞,这个strategy只有在epoll模式下才支持,NIO和Kqueue模式并不支持这个strategy。

RejectedExecutionHandler是netty自己的类,和 java.util.concurrent.RejectedExecutionHandler类似,但是是特别针对SingleThreadEventExecutor来说的。这个接口定义了一个rejected方法,用来表示因为SingleThreadEventExecutor容量限制导致的任务添加失败而被拒绝的情况:

EventLoopTaskQueueFactory是一个接口,用来创建存储提交给EventLoop的taskQueue:

这个Queue必须是线程安全的,并且继承自java.util.concurrent.BlockingQueue.

讲解完这几个参数,接下来我们就可以详细查看NioEventLoop的具体NIO实现了。

首先NioEventLoop和DefaultEventLoop一样,都是继承自SingleThreadEventLoop:

表示的是使用单一线程来执行任务的EventLoop。

首先作为一个NIO的实现,必须要有selector,在NioEventLoop中定义了两个selector,分别是selector和unwrappedSelector:

在NioEventLoop的构造函数中,他们是这样定义的:

首先调用openSelector方法,然后通过返回的SelectorTuple来获取对应的selector和unwrappedSelector。

这两个selector有什么区别呢?

在openSelector方法中,首先通过调用provider的openSelector方法返回一个Selector,这个Selector就是unwrappedSelector:

然后检查DISABLE_KEY_SET_OPTIMIZATION是否设置,如果没有设置那么unwrappedSelector和selector实际上是同一个Selector:

DISABLE_KEY_SET_OPTIMIZATION表示的是是否对select key set进行优化:

如果DISABLE_KEY_SET_OPTIMIZATION被设置为false,那么意味着我们需要对select key set进行优化,具体是怎么进行优化的呢?

先来看下最后的返回:

最后返回的SelectorTuple第二个参数就是selector,这里的selector是一个SelectedSelectionKeySetSelector对象。

SelectedSelectionKeySetSelector继承自selector,构造函数传入的第一个参数是一个delegate,所有的Selector中定义的方法都是通过调用

delegate来实现的,不同的是对于select方法来说,会首先调用selectedKeySet的reset方法,下面是以isOpen和select方法为例观察一下代码的实现:

selectedKeySet是一个SelectedSelectionKeySet对象,是一个set集合,用来存储SelectionKey,在openSelector()方法中,使用new来实例化这个对象:

netty实际是想用这个SelectedSelectionKeySet类来管理Selector中的selectedKeys,所以接下来netty用了一个高技巧性的对象替换操作。

首先判断系统中有没有sun.nio.ch.SelectorImpl的实现:

SelectorImpl中有两个Set字段:

这两个字段就是我们需要替换的对象。如果有SelectorImpl的话,首先使用Unsafe类,调用PlatformDependent中的objectFieldOffset方法拿到这两个字段相对于对象示例的偏移量,然后调用putObject将这两个字段替换成为前面初始化的selectedKeySet对象:

如果系统设置不支持Unsafe,那么就用反射再做一次:

还记得前面我们提到的selectStrategy吗?run方法通过调用selectStrategy.calculateStrategy返回了select的strategy,然后通过判断

strategy的值来进行对应的处理。

如果strategy是CONTINUE,这跳过这次循环,进入到下一个loop中。

BUSY_WAIT在NIO中是不支持的,如果是SELECT状态,那么会在curDeadlineNanos之后再次进行select操作:

如果strategy >0,表示有拿到了SelectedKeys,那么需要调用processSelectedKeys方法对SelectedKeys进行处理:

上面提到了NioEventLoop中有两个selector,还有一个selectedKeys属性,这个selectedKeys存储的就是Optimized SelectedKeys,如果这个值不为空,就调用processSelectedKeysOptimized方法,否则就调用processSelectedKeysPlain方法。

processSelectedKeysOptimized和processSelectedKeysPlain这两个方法差别不大,只是传入的要处理的selectedKeys不同。

处理的逻辑是首先拿到selectedKeys的key,然后调用它的attachment方法拿到attach的对象:

如果channel还没有建立连接,那么这个对象可能是一个NioTask,用来处理channelReady和channelUnregistered的事件。

如果channel已经建立好连接了,那么这个对象可能是一个AbstractNioChannel。

针对两种不同的对象,会去分别调用不同的processSelectedKey方法。

对第一种情况,会调用task的channelReady方法:

对第二种情况,会根据SelectionKey的readyOps()的各种状态调用ch.unsafe()中的各种方法,去进行read或者close等操作。

NioEventLoop虽然也是一个SingleThreadEventLoop,但是通过使用NIO技术,可以更好的利用现有资源实现更好的效率,这也就是为什么我们在项目中使用NioEventLoopGroup而不是DefaultEventLoopGroup的原因。

这种问题其实到官方文档上查看一番就可以知道,tomcat很早的版本还是使用的BIO,之后就支持NIO了,具体版本我也不记得了,有兴趣的自己可以去查下。本篇的tomcat版本是tomcat8.5。可以到这里看下 tomcat8.5的配置参数

我们先来简单回顾下目前一般的NIO服务器端的大致实现,借鉴infoq上的一篇文章 Netty系列之Netty线程模型 中的一张图

所以一般参数就是Acceptor线程个数,Worker线程个数。来具体看下参数

文档描述为:

The maximum queue length for incoming connection requests when all possible request processing threads are in use. Any requests received when the queue is full will be refused. The default value is 100.

这个参数就立马牵涉出一块大内容:TCP三次握手的详细过程,这个之后再详细探讨(操作系统的接收队列长度默认为100)。这里可以简单理解为:连接在被ServerSocketChannel accept之前就暂存在这个队列中,acceptCount就是这个队列的最大长度。ServerSocketChannel accept就是从这个队列中不断取出已经建立连接的的请求。所以当ServerSocketChannel accept取出不及时就有可能造成该队列积压,一旦满了连接就被拒绝了

文档如下描述

The number of threads to be used to accept connections. Increase this value on a multi CPU machine, although you would never really need more than 2. Also, with a lot of non keep alive connections, you might want to increase this value as well. Default value is 1.

Acceptor线程只负责从上述队列中取出已经建立连接的请求。在启动的时候使用一个ServerSocketChannel监听一个连接端口如8080,可以有多个Acceptor线程并发不断调用上述ServerSocketChannel的accept方法来获取新的连接。参数acceptorThreadCount其实使用的Acceptor线程的个数。

文档描述如下

The maximum number of connections that the server will accept and process at any given time. When this number has been reached, the server will accept, but not process, one further connection. This additional connection be blocked until the number of connections being processed falls below maxConnections at which point the server will start accepting and processing new connections again. Note that once the limit has been reached, the operating system may still accept connections based on the acceptCount setting. The default value varies by connector type. For NIO and NIO2 the default is 10000. For APR/native, the default is 8192.

Note that for APR/native on Windows, the configured value will be reduced to the highest multiple of 1024 that is less than or equal to maxConnections. This is done for performance reasons. If set to a value of -1, the maxConnections feature is disabled and connections are not counted.

这里就是tomcat对于连接数的一个控制,即最大连接数限制。一旦发现当前连接数已经超过了一定的数量(NIO默认是10000,BIO是200与线程池最大线程数密切相关),上述的Acceptor线程就被阻塞了,即不再执行ServerSocketChannel的accept方法从队列中获取已经建立的连接。但是它并不阻止新的连接的建立,新的连接的建立过程不是Acceptor控制的,Acceptor仅仅是从队列中获取新建立的连接。所以当连接数已经超过maxConnections后,仍然是可以建立新的连接的,存放在上述acceptCount大小的队列中,这个队列里面的连接没有被Acceptor获取,就处于连接建立了但是不被处理的状态。当连接数低于maxConnections之后,Acceptor线程就不再阻塞,继续调用ServerSocketChannel的accept方法从acceptCount大小的队列中继续获取新的连接,之后就开始处理这些新的连接的IO事件了

文档描述如下

The maximum number of request processing threads to be created by this Connector, which therefore determines the maximum number of simultaneous requests that can be handled. If not specified, this attribute is set to 200. If an executor is associated with this connector, this attribute is ignored as the connector will execute tasks using the executor rather than an internal thread pool.

这个简单理解就算是上述worker的线程数,下面会详细的说明。他们专门用于处理IO事件,默认是200。

上面参数仅仅是简单了解了下参数配置,下面我们就来详细研究下tomcat的NIO服务器具体情况,这就要详细了解下tomcat的NioEndpoint实现了

先来借鉴看下 tomcat高并发场景下的BUG排查 中的一张图

这张图勾画出了NioEndpoint的大致执行流程图,worker线程并没有体现出来,它是作为一个线程池不断的执行IO读写事件即SocketProcessor(一个Runnable),即这里的Poller仅仅监听Socket的IO事件,然后封装成一个个的SocketProcessor交给worker线程池来处理。下面我们来详细的介绍下NioEndpoint中的Acceptor、Poller、SocketProcessor

获取指定的Acceptor数量的线程

可以看到就是一个while循环,循环里面不断的accept新的连接。

先来看下在accept新的连接之前,首选进行连接数的自增,即countUpOrAwaitConnection

当我们设置maxConnections=-1的时候就表示不用限制最大连接数。默认是限制10000,如果不限制则一旦出现大的冲击,则tomcat很有可能直接挂掉,导致服务停止。

这里的需求就是当前连接数一旦超过最大连接数maxConnections,就直接阻塞了,一旦当前连接数小于最大连接数maxConnections,就不再阻塞,我们来看下这个功能的具体实现latch.countUpOrAwait()

具体看这个需求无非就是一个共享锁,来看具体实现:

目前实现里算是使用了2个锁,LimitLatch本身的AQS实现再加上AtomicLong的AQS实现。也可以不使用AtomicLong来实现。

共享锁的tryAcquireShared实现中,如果不依托AtomicLong,则需要进行for循环加CAS的自增,自增之后没有超过limit这里即maxConnections,则直接返回1表示获取到了共享锁,如果一旦超过limit则首先进行for循环加CAS的自减,然后返回-1表示获取锁失败,便进入加入同步队列进入阻塞状态。

共享锁的tryReleaseShared实现中,该方法可能会被并发执行,所以释放共享锁的时候也是需要for循环加CAS的自减

上述的for循环加CAS的自增、for循环加CAS的自减的实现全部被替换成了AtomicLong的incrementAndGet和decrementAndGet而已。

上文我们关注的latch.countUpOrAwait()方法其实就是在获取一个共享锁,如下:

从上面可以看到在真正获取一个连接之前,首先是把连接计数先自增了。一旦TCP三次握手成功连接建立,就能从ServerSocketChannel的accept方法中获取到新的连接了。一旦获取连接或者处理过程发生异常则需要将当前连接数自减的,否则会造成连接数虚高,即当前连接数并没有那么多,但是当前连接数却很大,一旦超过最大连接数,就导致其他请求全部阻塞,没有办法被ServerSocketChannel的accept处理。该bug在Tomcat7.0.26版本中出现了,详细见这里的一篇文章 Tomcat7.0.26的连接数控制bug的问题排查

然后我们来看下,一个SocketChannel连接被accept获取之后如何来处理的呢?

处理过程如下:

下面就来详细介绍下Poller

前面没有说到Poller的数量控制,来看下

如果不设置的话最大就是2

来详细看下getPoller0().register(channel):

就是轮训一个Poller来进行SocketChannel的注册

这里又是进行一些参数包装,将socket和Poller的关系绑定,再次从缓存中取出或者重新构建一个PollerEvent,然后将该event放到Poller的事件队列中等待被异步处理

在Poller的run方法中不断处理上述事件队列中的事件,直接执行PollerEvent的run方法,将SocketChannel注册到自己的Selector上。

并将Selector监听到的IO读写事件封装成SocketProcessor,交给线程池执行

我们来看看这个线程池的初始化:

就是创建了一个ThreadPoolExecutor,那我们就重点关注下核心线程数、最大线程数、任务队列等信息

核心线程数最大是10个,再来看下最大线程数

默认就是上面的配置参数maxThreads为200。还有就是TaskQueue,这里的TaskQueue是LinkedBlockingQueue<Runnable>的子类,最大容量就是Integer.MAX_VALUE,根据之前ThreadPoolExecutor的源码分析,核心线程数满了之后,会先将任务放到队列中,队列满了才会创建出新的非核心线程,如果队列是一个大容量的话,也就是不会到创建新的非核心线程那一步了。

但是这里的TaskQueue修改了底层offer的实现

这里当线程数小于最大线程数的时候就直接返回false即入队列失败,则迫使ThreadPoolExecutor创建出新的非核心线程。

TaskQueue这一块没太看懂它的意图是什么,有待继续研究。

本篇文章描述了tomcat8.5中的NIO线程模型,以及其中涉及到的相关参数的设置。

NIO是非阻塞IO,传统的 BIO 是每次request服务器都会分配一个线程,所以,当某个线程发生IO阻塞的时候,该线程就无法充分利用。而NIO则是服务器会不断的轮询每一个client发送的request,如果有N个可读状态的request,OK,那服务器就会分配N个线程去读,所谓非阻塞就是这个意思。


欢迎分享,转载请注明来源:夏雨云

原文地址:https://www.xiayuyun.com/zonghe/115098.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-03-12
下一篇2023-03-12

发表评论

登录后才能评论

评论列表(0条)

    保存