首页  

netty ByteToMessageDecoder     所属分类 netty 浏览量 92
ByteToMessageDecoder 主要用于解决网络通信中的半包和拆包问题。
它属于io.netty.handler.codec包,是一个抽象类,提供了将字节流(ByteBuf)转换为消息(Message)的功能。
ByteToMessageDecoder的核心作用是缓存入站的数据,直到准备好了用于处理,从而避免了因数据不完整而导致的半包或拆包问题

它的主要逻辑是将所有的数据全部放入累积区,子类从累积区取出数据进行解码后放入到一个数组中,
然后ByteToMessageDecoder会循环数组调用后面的handler

ByteToMessageDecoder的具体实现类包括 
LineBasedFrameDecoder 、 DelimiterBasedFrameDecoder 和 FixedLengthFrameDecoder 等

继承 ByteToMessageDecoder 并实现decode方法,定制解码逻辑

内部保存一个Cumulator用于保存待解码的ByteBuf,
然后不断调用子类需要实现的抽象方法decode去取出byte数据转换处理。

public interface Cumulator {
    ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}

Cumulator有两种实现,MERGE_CUMULATOR和COMPOSITE_CMUMULATOR
MERGE_CUMULATOR通过memory copy的方法将in中的数据复制写入到cumulation中。
COMPOSITE_CUMULATOR采取的是类似链表的方式,没有进行memory copy, 通过一种CompositeByteBuf来实现。
默认采用的是MERGE_CUMULATOR。


ByteToMessageDecoder中最主要的部分在channelRead处理上
收到一个msg后先判断是否是ByteBuf类型,是的情况 ,创建一个CodecOutputList(也是一种list)保存转码后的对象列表
如果cumulation为null则把msg设置为cumulation,否则合并到cumulation里
调用callDecode方法,尝试解码
finally中如果cumulation已经读完了,就release并置为null等待gc
调用fireChannelRead将解码后的out传递给后面的Handler


public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

callDecode中不断执行抽象decode(ctx, in, out)方法直到in可读数据没有减少或当前handler被remove


fireChannelRead(ctx, msgs, numElements)的处理方式是对每个解码后的消息进行fireChannelRead,交给下一个Handler处理






io.netty.handler.codec.ByteToMessageDecoder io.netty.handler.codec.FixedLengthFrameDecoder public class FixedLengthFrameDecoder extends ByteToMessageDecoder public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List< Object> out) throws Exception; 在解码过程中将解码后的对象添加到列表中,以便后续处理 protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { int readableBytes = in.readableBytes(); System.out.println("readableBytes="+readableBytes+","+in.getClass()); if (readableBytes < frameLength) { return null; } else { return in.readRetainedSlice(frameLength); } } ByteBuf-readSlice 和 readRetainedSlice 用于创建从当前readerIndex开始的子缓冲区,readRetainedSlice会增加引用计数,而 readSlice 则不会

上一篇     下一篇
小学数学学习资料收集

netty学习资料汇总

netty ByteBuf 种类

netty解码器实例

Netty EventLoop

netty VS mina