Netty 源码分析(三)
版本 4.1.15
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
addLast 方法
io.netty.channel.DefaultChannelPipeline#addLast
|
AbstractChannelHandlerContext
定义了一个上下文,找到实现的一个接口 ChannelHandlerContext
io.netty.channel.ChannelHandlerContext
文档:https://netty.io/5.0/api/io/netty/channel/ChannelHandlerContext.html
接下来我分析一下 ChannelHandlerContext
,PipeLine
,Handler
这三者的关系
这篇文章写的很清楚
https://blog.csdn.net/u010853261/article/details/54574440
ChannelHandlerContext
每个ChannelHandler
被添加到ChannelPipeline
后,都会创建一个ChannelHandlerContext
并与之创建的ChannelHandler
关联绑定。ChannelHandlerContext
允许ChannelHandler
与其他的ChannelHandler
实现进行交互。ChannelHandlerContext
不会改变添加到其中的ChannelHandler,因此它是安全的
下图显示了ChannelHandlerContext
、ChannelHandler
、ChannelPipeline
的关系:
最后我们看到
private void addLast0(AbstractChannelHandlerContext newCtx) { |
我们的双向链表链表维护的是 ChannelHandlerContext
对象,而ChannelHandlerContext
包装了 ChannelHandler
我们回到 addLast
方法上
p.addLast(new ChannelInitializer<Channel>() { |
进入 ChannelInitializer
类中,我们看 #initChannel
方法,说这个方法当 Channel
注册时会被调用,一旦掉用完就会被移除 ChannelPipeline
,这是因为只需要把里面封装的 Handler 添加到 ChannelPipeline
,因为他本身就不一个 Handler
io.netty.channel.ChannelInitializer
protected abstract void initChannel(C ch)This method will be called once the Channel was registered. After the method returns this instance will be removed from the ChannelPipeline of the Channel.
下面是移除代码
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { |
ChannelHandlerContext.attr(..) == Channel.attr(..)
https://netty.io/wiki/new-and-noteworthy-in-4.1.html
Both
Channel
andChannelHandlerContext
implement the interfaceAttributeMap
to enable a user to attach one or more user-defined attributes to them. What sometimes made a user confused was that aChannel
and aChannelHandlerContext
had its own storage for the user-defined attributes. For example, even if you put an attribute ‘KEY_X’ viaChannel.attr(KEY_X).set(valueX)
, you will never find it viaChannelHandlerContext.attr(KEY_X).get()
and vice versa. This behavior is not only confusing but also is waste of memory.To address this issue, we decided to keep only one map per
Channel
internally.AttributeMap
always usesAttributeKey
as its key.AttributeKey
ensures uniqueness between each key, and thus there’s no point of having more than one attribute map perChannel
. As long as a user defines its ownAttributeKey
as a private static final field of his or herChannelHandler
, there will be no risk of duplicate keys.
注意:现在这两个关联的是一个Map
callHandlerCallbackLater
我们回到 #addLast
方法上,这个时候是还没有注册的,进入这个 #callHandlerCallbackLater
方法,把稍后调用 Handler 回调,封装成一个 task
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { |
注册
我们回到io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() { |
前面的初始化初始化已经有一点的了解,现在我来看注册,这里有#config
,#group
和 #register
这三个方法,我们一个一个分析
ChannelFuture regFuture = config().group().register(channel); |
config 方法
/** |
返回了一个 ServerbootstrapConfig
对象
group 方法
/** |
返回一个 NioEventLoopGroup
对象,这个时候返回的是一个调用的是他的父类MultithreadEventLoopGroup
的
register 方法
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
最终会调用 io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel) 的注册方法
我们来看看这个类
io.netty.channel.SingleThreadEventLoop
io.netty.channel
public abstract class SingleThreadEventLoop
extends SingleThreadEventExecutor
implements EventLoop
Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
io.netty.channel.AbstractChannel.AbstractUnsafe#register
|
先理解一下线程 Netty 中的线程模型
- 一个
EventLoopGroup
当中会包含多个EventLoop
- 一个
EventLoop
在它的整个生命周期当中都只会与唯一一个 Thread 进行绑定 - 所有
EventLoop
所处理的各种 I/O 事件都是将在他所关联的那个Thread
上进行处理 - 一个
Channel
在它的整个生命周期中只会注册在一个EventLoop
上 - 一个
EventLoop
在运行过程中,会被分配给一或者多个Channel
重要结论:
- 在Netty 中 Channel 的实现是线程安全的,基于此,我们可以存储一个 Channel 的引用,并且在需要向远程端点发送数据时,通过这个引用来调用 Channel 相应的方法,即便是当时有很多线程都在使用它也不会出现多线程的问题,而且消息一点会按照这个顺序发送出去
- 我们在业务开发中,不要将执行耗时的任务放入到 EventLoop 的执行队列中,因为它会堵塞该线程的所有Channel 上的其它执行任务,如果我们需要进行阻塞调用或则是耗时操作,那么我们需要使用一个专门的EventExectutor(业务线程池)
通常会有两种实现方式:
- 在 ChannelHandler 的回调方法中,使用自己定义的业务线程池,这样就可以实现异步调用
- 借助于 Netty 提供的向 ChannelPipeline 添加ChannelHandler是调用的addLast方法来传递 EventExecutorGroup
说明:如果addLast(handler)的方法是由I/O线程所执行的,如果addLast(eventExectutorGroup, handler)的方法,那么就是由参数中的group的线程组来执行
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) { |
io.netty.channel.nio.AbstractNioChannel#doRegister
看到 doXxx 开头的方法就知道是认真工作的
|
与我们前面写的 NIO 逻辑是一样的
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); |
sync
public class MyServer { |
我们回到我们编写的 Server 中,需要绑定,之后需要调用 #sync
表示这个方法需要同步,要不然还没绑定完成就返回了 ChannelFuture
,里面的结果或者状态是还没有完成的,加了 #sync
就能保证完成
ChannelFuture channelFuture = bootstrap.bind(8899).sync(); |
在我们正常开发是流程就会停在下面,就卡住了
channelFuture.channel().closeFuture().sync(); |
当我们调用关闭就会到 finally 中,会执行优雅关闭
到此我们启动过程基本分析完了