Netty与TCP粘包拆包

黑洞代码 2021-01-14 16:26:32
TCP 腾讯云 Netty 技术开发


Netty如何解决TCP粘包拆包的问题?

TCP粘包/拆包

TCP协议是个流协议,所谓流,就是指没有界限的一串数据。河里的流水,是连成一片的,没有分界线。TCP底层并不了解上层业务数据的具体意义,他会根据TCP缓冲区的实际情况进行包的划分,所以在业务上一个完整的包,有可能会被TCP拆分为多个包进行发送,也有可能把业务上多个小包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

粘包拆包说明

现在假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,那么服务端收到的数据可以分为三种,现列举如下:

第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象

第二种情况,接收端只收到一个数据包,由于TCP是不会出现丢包的,所以这一个数据包中包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理。

第三种情况,这种情况有两种表现形式,如下图。接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。

如果此时服务器端TCP接收窗口非常小,而数据包Packet1和Packet2比较大,很有可能会发生另一种情况——服务器分多次才能将Packet1和Packet2完全接收,期间会发生多次拆包。

粘包、拆包发生原因

1.要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包即应用程序写入数据的字节大小大于套接字发送缓冲区的大小。

2.进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长度,待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。

3.要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。

4.接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。

5.以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成若干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。

TCP粘包拆包的解决策略

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据不被拆包和重组的,这样问题需要通过上层的应用协议栈设计来解决。

1. 消息定长。例如100字节。如果不够,空位补空格。

2. 在包尾部增加回车或者空格符等特殊字符进行分割,典型的如FTP协议,发送端将每个数据包封装为固定长度(不够的可以通过补0填充),这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。

3. 将消息分为消息头和消息体。消息头中包含消息总长度的字段,这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。

4. 其它复杂的协议,如RTMP协议等。

未考虑TCP粘包拆包导致异常的案例

服务器端代码:

public class TimeServer {
public void bind(int port) {
//配置服务器端NIO线程组
//NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
//netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
//功能类似于NIO中的ServerSocketChannel
bootstrap.group(bossLoopGroup, workerGroup).channel(NioServerSocketChannel.class)
//配置NioServerSocketChannel的参数
.option(ChannelOption.SO_BACKLOG, 1024)
//绑定事件的处理类ChildChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeServerHandler());
}
});
//绑定端口,同步等待绑定操作完成
ChannelFuture channelFuture = bootstrap.bind(port).sync();
//等待服务器监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new TimeServer().bind(port);
}
}

服务器端处理类

public class TimeServerHandler extends ChannelHandlerAdapter {
/**
* 计数器
*/
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//类似NIO中的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
//获取缓冲区可读字节数
byte[] req = new byte[buf.readableBytes()];
//缓冲区中的字节复制到字节数组
buf.readBytes(req);
String body = new String(req).substring(0, req.length - System.getProperty("line.separator").length());
if ("hello world".equalsIgnoreCase(body)) {
System.out.println("收到输入:" + body);
} else {
System.out.println("异常输入:" + body);
}
++counter;
ByteBuf response = Unpooled.copiedBuffer(("当前时间:" + new Date()).getBytes());
//并不是直接把消息发送到SocketChannel中,只是把消息发送到缓冲数组,通过flush方法将消息发到SocketChannel
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//将消息发送队列中的消息写入SocketChannel中,发送到对方
//防止频繁的唤醒Selector进行消息发送
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//发生异常关闭ChannelHandlerContext等资源
ctx.close();
}
}

客户端代码

public class TimeClient {
public void connect(int port, String host) {
try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
//异步链接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//等待客户端
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new TimeClient().connect(port, "127.0.0.1");
}
}

客服端处理类

public class TimeClientHandler extends ChannelHandlerAdapter {
private byte[] req;
public TimeClientHandler() {
req = ("hello world" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
//循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* 读取并打印消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] resp = new byte[buf.readableBytes()];
buf.readBytes(resp);
String body = new String(resp);
System.out.println(body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

输出结果:

收到输入:hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hell
收到输入:o world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world

由于没有考虑TCP粘包拆包的问题,所以发生“收到输入:o world”的输出。

Netty提供的解决方案

LineBasedFrameDecoder和StringDecoder解码器的使用

优化后的服务器端代码

public class OptimizedTimeServer {
public void bind(int port) {
//配置服务器端NIO线程组
//NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
//netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
//功能类似于NIO中的ServerSocketChannel
bootstrap.group(bossLoopGroup, workerGroup).channel(NioServerSocketChannel.class)
//配置NioServerSocketChannel的参数
.option(ChannelOption.SO_BACKLOG, 1024)
//绑定事件的处理类ChildChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//LineBasedFrameDecoder解码器
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
//StringDecoder解码器
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new OptimizeTimeServerHandler());
}
});
//绑定端口,同步等待绑定操作完成
ChannelFuture channelFuture = bootstrap.bind(port).sync();
//等待服务器监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new OptimizedTimeServer().bind(port);
}
}

优化后的服务器端处理类

public class OptimizeTimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String body = (String) msg;
System.out.println("收到消息:" + body);
ByteBuf response = Unpooled.copiedBuffer(("当前时间:" + new Date() + System.getProperty("line.separator")).getBytes());
//并不是直接把消息发送到SocketChannel中,只是把消息发送到缓冲数组,通过flush方法将消息发到SocketChannel
ctx.writeAndFlush(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//将消息发送队列中的消息写入SocketChannel中,发送到对方
//防止频繁的唤醒Selector进行消息发送
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//发生异常关闭ChannelHandlerContext等资源
ctx.close();
}
}

优化后的客户端代码

public class OptimizeTimeClient {
public void connect(int port, String host) {
try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new OptimizeTimeClientHandler());
}
});
//异步链接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//等待客户端
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new OptimizeTimeClient().connect(port, "127.0.0.1");
}
}

优化后的客户端处理类

public class OptimizeTimeClientHandler extends ChannelHandlerAdapter {
private byte[] req;
public OptimizeTimeClientHandler() {
req = ("hello world" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
//循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* 读取并打印消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println(body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

优化后的服务器端输出

收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world
收到消息:hello world

LineBasedFrameDecoder和StringDecoder解码器原理分析

LineBasedFrameDecoder:遍历ByteBuf中的可读字节,判断是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置之间的字节组成一行,它是以换行为结束标识符的解码器,支持携带结束符或不携带结束符两种方式解码,同时配置支持单行最大长度,如果连续读取最大长度后,仍没有发现换行符就会抛出异常,同时忽略掉之前读取到的异常码流。

StringDecoder:将收到的对象转换成字符串,然后调用后续的Handler。

LineBasedFrameDecoder + StringDecoder = 按行切换的文本解码器

本文分享自微信公众号 - 落叶飞翔的蜗牛(A_GallopingSnail) ,作者:超神的蜗牛

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间: 2018-09-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

版权声明
本文为[黑洞代码]所创,转载请带上原文链接,感谢
https://cloud.tencent.com/developer/article/1773777

  1. To what extent can I go out to work?
  2. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  3. Implementation of rocketmq message sending based on JMeter
  4. How to choose the ticket grabbing app in the Spring Festival? We have measured
  5. Implementation of rocketmq message sending based on JMeter
  6. My programmer's Road: self study java
  7. My programmer's Road: self study java
  8. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  9. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  10. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  11. [Java training project] Java ID number recognition system
  12. How does serverless deal with the resource supply demand of k8s in the offline scenario
  13. Detailed explanation of HBase basic principle
  14. Explain the function of thread pool and how to use it in Java
  15. Kubernetes official java client 8: fluent style
  16. 010_MySQL
  17. Vibrant special purchases for the Spring Festival tiktok section, hundreds of good things to make the year more rich flavor.
  18. 010_MySQL
  19. Of the 4 million docker images, 51% have high-risk vulnerabilities
  20. Rocketmq CPP client visual studio 2019 compilation
  21. Rocketmq CPP client visual studio 2019 compilation
  22. Usage of data custom attribute in jquery
  23. Common decompression in Linux
  24. Upload large files in Java
  25. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  26. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  27. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  28. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  29. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  30. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  31. 010_ MySQL
  32. 010_ MySQL
  33. Fast integration of imsdk and Huawei offline push
  34. 消息队列之RabbitMQ
  35. Rabbitmq of message queue
  36. 初学java进制转换方面补充学习
  37. Learn java base conversion supplementary learning
  38. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  39. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  40. 初学java进制转换方面补充学习
  41. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  42. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  43. Learn java base conversion supplementary learning
  44. JDBC测试连接数据库
  45. JDBC test connection database
  46. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  47. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  48. 安卓开发和java开发有什么区别!2021年BATJ30套大厂Android经典高频面试题,面试必问
  49. Spring Security OAuth2.0認證授權四:分散式系統認證授權
  50. What's the difference between Android development and java development! 2021 batj30 Android classic high frequency interview questions
  51. Spring security oauth2.0 authentication and authorization 4: distributed system authentication and authorization
  52. Java微服务 vs Go微服务,究竟谁更强!?
  53. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  54. Who is stronger, Java microservice vs go microservice!?
  55. Java微服务 vs Go微服务,究竟谁更强!?
  56. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  57. Who is stronger, Java microservice vs go microservice!?
  58. springboot异常处理之404
  59. Spring boot exception handling 404
  60. Spring Boot Security 国际化 多语言 i18n 趟过巨坑