Netty 源码分析(二)

版本 4.1.15

官网:https://netty.io/

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

先来看一个NIO网络编程

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
* @Author: cuzz
* @Date: 2019/1/7 15:39
* @Description:
*/
public class NioServer {
// 储存客户端连接
private static Map<String, SocketChannel> clientMap = new HashMap<>();

public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));

Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
try {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(selectionKey -> {
try {
if (selectionKey.isAcceptable()) { // 可以读
read(selector, selectionKey);
} else if (selectionKey.isReadable()) { // 可以写
write(selector, selectionKey);
}
} catch (IOException e) {
e.printStackTrace();
}
});
selectionKeys.clear(); // 别忘了清空
} catch (Exception e) {
e.printStackTrace();
}
}

}

private static void write(Selector selector, SelectionKey selectionKey) throws IOException{
SocketChannel client = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
int read = client.read(byteBuffer);
if (read > 0) {
byteBuffer.flip();
Charset charset = Charset.forName("utf-8");
String receiveMessage = String.valueOf(charset.decode(byteBuffer).array());
System.out.println(client + ": " + receiveMessage);

String key = null;
for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
if (entry.getValue() == client) {
key = entry.getKey();
break;
}
}
for (Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {
SocketChannel value = entry.getValue();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put((key + " :" + receiveMessage).getBytes());
writeBuffer.flip();
value.write(writeBuffer);
}
}
}

private static void read(Selector selector, SelectionKey selectionKey) throws IOException{
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
System.out.println(server);
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
String key = UUID.randomUUID().toString();
// 保存客户端
clientMap.put(key, client);
}
}

客服端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* @Author: cuzz
* @Date: 2019/1/8 17:10
* @Description:
*/
public class NioClient {
public static void main(String[] args){
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress("127.0.0.1",8899));

while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();

for (SelectionKey selectionKey : selectionKeys ) {
if (selectionKey.isConnectable()) {
SocketChannel client = (SocketChannel) selectionKey.channel();
if (client.isConnectionPending()) {
client.finishConnect();
System.out.println(client);
ByteBuffer writeBuffer = ByteBuffer.allocate(512);
writeBuffer.put((LocalDateTime.now() + " 连接成功").getBytes());
writeBuffer.flip();
client.write(writeBuffer);

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
while (true) {
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader bf = new BufferedReader(inputStreamReader);
String message = bf.readLine();
ByteBuffer buffer = ByteBuffer.allocate(512);
buffer.put(message.getBytes());
buffer.flip();
client.write(buffer);
}
});
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
SocketChannel client = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = client.read(byteBuffer);
if (read > 0) {
String message = new String(byteBuffer.array());
System.out.println(message);
}
}
}
selectionKeys.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

代码还是比较复杂的,Netty 内部就是把这些细节给封装起来了

Reactor模式

翻译过来为反应器模式,可以先看看由 Doug Lea 写的 Scalable IO in Java ,更好的理解 Netty 的设计模式

还有一篇博客也写得很好,介绍相关理论模型,使用场景,基本组件、整体架构,

这可能是目前最透彻的Netty原理架构解析

Netty 那些事儿 ——— Reactor模式详解

Netty Reactor 工作架构图

reactor

bind() 方法

前面通过 .channel(NioServerSocketChannel.class) 是为了通过反射创建一个 NioServerSocketChannel 对象

NioServerSocketChannel

使用反射创建 NioServerSocketChannel 肯定是通过无参数构造器,在调用 newSocket(DEFAULT_SELECTOR_PROVIDER) 所以这是一个静态方法,返回一个 ServerSocketChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses
* NIO selector based implementation to accept new connections.
*/
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {

private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}

private final ServerSocketChannelConfig config;

/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

/**
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}

/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
...
}

AbstractNioChannel

我们回到调用的这个构造方法上

1
2
3
4
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

一直调用父类,把 SelectionKey.OP_ACCEPT 设置上,还有设置非堵塞,是不出是很熟悉,这都是对 NIO 进行封装

io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel

1
2
3
4
5
6
7
8
9
10
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
...
}
}

再调用父类,就是设置 Id 和创建管道

io.netty.channel.AbstractChannel

1
2
3
4
5
6
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

NioServerSocketChannelConfig

我们在回到这个构造方法上,我们重点来看看这个, NioServerSocketChannelConfig 这是一个配置类,Netty 的各种各样的信息都是体现在这个里面

1
2
3
4
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

把自己和刚开始创建的 NIOSocketChannelServerSocket 对象传入进去

io.netty.channel.DefaultChannelConfig

1
2
3
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}

传了一个 AdaptiveRecvByteBufAllocator 翻译过来可以叫可适配的接受字节缓冲适配器

AdaptiveRecvByteBufAllocator

io.netty.channel.AdaptiveRecvByteBufAllocator

文档:

The RecvByteBufAllocator that automatically increases and decreases the predicted buffer size on feed back.
It gradually increases the expected number of readable bytes if the previous read fully filled the allocated buffer. It gradually decreases the expected number of readable bytes if the read operation was not able to fill a certain amount of the allocated buffer two times consecutively. Otherwise, it keeps returning the same prediction.

构造方法,默认是1024,最小是63,最大是65536

1
2
3
4
5
6
7
8
/**
* Creates a new predictor with the default parameters. With the default
* parameters, the expected buffer size starts from {@code 1024}, does not
* go down below {@code 64}, and does not go up above {@code 65536}.
*/
public AdaptiveRecvByteBufAllocator() {
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}

我们在看看里面的内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private final class HandleImpl extends MaxMessageHandle {
private final int minIndex;
private final int maxIndex;
private int index;
private int nextReceiveBufferSize;
private boolean decreaseNow;

public HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;

index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
}

@Override
public int guess() {
return nextReceiveBufferSize;
}

private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index = Math.max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = Math.min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}

@Override
public void readComplete() {
record(totalBytesRead());
}
}

其父亲 MaxMessageHandle 中,根据记录中的分配,计算出下一次分配的内存

1
2
3
4
@Override
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}

根据系统的支持返回是堆内内存还是堆外内存

1
2
3
4
5
6
7
@Override
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe()) {
return directBuffer(initialCapacity);
}
return heapBuffer(initialCapacity);
}

Pipeline

我们回到前面管道的创建

io.netty.channel.AbstractChannel

1
2
3
4
5
6
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline

1
2
3
4
5
6
7
8
9
10
11
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

这里维护了一个上下文,并且把 Channel 对象赋值给自己,所以 ChannelPipeline 是相互引用的

ChannelPipeline

io.netty.channel.ChannelPipeline

文档:

A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel. ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in a pipeline interact with each other.

Creation of a pipeline

Each channel has its own pipeline and it is created automatically when a new channel is created.

How an event flows in a pipeline

The following diagram describes how I/O events are processed by ChannelHandlers in a ChannelPipeline typically. An I/O event is handled by either a ChannelInboundHandler or a ChannelOutboundHandler and be forwarded to its closest handler by calling the event propagation methods defined in ChannelHandlerContext, such as ChannelHandlerContext.fireChannelRead(Object) and ChannelHandlerContext.write(Object).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
                                               I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+

An inbound event is handled by the inbound handlers in the bottom-up direction as shown on the left side of the diagram. An inbound handler usually handles the inbound data generated by the I/O thread on the bottom of the diagram. The inbound data is often read from a remote peer via the actual input operation such as SocketChannel.read(ByteBuffer). If an inbound event goes beyond the top inbound handler, it is discarded silently, or logged if it needs your attention.

An outbound event is handled by the outbound handler in the top-down direction as shown on the right side of the diagram. An outbound handler usually generates or transforms the outbound traffic such as write requests. If an outbound event goes beyond the bottom outbound handler, it is handled by an I/O thread associated with the Channel. The I/O thread often performs the actual output operation such as SocketChannel.write(ByteBuffer)

For example, let us assume that we created the following pipeline:

1
2
3
4
5
6
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());

In the example above, the class whose name starts with Inbound means it is an inbound handler. The class whose name starts with Outbound means it is a outbound handler.

In the given example configuration, the handler evaluation order is 1, 2, 3, 4, 5 when an event goes inbound. When an event goes outbound, the order is 5, 4, 3, 2, 1. On top of this principle, ChannelPipeline skips the evaluation of certain handlers to shorten the stack depth:

  • 3 and 4 don’t implement ChannelInboundHandler, and therefore the actual evaluation order of an inbound event will be: 1, 2, and 5.
  • 1 and 2 don’t implement ChannelOutboundHandler, and therefore the actual evaluation order of a outbound event will be: 5, 4, and 3.
  • If 5 implements both ChannelInboundHandler and ChannelOutboundHandler, the evaluation order of an inbound and a outbound event could be 125 and 543 respectively.

Forwarding an event to the next handler

As you might noticed in the diagram shows, a handler has to invoke the event propagation methods in ChannelHandlerContext to forward an event to its next handler. Those methods include:

  • Inbound event propagation methods
    • ChannelHandlerContext.fireChannelRegistered()
    • hannelHandlerContext.fireChannelActive()
    • ChannelHandlerContext.fireChannelRead(Object)
    • ChannelHandlerContext.fireChannelReadComplete()
    • ChannelHandlerContext.fireExceptionCaught(Throwable)
    • ChannelHandlerContext.fireUserEventTriggered(Object)
    • ChannelHandlerContext.fireChannelWritabilityChanged()
    • ChannelHandlerContext.fireChannelInactive()
    • ChannelHandlerContext.fireChannelUnregistered()
  • Outbound event propagation methods:
    • ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
    • ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
    • ChannelHandlerContext.write(Object, ChannelPromise)
    • ChannelHandlerContext.flush()
    • ChannelHandlerContext.read()
    • ChannelHandlerContext.disconnect(ChannelPromise)
    • ChannelHandlerContext.close(ChannelPromise)
    • ChannelHandlerContext.deregister(ChannelPromise)

and the following example shows how the event propagation is usually done:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}

public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}

Building a pipeline (重点)

A user is supposed to have one or more ChannelHandlers in a pipeline to receive I/O events (e.g. read) and to request I/O operations (e.g. write and close). For example, a typical server will have the following handlers in each channel’s pipeline, but your mileage may vary depending on the complexity and characteristics of the protocol and business logic:

  • Protocol Decoder - translates binary data (e.g. ByteBuf) into a Java object.
  • Protocol Encoder - translates a Java object into binary data.
  • Business Logic Handler - performs the actual business logic (e.g. database access).

and it could be represented as shown in the following example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());

// Tell the pipeline to run MyBusinessLogicHandler's event handler methods
// in a different thread than an I/O thread so that the I/O thread is not blocked by
// a time-consuming task.
// If your business logic is fully asynchronous or finished very quickly, you don't
// need to specify a group.
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());

注:可以使用重载这个方法添加一个事件循环组 group 去执行耗时的任务,获取在 MyBusinessLogicHandler 中把耗时部分异步处理,这样就不会堵塞 IO 线程

Thread safety

A ChannelHandler can be added or removed at any time because a ChannelPipeline is thread safe. For example, you can insert an encryption handler when sensitive information is about to be exchanged, and remove it after the exchange.

对于传统的过滤器如 SpringMVC 比如我们配置了 Filter1 Filter2 Filter3 过滤器,请求和返回都要经过滤器这3个过滤器,而管道可以选择的其中某些作为请求的过滤器,一些作为返回的过滤器,不一定要一样,入站的处理器专门处理入站的,出站的处理器专门处理出站的

init() 方法

io.netty.bootstrap.ServerBootstrap#init

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
...
}

ChannelOption

类图

1547524182709

io.netty.channel
public class ChannelOption
extends AbstractConstant>

A ChannelOption allows to configure a ChannelConfig in a type-safe way. Which ChannelOption is supported depends on the actual implementation of ChannelConfig and may depend on the nature of the transport it belongs to.

Type parameters:

- the type of the value which is valid for the ChannelOption

ChannelOption <T> 主要维护 TCP/IP 的一些底层的设定,T 表示值的类型

ChannelOption 继承了 AbstractConstantAbstractConstant 有是 Constant 的一个基本的实现

io.netty.util.Constant

io.netty.util
public interface Constant>
extends Comparable

A singleton which is safe to compare via the == operator. Created and managed by ConstantPool.

Type parameters:

- the type of objects that this object may be compared to

我们可以知道这个常量是由 ConstantPool 来维持的,我看看他是怎么起作用的

io.netty.util.ConstantPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public abstract class ConstantPool<T extends Constant<T>> {

private final ConcurrentMap<String, T> constants = PlatformDependent.newConcurrentHashMap();

private final AtomicInteger nextId = new AtomicInteger(1);

/**
* Shortcut of {@link #valueOf(String) valueOf(firstNameComponent.getName() + "#" + secondNameComponent)}.
*/
public T valueOf(Class<?> firstNameComponent, String secondNameComponent) {
if (firstNameComponent == null) {
throw new NullPointerException("firstNameComponent");
}
if (secondNameComponent == null) {
throw new NullPointerException("secondNameComponent");
}

return valueOf(firstNameComponent.getName() + '#' + secondNameComponent);
}

/**
* Returns the {@link Constant} which is assigned to the specified {@code name}.
* If there's no such {@link Constant}, a new one will be created and returned.
* Once created, the subsequent calls with the same {@code name} will always return the previously created one
* (i.e. singleton.)
*
* @param name the name of the {@link Constant}
*/
public T valueOf(String name) {
checkNotNullAndNotEmpty(name);
return getOrCreate(name);
}

/**
* Get existing constant by name or creates new one if not exists. Threadsafe
*
* @param name the name of the {@link Constant}
*/
private T getOrCreate(String name) {
T constant = constants.get(name);
if (constant == null) {
final T tempConstant = newConstant(nextId(), name);
constant = constants.putIfAbsent(name, tempConstant);
if (constant == null) {
return tempConstant;
}
}

return constant;
}

/**
* Returns {@code true} if a {@link AttributeKey} exists for the given {@code name}.
*/
public boolean exists(String name) {
checkNotNullAndNotEmpty(name);
return constants.containsKey(name);
}

/**
* Creates a new {@link Constant} for the given {@code name} or fail with an
* {@link IllegalArgumentException} if a {@link Constant} for the given {@code name} exists.
*/
public T newInstance(String name) {
checkNotNullAndNotEmpty(name);
return createOrThrow(name);
}

/**
* Creates constant by name or throws exception. Threadsafe
*
* @param name the name of the {@link Constant}
*/
private T createOrThrow(String name) {
T constant = constants.get(name);
if (constant == null) {
final T tempConstant = newConstant(nextId(), name);
constant = constants.putIfAbsent(name, tempConstant);
if (constant == null) {
return tempConstant;
}
}

throw new IllegalArgumentException(String.format("'%s' is already in use", name));
}

private static String checkNotNullAndNotEmpty(String name) {
ObjectUtil.checkNotNull(name, "name");

if (name.isEmpty()) {
throw new IllegalArgumentException("empty name");
}

return name;
}

protected abstract T newConstant(int id, String name);

@Deprecated
public final int nextId() {
return nextId.getAndIncrement();
}
}

我们先看这个创建方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Get existing constant by name or creates new one if not exists. Threadsafe
*
* @param name the name of the {@link Constant}
*/
private T getOrCreate(String name) {
T constant = constants.get(name);
if (constant == null) {
final T tempConstant = newConstant(nextId(), name);
constant = constants.putIfAbsent(name, tempConstant);
if (constant == null) {
return tempConstant;
}
}

return constant;
}

这里使用了双重检验机制,这个常量池保存的值是一个 T extends Constant<T> 包装过后的

io.netty.util.ConstantPool#newConstant

新建一个常量是由子类完成的,我们回到 ChannelOption 类中,ChannelOption 是不保存值的,只是维护键的包装

AttributeKey

io.netty.util.AttributeKey

io.netty.util
public final class AttributeKey
extends AbstractConstant>

Key which can be used to access Attribute out of the AttributeMap. Be aware that it is not be possible to have multiple keys with the same name.

Type parameters:

- the type of the Attribute which can be accessed via this AttributeKey.

与 ChannelOption 很相似,AttributeMapAttributeKeyAttribute 相当一个 Map,key 和 value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Holds {@link Attribute}s which can be accessed via {@link AttributeKey}.
*
* Implementations must be Thread-safe.
*/
public interface AttributeMap {
/**
* Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
* an {@link Attribute} which does not have a value set yet.
*/
<T> Attribute<T> attr(AttributeKey<T> key);

/**
* Returns {@code} true if and only if the given {@link Attribute} exists in this {@link AttributeMap}.
*/
<T> boolean hasAttr(AttributeKey<T> key);
}

主要维护的是业务数据,可以在程序运行中动态的往里面添加数据和获取数据

ChannelInitializer

io.netty.bootstrap.ServerBootstrap#init

回到 init 方法上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
void init(Channel channel) throws Exception {

ChannelPipeline p = channel.pipeline();

...

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

获取管道添加了一个 ChannelInitializer 实例,先看看这个类

io.netty.channel.ChannelInitializer

io.netty.channel
@Sharable
public abstract class ChannelInitializer
extends ChannelInboundHandlerAdapter

A special ChannelInboundHandler which offers an easy way to initialize a Channel once it was registered to its EventLoop. Implementations are most often used in the context of Bootstrap.handler(ChannelHandler) , ServerBootstrap.handler(ChannelHandler) and ServerBootstrap.childHandler(ChannelHandler) to setup the ChannelPipeline of a Channel.

1
2
3
4
5
6
7
8
9
10
11
>    public class MyChannelInitializer extends ChannelInitializer {
> public void initChannel(Channel channel) {
> channel.pipeline().addLast("myHandler", new MyHandler());
> }
> }
> ServerBootstrap bootstrap = ...;
> ...
> bootstrap.childHandler(new MyChannelInitializer());
> ...
>
>

>

Be aware that this class is marked as ChannelHandler.Sharable and so the implementation must be safe to be re-used.

Type parameters:

- A sub-type of Channel

addLast() 方法

io.netty.channel.DefaultChannelPipeline#addLast

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);

newCtx = newContext(group, filterName(name, handler), handler);

addLast0(newCtx);

// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

AbstractChannelHandlerContext 定义了一个上下文,找到实现的一个接口 ChannelHandlerContext

io.netty.channel.ChannelHandlerContext

文档:https://netty.io/5.0/api/io/netty/channel/ChannelHandlerContext.html

接下来我分析一下 ChannelHandlerContextPipeLineHandler 这三者的关系。

加油!!!

-------------本文结束感谢您的阅读-------------

本文标题:Netty 源码分析(二)

文章作者:cuzz

发布时间:2019年01月15日 - 22:01

最后更新:2019年08月12日 - 20:08

原始链接:http://blog.cuzz.site/2019/01/15/Netty 源码分析(二)/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

请博主吃包辣条
0%