Netty 源码分析(一)

Netty 源码分析(一)

版本 4.1.15

官网:https://netty.io/

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

先看一个例子

服务端

MyServer 类

/**
 * @Author: cuzz
 * @Date: 2019/1/1 19:44
 * @Description: Demo server entry point: binds to port 8899 and blocks until the server channel closes.
 */
public class MyServer {
    /**
     * Configures a {@code ServerBootstrap} with two event loop groups, binds it and waits.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting on bind or close
     */
    public static void main(String[] args) throws InterruptedException {
        // bossGroup is handed to the bootstrap as the parent group, workGroup as the child group.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new MyServerinitializer());
            ChannelFuture channelFuture = bootstrap.bind(8899).sync();
            // Block the main thread until the bound channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Each group created above needs its own shutdown call.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

MyServerinitializer 类

/**
 * @Author: cuzz
 * @Date: 2019/1/1 20:06
 * @Description: Installs the server-side handler chain on every newly initialised channel.
 */
public class MyServerinitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Appends, in order: length-field frame decoder, length-field prepender,
     * UTF-8 string decoder, UTF-8 string encoder and the business handler.
     *
     * @param ch the channel whose pipeline is being populated
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                .addLast(new LengthFieldPrepender(4))
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new MyServerHandler());
    }
}

MyServerHandler 类

/**
 * @Author: cuzz
 * @Date: 2019/1/1 20:23
 * @Description: Prints every received string and answers with a random UUID.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Logs the sender address together with the message, then writes a reply through the channel.
     *
     * @param ctx handler context of the receiving channel
     * @param msg the decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String received = ctx.channel().remoteAddress() + ": " + msg;
        System.out.println(received);

        String reply = "from server: " + UUID.randomUUID();
        ctx.channel().writeAndFlush(reply);
    }

    /**
     * Closes the connection when an exception occurs.
     *
     * @param ctx handler context of the failing channel
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

MyClient 类

/**
 * @Author: cuzz
 * @Date: 2019/1/1 20:31
 * @Description: Demo client entry point: connects to localhost:8899 and blocks until the channel closes.
 */
public class MyClient {
    /**
     * Configures a {@code Bootstrap}, connects and waits for the connection to be closed.
     *
     * @param args unused
     * @throws Exception if connecting or waiting fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            // Same configuration as a single fluent chain, written as separate steps.
            bootstrap.group(eventLoopGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new MyClientInitializer());

            ChannelFuture connected = bootstrap.connect("localhost", 8899).sync();
            connected.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

MyClientInitializer 类

/**
 * @Author: cuzz
 * @Date: 2019/1/1 20:40
 * @Description: Installs the client-side handler chain on the newly initialised channel.
 */
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Appends, in order: length-field frame decoder, length-field prepender,
     * UTF-8 string decoder, UTF-8 string encoder and the business handler.
     *
     * @param ch the channel whose pipeline is being populated
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                .addLast(new LengthFieldPrepender(4))
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new MyClientHandler());
    }
}

MyClientHandler 类

/**
 * @Author: cuzz
 * @Date: 2019/1/1 20:42
 * @Description: Prints every received string and answers with a random UUID; sends a greeting on connect.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Logs the sender address together with the message, then writes a reply.
     *
     * @param ctx handler context of the receiving channel
     * @param msg the decoded message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + ": " + msg);
        // Message prefix spelling corrected ("clinet" -> "client").
        ctx.writeAndFlush("from client: " + UUID.randomUUID());
    }

    /**
     * Prints the stack trace and closes the connection.
     *
     * @param ctx handler context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Sends the first message once the channel becomes active, which starts the exchange.
     *
     * @param ctx handler context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush("来自客户端的连接!!!");
    }
}

初始化

EventLoopGroup

创建 bossGroup 和 workGroup

// Two separate groups, both created with the no-arg NioEventLoopGroup constructor.
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

EventLoopGroup 翻译过来叫事件循环组,其本身就是一个死循环

bossGroup 负责接受连接,并把连接转发给 workGroup;workGroup 是真正完成用户请求处理的类

EventLoopGroup 是一个接口,在后面循环的过程中可以选择把 Channel 注册上

/**
* Special {@link EventExecutorGroup} which allows registering {@link Channel}s that get
* processed for later selection during the event loop.
*
*/
public interface EventLoopGroup extends EventExecutorGroup {

/**
* Overrides the inherited {@code next()} so that callers receive an {@link EventLoop}.
*/
@Override
EventLoop next();

/**
* Registers the given {@link Channel} with this group.
*
* @param channel the channel to register
* @return a {@link ChannelFuture} for the registration (presumably completed once
*         registration finishes; confirm against the Netty Javadoc)
*/
ChannelFuture register(Channel channel);

/**
* Registers using the given {@link ChannelPromise}.
*
* @param promise the promise passed in by the caller (presumably it carries the channel
*                to register and is completed on registration; confirm against the Netty Javadoc)
* @return a {@link ChannelFuture} for the registration
*/
ChannelFuture register(ChannelPromise promise);
}

NioEventLoopGroup

// An event loop group based on NIO selectors.
// Excerpt: only the first three delegating constructors are shown.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
// 0: no-arg constructor, delegates with nThreads = 0
public NioEventLoopGroup() {
this(0);
}
// 1: delegates with a null Executor
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
// 2: delegates with SelectorProvider.provider() as the selector provider
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
}

MultithreadEventExecutorGroup

最终会跳到MultithreadEventExecutorGroup 中的一个构造器中

/**
 * Creates the child executors of this group and wires up termination tracking.
 *
 * @param nThreads       number of child executors to create
 * @param executor       executor passed to every child; when {@code null}, a
 *                       {@link ThreadPerTaskExecutor} built from {@code newDefaultThreadFactory()} is used
 * @param chooserFactory factory that builds the chooser over the created children
 * @param args           extra arguments forwarded to each {@code newChild(executor, args)} call
 * @throws IllegalStateException if creating any child fails
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    // 1: fall back to a thread-per-task executor when the caller supplied none
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // Creation of child i failed: shut down the children created so far...
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                // ...and wait for each of them to terminate.
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    // Completes the group's termination future once every child has reported termination.
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // Expose the children as an unmodifiable set that keeps insertion order.
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

ThreadPerTaskExecutor

代码1中,executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());,跟进去

/**
 * {@link Executor} that obtains a new thread from the supplied {@link ThreadFactory}
 * for every command and starts it.
 */
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    /**
     * @param threadFactory factory used to create one thread per executed command
     * @throws NullPointerException if {@code threadFactory} is {@code null}
     */
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory != null) {
            this.threadFactory = threadFactory;
        } else {
            throw new NullPointerException("threadFactory");
        }
    }

    /**
     * Creates a thread for {@code command} through the factory and starts it.
     *
     * @param command the task to run
     */
    @Override
    public void execute(Runnable command) {
        Thread worker = threadFactory.newThread(command);
        worker.start();
    }
}

这里用到了工厂方法和命令模式,通过传入一个command调用工厂方法

Executor

/**
* An object that runs submitted {@link Runnable} commands through its single
* {@link #execute(Runnable)} method.
*/
public interface Executor {

/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}

这是 java.util.concurrent 下的一个接口,最简单的实现方式是把一个 task 传入,新建一个线程来运行

/** Executor that spawns a brand-new thread for every submitted task. */
class ThreadPerTaskExecutor implements Executor {
    /**
     * Wraps {@code r} in a new {@link Thread} and starts it.
     *
     * @param r the task to run
     */
    public void execute(Runnable r) {
        Thread worker = new Thread(r);
        worker.start();
    }
}

也可以对任务的执行施加一些限制,比如下面的例子让提交的任务串行(按顺序)执行

/**
 * Executor that hands tasks to a delegate executor one at a time: the next
 * queued task is only submitted after the previous one has finished running.
 */
class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    final Executor executor;
    Runnable active;

    /**
     * @param executor the delegate that actually runs the tasks
     */
    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    /**
     * Queues {@code r}; if nothing is currently active, submits the head of the queue.
     *
     * @param r the task to run after all previously queued tasks
     */
    public synchronized void execute(final Runnable r) {
        // Wrap the task so that finishing it (normally or not) schedules its successor.
        Runnable chained = new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        };
        tasks.offer(chained);
        if (active != null) {
            return;
        }
        scheduleNext();
    }

    /** Takes the next queued task, if any, marks it active and submits it to the delegate. */
    protected synchronized void scheduleNext() {
        active = tasks.poll();
        if (active == null) {
            return;
        }
        executor.execute(active);
    }
}

其中非常常用的几个实现如 ExecutorService 和 ThreadPoolExecutor

下面是官方文档

The Executor implementations provided in this package implement ExecutorService, which is a more extensive interface. The ThreadPoolExecutor class provides an extensible thread pool implementation. The Executors class provides convenient factory methods for these Executors.

回顾一下 MyServer 中启动的代码

try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new MyServerinitializer());
    ChannelFuture channelFuture = bootstrap.bind(8899).sync();
    // Block until the bound channel is closed.
    channelFuture.channel().closeFuture().sync();
} finally {
    // Each of the two groups needs its own shutdown call.
    bossGroup.shutdownGracefully();
    workGroup.shutdownGracefully();
}

ServerBootstrap

// Declaration only; the class body is elided ("...") in this excerpt.
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ... }

ServerBootstrap 是 Bootstrap 的子类,可以很容易地启动一个 ServerChannel

ServerChannel

接受一个即将到来的连接,创建子 Channel

/**
* A {@link Channel} that accepts an incoming connection attempt and creates
* its child {@link Channel}s by accepting them. {@link ServerSocketChannel} is
* a good example.
*/
public interface ServerChannel extends Channel {
// This is a tag interface: it declares no members of its own and only marks a Channel as a server channel.
}

其有很多实现的子类,其中 NioServerSocketChannel 是我们比较关注的

方法链

// Fluent configuration: each call is chained on the value returned by the previous one.
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new MyServerinitializer());

.group(bossGroup, workGroup) 我们把 bossGroup 和 workGroup 传入进去,由于是方法链,肯定返回本身,跟踪下去

/**
 * Configures both event loop groups: {@code parentGroup} for the acceptor side and
 * {@code childGroup} for the accepted channels.
 *
 * @param parentGroup group forwarded to the superclass {@code group(...)}
 * @param childGroup  group stored for the child channels
 * @return this bootstrap, to allow method chaining
 * @throws NullPointerException  if {@code childGroup} is {@code null}
 * @throws IllegalStateException if a child group was already configured
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    // The parent group is handled first by the superclass.
    super.group(parentGroup);

    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    } else if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }

    this.childGroup = childGroup;
    return this;
}

这一步,就是把 bossGroup 和 workGroup 赋值给 ServerBootstrap 的实例

.channel(NioServerSocketChannel.class) 方法,接受的是一个 class 对象,一般接受 class 对象大多数与反射有关系

/**
 * Selects the {@link Channel} implementation by class. The class is wrapped in a
 * {@code ReflectiveChannelFactory} and passed on to {@code channelFactory(...)}.
 * Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead if your
 * {@link Channel} implementation has no no-args constructor.
 *
 * @param channelClass the channel class to create instances from
 * @return the result of {@code channelFactory(...)}
 * @throws NullPointerException if {@code channelClass} is {@code null}
 */
public B channel(Class<? extends C> channelClass) {
    if (channelClass != null) {
        ReflectiveChannelFactory<C> reflectiveFactory = new ReflectiveChannelFactory<C>(channelClass);
        return channelFactory(reflectiveFactory);
    }
    throw new NullPointerException("channelClass");
}

进入 channelFactory 方法

/**
* {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
* is not working for you because of some more complex needs. If your {@link Channel} implementation
* has a no-args constructor, it is highly recommended to just use {@link #channel(Class)} to
* simplify your code.
*
* @param channelFactory the factory to use
* @return the result of the overload this call is delegated to
*/
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
// The cast picks the overload that takes the unqualified ChannelFactory type
// (presumably the deprecated one, given the "deprecation" suppression; confirm).
return channelFactory((ChannelFactory<C>) channelFactory);
}

如果有无参数的构造方法推荐使用,这样可以简化代码

Q:为什么必须要有无参数构造方法呢?

A : 一般来说,获取一个实例如下生成,所以必须有无参数构造方法

Class class = Class.forName(className);
Object object = class.newInstance(); // 只能调用无参构造函数

我们再来看看 NioServerSocketChannel

A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses NIO selector based implementation to accept new connections.

.childHandler(new MyServerinitializer()); 设置用于请求的 Handler

/**
 * Stores the {@link ChannelHandler} that serves requests for the {@link Channel}s.
 *
 * @param childHandler the handler to store
 * @return this bootstrap, to allow method chaining
 * @throws NullPointerException if {@code childHandler} is {@code null}
 */
public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler != null) {
        this.childHandler = childHandler;
        return this;
    }
    throw new NullPointerException("childHandler");
}

这里其实有 handler 和 childHandler,一个是给 bossGroup 使用的,一个是给 workGroup 使用的

启动

// Bind to port 8899, then wait (sync) on the returned future.
ChannelFuture channelFuture = bootstrap.bind(8899).sync();

ChannelFuture

ChannelFuture 先是继承了 Netty 自己提供的 Future,Netty 的 Future 又继承了 java.util.concurrent.Future<V>,我们先看看 JUC 中的 Future 和 FutureTask

JUC.Future

看看其中几个主要的方法,从方法名也知道是做什么的

/**
* Represents the result of an asynchronous computation.
*
* @param <V> the result type returned by {@code get}
*/
public interface Future<V> {

/** Attempts to cancel the computation; a computation that has completed cannot be cancelled. */
boolean cancel(boolean mayInterruptIfRunning);

/** Tells whether the task was cancelled. */
boolean isCancelled();

/** Tells whether the computation is complete. */
boolean isDone();

/** Retrieves the result, blocking if necessary until it is ready. */
V get() throws InterruptedException, ExecutionException;

/** Like {@link #get()}, but bounded by the given timeout (see the declared {@code TimeoutException}). */
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

文档:

A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.

使用:

// Usage sketch: the "..." initialisers are placeholders, so this is not compilable as is.
interface ArchiveSearcher { String search(String target); }
class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...

// Submits the search as a Callable, does other work, then reads the result from the Future.
void showSearch(final String target) throws InterruptedException {
Future<String> future = executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}
});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) {
// The search itself failed: clean up and give up on displaying a result.
cleanup();
return;
}
}
}

JUC.FutureTask

The FutureTask class is an implementation of Future that implements Runnable, and so may be executed by an Executor. For example, the above construction with submit could be replaced by:

// Same search as above, but wrapped in a FutureTask and handed to execute() instead of submit().
FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
executor.execute(future);

可以通过 Executor 的实例去执行,最后再从 future 中获取

Netty.Future

/**
* Netty's future type: extends {@link java.util.concurrent.Future} with status queries,
* listener registration and several ways of waiting.
*/
public interface Future<V> extends java.util.concurrent.Future<V> {

/** Status query; presumably {@code true} only when the operation completed successfully (confirm in the Netty Javadoc). */
boolean isSuccess();

/** Status query; presumably tells whether the operation can be cancelled (confirm in the Netty Javadoc). */
boolean isCancellable();

/** Returns a {@link Throwable}; presumably the failure cause, or {@code null} if there is none (confirm in the Netty Javadoc). */
Throwable cause();

/**
* Adds the specified listener to this future. The
* specified listener is notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

/**
* Adds the specified listeners to this future. The
* specified listeners are notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listeners are notified immediately.
*/
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

/**
* Removes the first occurrence of the specified listener from this future.
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

/**
* Removes the first occurrence for each of the listeners from this future.
* The specified listeners are no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listeners are not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// Waits for this future to complete.
Future<V> sync() throws InterruptedException;

// Variant of sync() that does not declare InterruptedException.
Future<V> syncUninterruptibly();

// NOTE(review): presumably also waits for completion; see the Netty Javadoc for how it differs from sync().
Future<V> await() throws InterruptedException;

// Variant of await() that does not declare InterruptedException.
Future<V> awaitUninterruptibly();

// Timed variants; presumably the boolean reports whether the future completed in time (confirm).
boolean await(long timeout, TimeUnit unit) throws InterruptedException;

boolean await(long timeoutMillis) throws InterruptedException;

boolean awaitUninterruptibly(long timeout, TimeUnit unit);

boolean awaitUninterruptibly(long timeoutMillis);

// NOTE(review): presumably returns the current result without waiting; confirm.
V getNow();

@Override
boolean cancel(boolean mayInterruptIfRunning);
}

我们主要看看 xxListener 方法,以 Listener 为后缀的方法使用了观察者模式

它比 JUC.Future 更厉害的地方就在于这个 Listener:虽然 JUC.Future 可以调用 get() 方法获取异步结果,但是我们不知道什么时候去调用,调用早了就会阻塞在那里;而 Netty.Future 使用了观察者模式,当操作完成时会自动触发监听器

ChannelFuture

我们回到 ChannelFuture,它重写了 Netty.Future 中的方法,返回值是 Future 的子类。Java 5 之前,重写方法的返回值类型必须与父类方法完全一致;从 Java 5 开始支持协变返回类型,返回值类型可以不同,但必须是父类方法返回值类型的子类型

/**
* The result of an asynchronous {@link Channel} I/O operation.
* The overrides below re-declare the listener and wait methods so that they
* return {@link ChannelFuture} instead of {@link Future}.
*/
public interface ChannelFuture extends Future<Void> {

/**
* Returns a channel where the I/O operation associated with this
* future takes place.
*/
Channel channel();

@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

@Override
ChannelFuture sync() throws InterruptedException;

@Override
ChannelFuture syncUninterruptibly();

@Override
ChannelFuture await() throws InterruptedException;

@Override
ChannelFuture awaitUninterruptibly();

文档:

io.netty.channel
public interface ChannelFuture
extends Future

The result of an asynchronous Channel I/O operation.

All I/O operations in Netty are asynchronous. It means any I/O calls will return immediately with no guarantee that the requested I/O operation has been completed at the end of the call. Instead, you will be returned with a ChannelFuture instance which gives you the information about the result or status of the I/O operation.

A ChannelFuture is either uncompleted or completed. When an I/O operation begins, a new future object is created. The new future is uncompleted initially - it is neither succeeded, failed, nor cancelled because the I/O operation is not finished yet. If the I/O operation is finished either successfully, with failure, or by cancellation, the future is marked as completed with more specific information, such as the cause of the failure. Please note that even failure and cancellation belong to the completed state.

                                      +---------------------------+
                                      | Completed successfully    |
                                      +---------------------------+
                                 +---->      isDone() = true      |
 +--------------------------+    |    |   isSuccess() = true      |
 |        Uncompleted       |    |    +===========================+
 +--------------------------+    |    | Completed with failure    |
 |      isDone() = false    |    |    +---------------------------+
 |   isSuccess() = false    |----+---->      isDone() = true      |
 | isCancelled() = false    |    |    |       cause() = non-null  |
 |       cause() = null     |    |    +===========================+
 +--------------------------+    |    | Completed by cancellation |
                                 |    +---------------------------+
                                 +---->      isDone() = true      |
                                      | isCancelled() = true      |
                                      +---------------------------+

Various methods are provided to let you check if the I/O operation has been completed, wait for the completion, and retrieve the result of the I/O operation. It also allows you to add ChannelFutureListeners so you can get notified when the I/O operation is completed.

推荐使用监听器而不是等待的方法

// BAD - NEVER DO THIS
// Waits inside the handler method for the close operation to finish.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.awaitUninterruptibly();
// Perform post-closure operation
// ...
}

// GOOD
// Registers a listener instead of waiting; the follow-up work runs when the future completes.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-closure operation
// ...
}
});
}

不要混淆连接超时和等待超时

// BAD - NEVER DO THIS
// The 10 seconds here only bound how long we wait on the future; they are not a connect timeout.
Bootstrap b = ...;
ChannelFuture f = b.connect(...);
f.awaitUninterruptibly(10, TimeUnit.SECONDS);
if (f.isCancelled()) {
// Connection attempt cancelled by user
} else if (!f.isSuccess()) {
// You might get a NullPointerException here because the future
// might not be completed yet.
f.cause().printStackTrace();
} else {
// Connection established successfully
}

// GOOD
// The timeout is configured on the connect operation itself, then we wait without a limit.
Bootstrap b = ...;
// Configure the connect timeout option.
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
ChannelFuture f = b.connect(...);
f.awaitUninterruptibly();

// Now we are sure the future is completed.
assert f.isDone();

if (f.isCancelled()) {
// Connection attempt cancelled by user
} else if (!f.isSuccess()) {
f.cause().printStackTrace();
} else {
// Connection established successfully
}

bind()方法

当我们调用 bind 方法时,才真正的启动服务器

// Bind to port 8899, then wait (sync) on the returned future.
ChannelFuture channelFuture = bootstrap.bind(8899).sync();

通过一些判断最终到 doBind 方法上

/**
* Initialises and registers the channel, then binds it to {@code localAddress}.
* If registration is already done, binding starts right away; otherwise it is
* deferred to a listener on the registration future.
*
* @param localAddress the address to bind to
* @return the registration future if it already failed, otherwise the promise passed to {@code doBind0}
*/
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// Registration already failed: hand the failed future back without attempting to bind.
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
// Returned before registration has finished; the listener above completes or fails it later.
return promise;
}
}

initAndRegister()方法

这个方法主要负责初始化和注册,比较复杂,后续再分析

加油!!!

Comments