Netty 源码分析(三)

Netty 源码分析(三)

版本 4.1.15

官网:https://netty.io/

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

addLast 方法

io.netty.channel.DefaultChannelPipeline#addLast

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);

newCtx = newContext(group, filterName(name, handler), handler);

addLast0(newCtx); // 是把 ChannelHandlerContext 添加进去

// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

AbstractChannelHandlerContext 定义了一个上下文,找到实现的一个接口 ChannelHandlerContext

io.netty.channel.ChannelHandlerContext

文档:https://netty.io/5.0/api/io/netty/channel/ChannelHandlerContext.html

接下来我分析一下 ChannelHandlerContextPipeLineHandler 这三者的关系

这篇文章写的很清楚

https://blog.csdn.net/u010853261/article/details/54574440

ChannelHandlerContext

每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext并与之创建的ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler实现进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的

下图显示了ChannelHandlerContextChannelHandlerChannelPipeline的关系:

最后我们看到

private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}

我们的双向链表链表维护的是 ChannelHandlerContext 对象,而ChannelHandlerContext 包装了 ChannelHandler

我们回到 addLast 方法上

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});

进入 ChannelInitializer 类中,我们看 #initChannel 方法,说这个方法当 Channel 注册时会被调用,一旦掉用完就会被移除 ChannelPipeline,这是因为只需要把里面封装的 Handler 添加到 ChannelPipeline,因为他本身就不一个 Handler

io.netty.channel.ChannelInitializer
protected abstract void initChannel(C ch)

This method will be called once the Channel was registered. After the method returns this instance will be removed from the ChannelPipeline of the Channel.

下面是移除代码

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}

private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}

ChannelHandlerContext.attr(..) == Channel.attr(..)

https://netty.io/wiki/new-and-noteworthy-in-4.1.html

Both Channel and ChannelHandlerContext implement the interface AttributeMap to enable a user to attach one or more user-defined attributes to them. What sometimes made a user confused was that a Channel and a ChannelHandlerContext had its own storage for the user-defined attributes. For example, even if you put an attribute ‘KEY_X’ via Channel.attr(KEY_X).set(valueX), you will never find it via ChannelHandlerContext.attr(KEY_X).get() and vice versa. This behavior is not only confusing but also is waste of memory.

To address this issue, we decided to keep only one map per Channel internally. AttributeMap always uses AttributeKey as its key. AttributeKey ensures uniqueness between each key, and thus there’s no point of having more than one attribute map per Channel. As long as a user defines its own AttributeKey as a private static final field of his or her ChannelHandler, there will be no risk of duplicate keys.

注意:现在这两个关联的是一个Map

callHandlerCallbackLater

我们回到 #addLast 方法上,这个时候是还没有注册的,进入这个 #callHandlerCallbackLater 方法,把稍后调用 Handler 回调,封装成一个 task

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;

PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}

注册

我们回到io.netty.bootstrap.AbstractBootstrap#initAndRegister

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

前面的初始化初始化已经有一点的了解,现在我来看注册,这里有#config#group#register 这三个方法,我们一个一个分析

ChannelFuture regFuture = config().group().register(channel);

config 方法

/**
* Returns the {@link AbstractBootstrapConfig} object that can be used to obtain the current config
* of the bootstrap.
*/
public abstract AbstractBootstrapConfig<B, C> config();

返回了一个 ServerbootstrapConfig 对象

group 方法

/**
* Returns the configured {@link EventLoopGroup} or {@code null} if non is configured yet.
*/
@SuppressWarnings("deprecation")
public final EventLoopGroup group() {
return bootstrap.group();
}

返回一个 NioEventLoopGroup 对象,这个时候返回的是一个调用的是他的父类MultithreadEventLoopGroup

register 方法

io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)

最终会调用 io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel) 的注册方法

我们来看看这个类

io.netty.channel.SingleThreadEventLoop

io.netty.channel
public abstract class SingleThreadEventLoop
extends SingleThreadEventExecutor
implements EventLoop
Abstract base class for EventLoops that execute all its submitted tasks in a single thread.

io.netty.channel.AbstractChannel.AbstractUnsafe#register

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;
// 如果是当前线程就让它执行
if (eventLoop.inEventLoop()) {
register0(promise);
// 如果不是的话就放到线程池中注册
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

先理解一下线程 Netty 中的线程模型

  • 一个 EventLoopGroup 当中会包含多个 EventLoop
  • 一个 EventLoop 在它的整个生命周期当中都只会与唯一一个 Thread 进行绑定
  • 所有 EventLoop 所处理的各种 I/O 事件都是将在他所关联的那个 Thread 上进行处理
  • 一个 Channel 在它的整个生命周期中只会注册在一个 EventLoop
  • 一个 EventLoop 在运行过程中,会被分配给一或者多个 Channel

重要结论:

  1. 在Netty 中 Channel 的实现是线程安全的,基于此,我们可以存储一个 Channel 的引用,并且在需要向远程端点发送数据时,通过这个引用来调用 Channel 相应的方法,即便是当时有很多线程都在使用它也不会出现多线程的问题,而且消息一点会按照这个顺序发送出去
  2. 我们在业务开发中,不要将执行耗时的任务放入到 EventLoop 的执行队列中,因为它会堵塞该线程的所有Channel 上的其它执行任务,如果我们需要进行阻塞调用或则是耗时操作,那么我们需要使用一个专门的EventExectutor(业务线程池)

通常会有两种实现方式:

  1. 在 ChannelHandler 的回调方法中,使用自己定义的业务线程池,这样就可以实现异步调用
  2. 借助于 Netty 提供的向 ChannelPipeline 添加ChannelHandler是调用的addLast方法来传递 EventExecutorGroup

说明:如果addLast(handler)的方法是由I/O线程所执行的,如果addLast(eventExectutorGroup, handler)的方法,那么就是由参数中的group的线程组来执行

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); // 这个方法
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

io.netty.channel.nio.AbstractNioChannel#doRegister

看到 doXxx 开头的方法就知道是认真工作的

@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

与我们前面写的 NIO 逻辑是一样的

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

sync

public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, false)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new MyServerinitializer());
ChannelFuture channelFuture = bootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}

我们回到我们编写的 Server 中,需要绑定,之后需要调用 #sync 表示这个方法需要同步,要不然还没绑定完成就返回了 ChannelFuture ,里面的结果或者状态是还没有完成的,加了 #sync 就能保证完成

ChannelFuture channelFuture = bootstrap.bind(8899).sync();

在我们正常开发是流程就会停在下面,就卡住了

channelFuture.channel().closeFuture().sync();

当我们调用关闭就会到 finally 中,会执行优雅关闭

到此我们启动过程基本分析完了

Comments