1 解码概述
在Dubbo服务消费方(客户端)和服务提供方(服务端)进行网络通信时,服务提供方会通过
socket把需要发送的内容序列化为二进制流后发出。接着二进制流通过网络流向服务提供方。服务
提供方接收到该请求后会解析该请求包,对接收到的数据进行反序列化后对请求进行处理。
服务提供方接收到请求后解析请求包(即Dubbo协议帧数据)的过程为服务提供方的解码过程。
同理,服务消费方接收到服务提供方发出的响应信息后,会对响应信息进行解析,此过程为服务消费方的解码过程。
2 解码原理解析
2.1 解码入口
Dubbo服务提供方或消费方对接收到的Dubbo协议帧数据进行解析,最终是借助于 InternalDecoder 的 decode 方法来实现的。具体实现代码如下所示。
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List
2.2 解码实现细节
在 InternalDecoder 的 decode 方法内部通过调用具体的解码实现来进行解码。
如 ExchangeCodec 的 decode 方法,其主要内容为:
(1)解析 Dubbo 协议头-header;
(2)解析 Dubbo 协议帧数据部分-body。
具体实现如下所示。
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { // 获取dubbo协议帧的字节数 int readable = buffer.readableBytes(); // 将dubbo协议头读取到数据header byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; buffer.readBytes(header); // 解析dubbo协议帧数据部分 return decode(channel, buffer, readable, header); }
其中将dubbo协议头读取到header数组的实现如下所示。
@Override public void readBytes(byte[] dst) { readBytes(dst, 0, dst.length); }
2.2.1 解析 Dubbo 协议帧的 header
解析 Dubbo 协议头时主要做的事情为:
(1)校验协议头的魔法值是否有效;
(2)校验 Dubbo 协议帧是否为一个完整的协议帧。
判断的方式包括:读取到的dubbo协议帧字节数是否小于协议头长度;读取到的dubbo协议帧字节数是否小于完整的dubbo协议帧字节数(header长度+body长度)。
具体实现如下所示。
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // check magic number. if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1; i < header.length - 1; i++) { if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break; } } return super.decode(channel, buffer, readable, header); } // check length. // (1)读取到的dubbo协议帧字节数小于协议头长度,说明该协议帧不是一个完整的协议帧 if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // get data length. int len = Bytes.bytes2int(header, 12); // When receiving response, how to exceed the length, then directly construct a response to the client. // see more detail from https://github.com/apache/dubbo/issues/7021. Object obj = finishRespWhenOverPayload(channel, len, header); if (null != obj) { return obj; } // dubbo协议帧理论上的字节数 int tt = len + HEADER_LENGTH; // (2)读取到的dubbo协议帧字节数小于完整的dubbo协议帧字节数,说明该协议帧不是一个完整的协议帧 if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } // limit input stream. // 限制只读取一个完整协议帧的数据部分 ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { if (logger.isWarnEnabled()) { logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", "Skip input stream " + is.available()); } StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(TRANSPORT_SKIP_UNUSED_STREAM, "", "", e.getMessage(), e); } } } }
2.2.2 解析 Dubbo 协议帧的body
对dubbo协议帧的body进行解析时调用了 decodeBody 方法,其中包括了服务提供方对请求内容的解析和服务消费方对响应信息的解析。
具体实现如下所示。
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); // get request id. long id = Bytes.bytes2long(header, 4); if ((flag & FLAG_REQUEST) == 0) { // decode response. - 服务消费方对响应内容进行解析 Response res = new Response(id); if ((flag & FLAG_EVENT) != 0) { res.setEvent(true); } // get status. byte status = header[3]; res.setStatus(status); try { if (status == Response.OK) { Object data; if (res.isEvent()) { byte[] eventPayload = CodecSupport.getPayload(is); if (CodecSupport.isHeartBeat(eventPayload, proto)) { // heart beat response data is always null; data = null; } else { data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload); } } else { data = decodeResponseData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto), getRequestData(channel, res, id)); } res.setResult(data); } else { res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF()); } } catch (Throwable t) { res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } return res; } else { // decode request.-服务提供方对请求信息进行解析 Request req; try { Object data; if ((flag & FLAG_EVENT) != 0) { byte[] eventPayload = CodecSupport.getPayload(is); if (CodecSupport.isHeartBeat(eventPayload, proto)) { // heart beat response data is always null; req = new HeartBeatRequest(id); ((HeartBeatRequest) req).setProto(proto); data = null; } else { req = new Request(id); data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto), eventPayload); } req.setEvent(true); } else { req = new Request(id); data = decodeRequestData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto)); } req.setData(data); } catch (Throwable t) { // bad request req = new Request(id); req.setBroken(true); req.setData(t); } req.setVersion(Version.getProtocolVersion()); req.setTwoWay((flag & FLAG_TWOWAY) != 0); return req; } }
2.3 Dubbo协议帧的半包和粘包问题
2.3.1 半包和粘包问题概述
当Dubbo服务端某次接收到的Dubbo协议帧数据是一个完整协议帧(对应某个完整的数据包)的一部分,则此时出现了半包现象。如果Dubbo服务端某次接收到的Dubbo协议帧数据比一个完整协议帧数据还要多,则此时出现了粘包现象。对半包和粘包问题的具体描述如下。
(1)半包现象
客户端发送数据时,实际上是先把数据写入tcp发送缓存中,然后通过socket发送给服务端。
如果待发送的数据包比tcp发送缓存的容量还大,则这个数据包会被拆成多个包,然后分多次发送给服务端。
因此服务端第一次从接收缓存中获取到的数据为全包的一部分,此时即出现了半包现象。
(2)粘包现象
同理,如果待发送的数据包比tcp发送缓存的容量小,则tcp发送缓存中可能会存放多个不同数据包的内容。即tcp发送缓存中包含多个包,且这些包属于不同的数据包,属于不同的请求。
因此服务端可能会从接收缓存中一次读取到多个不同数据包的内容,此时即出现了粘包现象。
当出现半包或者粘包现象时,如果服务器将接收的二进制流数据直接进行解析,将其整体反序列化为对象,反序列化会失败。
2.3.2 解决办法
为了解决当出现半包或者粘包现象时不发生反序列化失败,服务端需要识别接收到的数据是否包含完整的Dubbo协议帧。
服务端从tcp接收缓存中读取到数据后,首先会解析 Dubbo 协议头,此时会校验本次读取到数据是否包含完整的Dubbo协议帧(判断逻辑见 2.2.1)。
(1)当判断出此次读取到的数据不包含完整的Dubbo协议帧时,说明此时出现半包现象,此时会重置缓存的读取下标,然后结束本次数据解析操作。等待下次读事件到来时再次读取并解析数据。
(2)当成功解析完一个完整的协议帧之后还有数据可以读取,说明出现了粘包现象,此时会对剩余的数据执行解析操作,直到没有数据可读取为止。
具体代码如下所示。
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List
每次循环解析时只读取一个完整的协议帧数据,因此即使粘包,不同协议帧数据也是依次被单独解析的,确保了数据解析和反序列化不会出错。关键代码如下所示。
// ExchangeCodec 的 decode 方法 protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { ... // limit input stream. // 限制只读取一个完整协议帧的数据部分 ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); ... } // length - 某个完整协议帧的数据部分的长度 public ChannelBufferInputStream(ChannelBuffer buffer, int length) { if (buffer == null) { throw new NullPointerException("buffer"); } if (length < 0) { throw new IllegalArgumentException("length: " + length); } if (length > buffer.readableBytes()) { throw new IndexOutOfBoundsException(); } this.buffer = buffer; startIndex = buffer.readerIndex(); endIndex = startIndex + length; buffer.markReaderIndex(); }
还没有评论,来说两句吧...