Netty 源码分析(三)
版本 4.1.15
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
addLast 方法
io.netty.channel.DefaultChannelPipeline#addLast
|
AbstractChannelHandlerContext 定义了一个上下文,找到实现的一个接口 ChannelHandlerContext
io.netty.channel.ChannelHandlerContext
文档:https://netty.io/5.0/api/io/netty/channel/ChannelHandlerContext.html
接下来我分析一下 ChannelHandlerContext ,PipeLine,Handler 这三者的关系
这篇文章写的很清楚
https://blog.csdn.net/u010853261/article/details/54574440
ChannelHandlerContext
每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext并与之创建的ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler实现进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的
下图显示了ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系:

最后我们看到
private void addLast0(AbstractChannelHandlerContext newCtx) { |
我们的双向链表链表维护的是 ChannelHandlerContext 对象,而ChannelHandlerContext 包装了 ChannelHandler
我们回到 addLast 方法上
p.addLast(new ChannelInitializer<Channel>() { |
进入 ChannelInitializer 类中,我们看 #initChannel 方法,说这个方法当 Channel 注册时会被调用,一旦掉用完就会被移除 ChannelPipeline,这是因为只需要把里面封装的 Handler 添加到 ChannelPipeline,因为他本身就不一个 Handler
io.netty.channel.ChannelInitializer
protected abstract void initChannel(C ch)This method will be called once the Channel was registered. After the method returns this instance will be removed from the ChannelPipeline of the Channel.
下面是移除代码
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { |
ChannelHandlerContext.attr(..) == Channel.attr(..)
https://netty.io/wiki/new-and-noteworthy-in-4.1.html
Both
ChannelandChannelHandlerContextimplement the interfaceAttributeMapto enable a user to attach one or more user-defined attributes to them. What sometimes made a user confused was that aChanneland aChannelHandlerContexthad its own storage for the user-defined attributes. For example, even if you put an attribute ‘KEY_X’ viaChannel.attr(KEY_X).set(valueX), you will never find it viaChannelHandlerContext.attr(KEY_X).get()and vice versa. This behavior is not only confusing but also is waste of memory.To address this issue, we decided to keep only one map per
Channelinternally.AttributeMapalways usesAttributeKeyas its key.AttributeKeyensures uniqueness between each key, and thus there’s no point of having more than one attribute map perChannel. As long as a user defines its ownAttributeKeyas a private static final field of his or herChannelHandler, there will be no risk of duplicate keys.
注意:现在这两个关联的是一个Map
callHandlerCallbackLater

我们回到 #addLast 方法上,这个时候是还没有注册的,进入这个 #callHandlerCallbackLater 方法,把稍后调用 Handler 回调,封装成一个 task
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { |
注册
我们回到io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() { |
前面的初始化初始化已经有一点的了解,现在我来看注册,这里有#config,#group 和 #register 这三个方法,我们一个一个分析
ChannelFuture regFuture = config().group().register(channel); |
config 方法
/** |
返回了一个 ServerbootstrapConfig 对象
group 方法
/** |
返回一个 NioEventLoopGroup 对象,这个时候返回的是一个调用的是他的父类MultithreadEventLoopGroup的
register 方法
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
最终会调用 io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel) 的注册方法
我们来看看这个类
io.netty.channel.SingleThreadEventLoop
io.netty.channel
public abstract class SingleThreadEventLoop
extends SingleThreadEventExecutor
implements EventLoop
Abstract base class for EventLoops that execute all its submitted tasks in a single thread.
io.netty.channel.AbstractChannel.AbstractUnsafe#register
|
先理解一下线程 Netty 中的线程模型
- 一个
EventLoopGroup当中会包含多个EventLoop - 一个
EventLoop在它的整个生命周期当中都只会与唯一一个 Thread 进行绑定 - 所有
EventLoop所处理的各种 I/O 事件都是将在他所关联的那个Thread上进行处理 - 一个
Channel在它的整个生命周期中只会注册在一个EventLoop上 - 一个
EventLoop在运行过程中,会被分配给一或者多个Channel
重要结论:
- 在Netty 中 Channel 的实现是线程安全的,基于此,我们可以存储一个 Channel 的引用,并且在需要向远程端点发送数据时,通过这个引用来调用 Channel 相应的方法,即便是当时有很多线程都在使用它也不会出现多线程的问题,而且消息一点会按照这个顺序发送出去
- 我们在业务开发中,不要将执行耗时的任务放入到 EventLoop 的执行队列中,因为它会堵塞该线程的所有Channel 上的其它执行任务,如果我们需要进行阻塞调用或则是耗时操作,那么我们需要使用一个专门的EventExectutor(业务线程池)
通常会有两种实现方式:
- 在 ChannelHandler 的回调方法中,使用自己定义的业务线程池,这样就可以实现异步调用
- 借助于 Netty 提供的向 ChannelPipeline 添加ChannelHandler是调用的addLast方法来传递 EventExecutorGroup
说明:如果addLast(handler)的方法是由I/O线程所执行的,如果addLast(eventExectutorGroup, handler)的方法,那么就是由参数中的group的线程组来执行
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) { |
io.netty.channel.nio.AbstractNioChannel#doRegister
看到 doXxx 开头的方法就知道是认真工作的
|
与我们前面写的 NIO 逻辑是一样的
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); |
sync
public class MyServer { |
我们回到我们编写的 Server 中,需要绑定,之后需要调用 #sync 表示这个方法需要同步,要不然还没绑定完成就返回了 ChannelFuture ,里面的结果或者状态是还没有完成的,加了 #sync 就能保证完成
ChannelFuture channelFuture = bootstrap.bind(8899).sync(); |
在我们正常开发是流程就会停在下面,就卡住了
channelFuture.channel().closeFuture().sync(); |
当我们调用关闭就会到 finally 中,会执行优雅关闭
到此我们启动过程基本分析完了


