Netty 源码分析(五)

Netty 源码分析(五)

版本 4.1.15

官网:https://netty.io/

Netty is an asynchronous event-driven network application framework  for rapid development of maintainable high performance protocol servers & clients.

ReferenceCounted

io.netty.util.ReferenceCounted

引用计数文档

public interface ReferenceCounted
A reference-counted object that requires explicit deallocation.
When a new ReferenceCounted is instantiated, it starts with the reference count of 1. retain() increases the reference count, and release() decreases the reference count. If the reference count is decreased to 0, the object will be deallocated explicitly, and accessing the deallocated object will usually result in an access violation.
If an object that implements ReferenceCounted is a container of other objects that implement ReferenceCounted, the contained objects will also be released via release() when the container’s reference count becomes 0.

我们看看其中的方法

public interface ReferenceCounted {
/**
* Returns the reference count of this object. If {@code 0}, it means this object has been deallocated.
*/
int refCnt();

/**
* Increases the reference count by {@code 1}.
*/
ReferenceCounted retain();

/**
* Increases the reference count by the specified {@code increment}.
*/
ReferenceCounted retain(int increment);

/**
* Records the current access location of this object for debugging purposes.
* If this object is determined to be leaked, the information recorded by this operation will be provided to you
* via {@link ResourceLeakDetector}. This method is a shortcut to {@link #touch(Object) touch(null)}.
*/
ReferenceCounted touch();

/**
* Records the current access location of this object with an additional arbitrary information for debugging
* purposes. If this object is determined to be leaked, the information recorded by this operation will be
* provided to you via {@link ResourceLeakDetector}.
*/
ReferenceCounted touch(Object hint);

/**
* Decreases the reference count by {@code 1} and deallocates this object if the reference count reaches at
* {@code 0}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
*/
boolean release();

/**
* Decreases the reference count by the specified {@code decrement} and deallocates this object if the reference
* count reaches at {@code 0}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
*/
boolean release(int decrement);
}

AbstractReferenceCountedByteBuf

io.netty.buffer.AbstractReferenceCountedByteBuf

我们先来看两个比较重要的方法,retain() 和 release() 方法

retain()

io.netty.buffer.AbstractReferenceCountedByteBuf#retain()

retain() 方法可以使引用计数加一

@Override
public ByteBuf retain() {
return retain0(1);
}

@Override
public ByteBuf retain(int increment) {
return retain0(checkPositive(increment, "increment"));
}

private ByteBuf retain0(int increment) {
for (;;) {
int refCnt = this.refCnt;
final int nextCnt = refCnt + increment;
// 如果 refCnt = 0 的时候 nextCont = increment,就就应该被回收
// Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
if (nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
// 这里使用到了自旋锁
if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
break;
}
}
return this;
}

java.util.concurrent.atomic.AtomicIntegerFieldUpdater

public abstract class AtomicIntegerFieldUpdater
extends Object
A reflection-based utility that enables atomic updates to designated volatile int fields of designated classes. This class is designed for use in atomic data structures in which several fields of the same node are independently subject to atomic updates.
Note that the guarantees of the compareAndSet method in this class are weaker than in other atomic classes. Because this class cannot ensure that all uses of the field are appropriate for purposes of atomic access, it can guarantee atomicity only with respect to other invocations of compareAndSet and set on the same updater.

AtomicIntegerFieldUpdater要点的总结:

  1. 更新器必须是int类型的变量,不能是其他包装类型
  2. 更新器的更新必须是volatile类型的变量,确保线程之间的共享变量时的立即可见性
  3. 变量不能是static的,必须是实例变量,因为Unsafe.objectFieldOffset() 方法不支持静态变量(CAS操作本质是通过对象实例的偏移来直接进行赋值)
  4. 更新器只能修改它可见范围内的变量,因为更新器是通过反射来得到这个变量,如果变量不可见就会报错

如果更新的变量时包装类型,那么可以使用AtomicReferenceFieldUpdater来进行更新

java.util.concurrent.atomic.AtomicIntegerFieldUpdater#compareAndSet

public abstract boolean compareAndSet(T obj,
int expect,
int update)
Atomically sets the field of the given object managed by this updater to the given updated value if the current value == the expected value. This method is guaranteed to be atomic with respect to other calls to compareAndSet and set, but not necessarily with respect to other changes in the field.
Parameters:
obj - An object whose field to conditionally set
expect - the expected value
update - the new value

一个不安全的更新

/**
* @Author: cuzz
* @Date: 2019/1/19 15:40
* @Description:
*/
public class AtomicUpdateTest {
public static void main(String[] args) {
Person person = new Person();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print(person.age++ + " "); // 1 6 7 5 4 2 3 1 8 9
}).start();
}

}
}
class Person{
int age = 1;
}

使用AtomicIntegerFieldUpdater

/**
* @Author: cuzz
* @Date: 2019/1/19 15:40
* @Description:
*/
public class AtomicUpdateTest {
public static void main(String[] args) {
AtomicIntegerFieldUpdater<Person> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
Person person = new Person();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print(fieldUpdater.getAndIncrement(person) + " "); // 1 4 3 2 5 6 7 10 9 8
}).start();
}
}
}
class Person{
volatile int age = 1;
}

大概有以下两种字段适合用Atomic*FieldUpdater:

大多数用到这个字段的代码是在读取字段的值, 但仍然有通过CAS更新字段值的需求. 这个时候用AtomicInteger的话每个直接读取这个字段的地方都要多一次.get()调用, 用volatile又满足不了需求, 所以就用到了AtomicIntegerFieldUpdater

这个字段所属的类会被创建大量的实例对象, 如果用AtomicInteger, 每个实例里面都要创建AtomicInteger对象, 从而多出内存消耗. 比如一个链表类的Node, 用AtomicReference保存next显然是不合适的.

原文:https://blog.csdn.net/u012415542/article/details/80646605

private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

Reference counted objects

引用计数文档:Reference counted objects

Netty 处理器

重要概念

  • Netty 的处理器可以分为两类:入站处理器和出站处理器
  • 入站处理器的顶层是 ChannelnboundHandler,出站处理器的顶层是 ChannelOutboundHandler
  • 数据处理时常用的各种解码器本质上都是处理器
  • 编解码器:无论我们向网络中写入的数据是什么类型,数据在网络中传递时,其都是以字节流的形式呈现,将数据有原本的字节流的操作成为编码(encode),将数据有字节转化为它原本的格式或是其它的操作成为解码(decode),编码统一称为(codec)
  • 编码:本质上是一种出站处理器,因此,编码一定是一种 ChannelOutboundHandler
  • 解码:本质上是一种入站处理器,因此,解码一定是一种 ChannelInboundHandler
  • 在 Netty 中,编码器通常以 xxxEncoder命名;解码器通常以xxxDecoder命名

编写一个Long类型的解码器

编写一个解码器在客服端与服务端传输一个 Long 型的数据,Netty 为我们提供了 ByteToMessageDecoder

io.netty.handler.codec.ByteToMessageDecoder

io.netty.handler.codec
public abstract class ByteToMessageDecoder
extends ChannelInboundHandlerAdapter
ChannelInboundHandlerAdapter which decodes bytes in a stream-like fashion from one ByteBuf to an other Message type. For example here is an implementation which reads all readable bytes from the input ByteBuf and create a new ByteBuf.
public class SquareDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List out)
throws Exception {
out.add(in.readBytes(in.readableBytes()));
}
}

Frame detection
Generally frame detection should be handled earlier in the pipeline by adding a DelimiterBasedFrameDecoder, FixedLengthFrameDecoder, LengthFieldBasedFrameDecoder, or LineBasedFrameDecoder.
If a custom frame decoder is required, then one needs to be careful when implementing one with ByteToMessageDecoder. Ensure there are enough bytes in the buffer for a complete frame by checking ByteBuf.readableBytes(). If there are not enough bytes for a complete frame, return without modifying the reader index to allow more bytes to arrive.
To check for complete frames without modifying the reader index, use methods like ByteBuf.getInt(int). One MUST use the reader index when using methods like ByteBuf.getInt(int). For example calling in.getInt(0) is assuming the frame starts at the beginning of the buffer, which is not always the case. Use in.getInt(in.readerIndex()) instead.
Pitfalls
Be aware that sub-classes of ByteToMessageDecoder MUST NOT annotated with @Sharable.
Some methods such as ByteBuf.readBytes(int) will cause a memory leak if the returned buffer is not released or added to the out List. Use derived buffers like ByteBuf.readSlice(int) to avoid leaking memory.

MyByteToLongDecoder

/**
* @Author: cuzz
* @Date: 2019/1/22 12:16
* @Description:
*/
public class MyByteToLongDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("decode invoked!");
System.out.println(in.readableBytes());
if (in.readableBytes() >= 8) {
out.add(in.readLong());
}
}
}

MyLongToByteEncoder

/**
* @Author: cuzz
* @Date: 2019/1/22 12:23
* @Description:
*/
public class MyLongToByteEncoder extends MessageToByteEncoder<Long>{
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("encoder invoked!");
System.out.println(msg);
out.writeLong(msg);
}
}

重要结论:

  1. 无论是编码器还是解码器,其所接收的消息类型必须要与待处理的参数保持一致,否则该编码器或则解码器不会被执行
  2. 在解码器进行数据解码时,一定要记得判断缓冲(ByteBuf)中的数据是否足够,否则将会产生一些问题

ReplayingDecoder

文档:https://netty.io/4.1/api/io/netty/handler/codec/ReplayingDecoder.html

如果我们使用这个继承这个编码器,他会自动帮我判断是否可读,代码也简单,简化了我们的判断

public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 decode invoked!");
out.add(in.readLong());
}
}

LengthFieldBasedFrameDecoder

io.netty.handler.codec.LengthFieldBasedFrameDecoder

文档:https://netty.io/4.1/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html

这是一个常用语自定义协议的解码器

TCP 粘包拆包

如果我写的自定义协议没有对粘包和拆包做特殊处理的话就会产生粘包和拆包现象

粘包、拆包发生原因
发生TCP粘包或拆包有很多原因,现列出常见的几点,可能不全面,欢迎补充,
1、要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。
2、待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。
3、要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。
4、接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。
粘包、拆包解决办法
通过以上分析,我们清楚了粘包或拆包发生的原因,那么如何解决这个问题呢?解决问题的关键在于如何给每个数据包添加边界信息,常用的方法有如下几个:
1、发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度了。
2、发送端将每个数据包封装为固定长度(不够的可以通过补0填充),这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。
3、可以在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开。

作者:wxy941011
来源:CSDN
原文:https://blog.csdn.net/wxy941011/article/details/80428470
版权声明:本文为博主原创文章,转载请附上博文链接!

自定义协议解决粘包和拆包

一个 Person 协议类

/**
* @Author: cuzz
* @Date: 2019/1/22 16:00
* @Description: 这是一个关于 Person 的协议
*/
public class PersonProtocol {

private int length;

private byte[] content;

public int getLength() {
return length;
}

public void setLength(int length) {
this.length = length;
}

public byte[] getContent() {
return content;
}

public void setContent(byte[] content) {
this.content = content;
}
}

解码处理器

/**
* @Author: cuzz
* @Date: 2019/1/22 16:04
* @Description:
*/
public class MyPersonDecoder extends ReplayingDecoder<Void>{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyPersonDecoder decode invoked!");
// Gets a 32-bit integer at the current {@code readerIndex}
// and increases the {@code readerIndex} by {@code 4} in this buffer.
int length = in.readInt();
byte[] content = new byte[length];
// Transfers this buffer's data to the specified destination starting at
// the current {@code readerIndex} and increases the {@code readerIndex}
// by the number of the transferred bytes (= {@code dst.length}
in.readBytes(content);

// 把内容添加到协议中
PersonProtocol personProtocol = new PersonProtocol();
personProtocol.setLength(length);
personProtocol.setContent(content);

out.add(personProtocol);
}
}

编码处理器

/**
* @Author: cuzz
* @Date: 2019/1/22 16:12
* @Description:
*/
public class MyPersonEncoder extends MessageToByteEncoder<PersonProtocol>{

@Override
protected void encode(ChannelHandlerContext ctx, PersonProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyPersonEncoder encoder invoked!");
// 消息头
out.writeInt(msg.getLength());
// 消息体
out.writeBytes(msg.getContent());
}
}

服务端

/**
* @Author: cuzz
* @Date: 2019/1/22 16:39
* @Description:
*/
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();

try {

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyPersonDecoder());
pipeline.addLast(new MyPersonEncoder());
pipeline.addLast(new MyServerHandler());
}
});

ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}


/**
* @Author: cuzz
* @Date: 2019/1/22 16:16
* @Description:
*/
public class MyServerHandler extends SimpleChannelInboundHandler<PersonProtocol>{
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
int length = msg.getLength();
byte[] content = msg.getContent();
System.out.println("服务端接收到的数据:");
System.out.println("长度:" + length);
System.out.println("内容:" + new String(content, Charset.forName("utf-8")));
System.out.println("服务器接收到的消息数量:" + (++this.count));

PersonProtocol personProtocol = new PersonProtocol();
String resp = "hello, world";
personProtocol.setLength(resp.getBytes("utf-8").length);
personProtocol.setContent(resp.getBytes("utf-8"));
ctx.writeAndFlush(personProtocol);
}
}

客服端

public class MyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyPersonDecoder());
pipeline.addLast(new MyPersonEncoder());
pipeline.addLast(new MyClientHandler());
}
});

ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}


/**
* @Author: cuzz
* @Date: 2019/1/22 16:25
* @Description:
*/
public class MyClientHandler extends SimpleChannelInboundHandler<PersonProtocol>{
private int count;
@Override
protected void channelRead0(ChannelHandlerContext ctx, PersonProtocol msg) throws Exception {
int length = msg.getLength();
byte[] content = msg.getContent();

System.out.println("客户端接收的消息:");
System.out.println("消息的长度:" + length);
System.out.println("消息的内容:" + new String(content, Charset.forName("utf-8")));
System.out.println("客户端接收到的消息数量:" + (++count));
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
String messageToBeSend = "send form client";
PersonProtocol personProtocol = new PersonProtocol();
personProtocol.setLength(messageToBeSend.getBytes("utf-8").length);
personProtocol.setContent(messageToBeSend.getBytes("utf-8"));
ctx.writeAndFlush(personProtocol);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

总结

这里关于 Netty 的这五篇分析都是看的圣思园张龙老师的课程自己所写下的笔记,自己对 Netty 有了简单的认识,也对 NIO 有了更深的了解,最主要的学会看英文文档,看官方文档很重要,不要惧怕,慢慢的就感觉还是文档写的最清楚,最有价值。老师还提到需要多记录,因此我也把一些重要的知识点记录下来,方便以后查找。当然以后还要加强学习,多看看 Netty 官方文档和例子,加强练习。

Comments