netty ByteToMessageDecoder
所属分类 netty
浏览量 92
ByteToMessageDecoder 主要用于解决网络通信中的半包和拆包问题。
它属于io.netty.handler.codec包,是一个抽象类,提供了将字节流(ByteBuf)转换为消息(Message)的功能。
ByteToMessageDecoder的核心作用是缓存入站的数据,直到准备好了用于处理,从而避免了因数据不完整而导致的半包或拆包问题
它的主要逻辑是将所有的数据全部放入累积区,子类从累积区取出数据进行解码后放入到一个数组中,
然后ByteToMessageDecoder会循环数组调用后面的handler
ByteToMessageDecoder的具体实现类包括
LineBasedFrameDecoder 、 DelimiterBasedFrameDecoder 和 FixedLengthFrameDecoder 等
继承 ByteToMessageDecoder 并实现decode方法,定制解码逻辑
内部保存一个Cumulator用于保存待解码的ByteBuf,
然后不断调用子类需要实现的抽象方法decode去取出byte数据转换处理。
public interface Cumulator {
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
Cumulator有两种实现,MERGE_CUMULATOR和COMPOSITE_CMUMULATOR
MERGE_CUMULATOR通过memory copy的方法将in中的数据复制写入到cumulation中。
COMPOSITE_CUMULATOR采取的是类似链表的方式,没有进行memory copy, 通过一种CompositeByteBuf来实现。
默认采用的是MERGE_CUMULATOR。
ByteToMessageDecoder中最主要的部分在channelRead处理上
收到一个msg后先判断是否是ByteBuf类型,是的情况 ,创建一个CodecOutputList(也是一种list)保存转码后的对象列表
如果cumulation为null则把msg设置为cumulation,否则合并到cumulation里
调用callDecode方法,尝试解码
finally中如果cumulation已经读完了,就release并置为null等待gc
调用fireChannelRead将解码后的out传递给后面的Handler
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
callDecode中不断执行抽象decode(ctx, in, out)方法直到in可读数据没有减少或当前handler被remove
fireChannelRead(ctx, msgs, numElements)的处理方式是对每个解码后的消息进行fireChannelRead,交给下一个Handler处理
io.netty.handler.codec.ByteToMessageDecoder
io.netty.handler.codec.FixedLengthFrameDecoder
public class FixedLengthFrameDecoder extends ByteToMessageDecoder
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List< Object> out) throws Exception;
在解码过程中将解码后的对象添加到列表中,以便后续处理
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
int readableBytes = in.readableBytes();
System.out.println("readableBytes="+readableBytes+","+in.getClass());
if (readableBytes < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
ByteBuf-readSlice 和 readRetainedSlice
用于创建从当前readerIndex开始的子缓冲区,readRetainedSlice会增加引用计数,而 readSlice 则不会
上一篇
下一篇
小学数学学习资料收集
netty学习资料汇总
netty ByteBuf 种类
netty解码器实例
Netty EventLoop
netty VS mina