Netty and TCP packet sticking and unpacking

Black hole code 2021-01-14 16:27:52
netty tcp packet sticking unpacking


Netty How to solve TCP The problem of sticking and unpacking ?

TCP Sticky package / unpacking

TCP Protocol is a streaming protocol , Flow , A string of data without boundaries . The water in the river , It's a continuous piece of , There is no dividing line .TCP The bottom layer does not understand the specific meaning of the upper layer business data , He will be based on TCP The actual situation of the buffer is divided into packets , So in business, a complete package , It's possible that TCP Split into multiple packets to send , It is also possible to encapsulate multiple small packets in the business into a large packet to send , That's what's called TCP Sticking and unpacking problems .

Instructions for sticking and unpacking

Now suppose that the client sends two consecutive packets to the server , use packet1 and packet2 To express , Then the data received by the server can be divided into three types , Here are some examples :

Case one , The receiver normally receives two packets , That is to say, there is no phenomenon of unpacking and sticking

The second case , The receiver receives only one packet , because TCP There will be no packet loss , So this packet contains the information of two packets sent by the sender , This phenomenon is called "sticking package" . In this case, the receiver does not know the boundary between the two packets , So it's hard for the receiver to handle .

The third case , There are two forms of this situation , Here's the picture . The receiver received two packets , But these two packets are either incomplete , Or one more piece , In this case, unpacking and sticking happen . If these two cases are not treated specially , It is also not easy to deal with the receiver .

If the server side TCP The receiving window is very small , The packet Packet1 and Packet2 The larger , It's very likely that something else will happen —— The server can send Packet1 and Packet2 Full reception , There will be many unpacking during this period .

Sticky package 、 Reasons for unpacking

1. The data to be sent is greater than TCP The amount of space left in the send buffer , Unpacking will occur, that is, the size of the bytes written by the application is larger than the size of the socket send buffer .

2. Conduct MSS The size of TCP piecewise .MSS Is the abbreviation of the maximum message segment length .MSS yes TCP Maximum length of data field in message segment . Add... To the data field TCP The first is the whole TCP Message segment . therefore MSS Not at all TCP Maximum length of message segment , It is :MSS=TCP Segment length -TCP The length of the first , Data to be sent is greater than MSS( Maximum message length ),TCP Unpack before transmission .

3. The data to be sent is less than TCP Size of send buffer ,TCP Send the data written to the buffer several times at a time , Sticking will occur .

4. The application layer of the receiving data side does not read the data in the receiving buffer in time , Sticking will occur .

5. Ethernet payload Greater than MTU Conduct IP Fragmentation .MTU finger : The maximum packet size that can be passed over a certain layer of a communication protocol . If IP Layer has a packet to transmit , And the length of the data is longer than that of the link layer MTU Big , that IP Layers will be sliced , Divide the packet into pieces , Let each piece not exceed MTU. Be careful ,IP Fragmentation can occur on the original sender host , It can also happen on the middle router .

TCP The solution strategy of sticking and unpacking

As a result of the underlying TCP Inability to understand the business data at the top , So at the bottom, there is no guarantee that the data will not be unpacked and reorganized , This problem needs to be solved through the design of the upper application protocol stack .

1. Message fixed-length . for example 100 byte . If not enough , Blank space .

2. Add special characters such as carriage return or space character at the end of the package for segmentation , A typical such as FTP agreement , The sender encapsulates each packet as a fixed length ( If it's not enough, we can make up for it 0 fill ), In this way, each time the receiving end reads fixed length data from the receiving buffer, it will naturally split each packet .

3. Divides messages into headers and bodies . The field in the message header that contains the total length of the message , In this way, each time the receiving end reads fixed length data from the receiving buffer, it will naturally split each packet .

4. Other complex agreements , Such as RTMP Agreements, etc .

Not considered TCP Abnormal cases caused by package sticking and unpacking

Server-side code :

public class TimeServer {
public void bind(int port) {
// Configure the server side NIO Thread group
//NioEventLoopGroup It's a thread group , Contains a set of NIO Threads , Dealing with network events , It's actually Reactor Thread group
try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
//netty Used to start NIO Server startup class , The aim is to reduce NIO The complexity of development
ServerBootstrap bootstrap = new ServerBootstrap();
// Function like NIO Medium ServerSocketChannel
bootstrap.group(bossLoopGroup, workerGroup).channel(NioServerSocketChannel.class)
// To configure NioServerSocketChannel Parameters of
.option(ChannelOption.SO_BACKLOG, 1024)
// Binding event handling class ChildChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeServerHandler());
}
});
// Binding port , Wait for the binding operation to complete
ChannelFuture channelFuture = bootstrap.bind(port).sync();
// Wait for the server listening port to close
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new TimeServer().bind(port);
}
}

Server side processing class

public class TimeServerHandler extends ChannelHandlerAdapter {
/**
* Counter
*/
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// similar NIO Medium ByteBuffer
ByteBuf buf = (ByteBuf) msg;
// Gets the number of bytes read in the buffer
byte[] req = new byte[buf.readableBytes()];
// The bytes in the buffer are copied to the byte array
buf.readBytes(req);
String body = new String(req).substring(0, req.length - System.getProperty("line.separator").length());
if ("hello world".equalsIgnoreCase(body)) {
System.out.println(" Input received :" + body);
} else {
System.out.println(" Exception input :" + body);
}
++counter;
ByteBuf response = Unpooled.copiedBuffer((" current time :" + new Date()).getBytes());
// It's not sending messages directly to SocketChannel in , Just send the message to the buffer array , adopt flush Method to send a message to SocketChannel
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// Write messages from the message sending queue to SocketChannel in , Send to the other party
// Prevent frequent wakeups Selector Send message
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// An abnormal shutdown occurred ChannelHandlerContext And so on
ctx.close();
}
}

Client code

public class TimeClient {
public void connect(int port, String host) {
try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
// Asynchronous link operation
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
// Wait for the client
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new TimeClient().connect(port, "127.0.0.1");
}
}

Customer service end processing class

public class TimeClientHandler extends ChannelHandlerAdapter {
private byte[] req;
public TimeClientHandler() {
req = ("hello world" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
// Cycle to send 100 Bar message , Refresh every time you send one , In theory, the server will receive 100 strip hello world
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* Read and print messages
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] resp = new byte[buf.readableBytes()];
buf.readBytes(resp);
String body = new String(resp);
System.out.println(body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

Output results :

 Input received :hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hell
Input received :o world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world

Because of not considering TCP The problem of sticking and unpacking , So it happened “ Input received :o world” Output .

Netty Solutions provided

LineBasedFrameDecoder and StringDecoder The use of decoders

Optimized server-side code

public class OptimizedTimeServer {
public void bind(int port) {
// Configure the server side NIO Thread group
//NioEventLoopGroup It's a thread group , Contains a set of NIO Threads , Dealing with network events , It's actually Reactor Thread group
try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
//netty Used to start NIO Server startup class , The aim is to reduce NIO The complexity of development
ServerBootstrap bootstrap = new ServerBootstrap();
// Function like NIO Medium ServerSocketChannel
bootstrap.group(bossLoopGroup, workerGroup).channel(NioServerSocketChannel.class)
// To configure NioServerSocketChannel Parameters of
.option(ChannelOption.SO_BACKLOG, 1024)
// Binding event handling class ChildChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//LineBasedFrameDecoder decoder
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
//StringDecoder decoder
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new OptimizeTimeServerHandler());
}
});
// Binding port , Wait for the binding operation to complete
ChannelFuture channelFuture = bootstrap.bind(port).sync();
// Wait for the server listening port to close
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new OptimizedTimeServer().bind(port);
}
}

Optimized server-side processing class

public class OptimizeTimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String body = (String) msg;
System.out.println(" Received a message :" + body);
ByteBuf response = Unpooled.copiedBuffer((" current time :" + new Date() + System.getProperty("line.separator")).getBytes());
// It's not sending messages directly to SocketChannel in , Just send the message to the buffer array , adopt flush Method to send a message to SocketChannel
ctx.writeAndFlush(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// Write messages from the message sending queue to SocketChannel in , Send to the other party
// Prevent frequent wakeups Selector Send message
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// An abnormal shutdown occurred ChannelHandlerContext And so on
ctx.close();
}
}

Optimized client code

public class OptimizeTimeClient {
public void connect(int port, String host) {
try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new OptimizeTimeClientHandler());
}
});
// Asynchronous link operation
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
// Wait for the client
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new OptimizeTimeClient().connect(port, "127.0.0.1");
}
}

Optimized client processing class

public class OptimizeTimeClientHandler extends ChannelHandlerAdapter {
private byte[] req;
public OptimizeTimeClientHandler() {
req = ("hello world" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
// Cycle to send 100 Bar message , Refresh every time you send one , In theory, the server will receive 100 strip hello world
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* Read and print messages
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println(body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

Optimized server-side output

 Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world
Received a message :hello world

LineBasedFrameDecoder and StringDecoder Principle analysis of decoder

LineBasedFrameDecoder: Traverse ByteBuf Read bytes in , To determine if there is “\n” perhaps “\r\n”, If there is , That's where it ends , The bytes from the readable index to the end position form a line , It's a decoder that ends with a behavior identifier , Support two ways of decoding with or without terminators , At the same time, the configuration supports the maximum length of a single line , If the maximum length is read continuously , If no newline is found, an exception is thrown , At the same time, the abnormal code stream read before is ignored .

StringDecoder: Convert the received object to a string , Then call the follow up. Handler.

LineBasedFrameDecoder + StringDecoder = Text decoder for line switching

This article is from WeChat official account. - The snail flying in the fallen leaves (A_GallopingSnail) , author : The supernatural snail

The source and reprint of the original text are detailed in the text , If there is any infringement , Please contact the yunjia_community@tencent.com Delete .

Original publication time : 2018-09-10

Participation of this paper Tencent cloud media sharing plan , You are welcome to join us , share .

版权声明
本文为[Black hole code]所创,转载请带上原文链接,感谢
https://javamana.com/2021/01/20210114162621255x.html

  1. To what extent can I go out to work?
  2. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  3. Implementation of rocketmq message sending based on JMeter
  4. How to choose the ticket grabbing app in the Spring Festival? We have measured
  5. Implementation of rocketmq message sending based on JMeter
  6. My programmer's Road: self study java
  7. My programmer's Road: self study java
  8. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  9. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  10. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  11. [Java training project] Java ID number recognition system
  12. How does serverless deal with the resource supply demand of k8s in the offline scenario
  13. Detailed explanation of HBase basic principle
  14. Explain the function of thread pool and how to use it in Java
  15. Kubernetes official java client 8: fluent style
  16. 010_MySQL
  17. Vibrant special purchases for the Spring Festival tiktok section, hundreds of good things to make the year more rich flavor.
  18. 010_MySQL
  19. Of the 4 million docker images, 51% have high-risk vulnerabilities
  20. Rocketmq CPP client visual studio 2019 compilation
  21. Rocketmq CPP client visual studio 2019 compilation
  22. Usage of data custom attribute in jquery
  23. Common decompression in Linux
  24. Upload large files in Java
  25. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  26. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  27. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  28. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  29. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  30. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  31. 010_ MySQL
  32. 010_ MySQL
  33. Fast integration of imsdk and Huawei offline push
  34. 消息队列之RabbitMQ
  35. Rabbitmq of message queue
  36. 初学java进制转换方面补充学习
  37. Learn java base conversion supplementary learning
  38. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  39. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  40. 初学java进制转换方面补充学习
  41. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  42. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  43. Learn java base conversion supplementary learning
  44. JDBC测试连接数据库
  45. JDBC test connection database
  46. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  47. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  48. 安卓开发和java开发有什么区别!2021年BATJ30套大厂Android经典高频面试题,面试必问
  49. Spring Security OAuth2.0認證授權四:分散式系統認證授權
  50. What's the difference between Android development and java development! 2021 batj30 Android classic high frequency interview questions
  51. Spring security oauth2.0 authentication and authorization 4: distributed system authentication and authorization
  52. Java微服务 vs Go微服务,究竟谁更强!?
  53. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  54. Who is stronger, Java microservice vs go microservice!?
  55. Java微服务 vs Go微服务,究竟谁更强!?
  56. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  57. Who is stronger, Java microservice vs go microservice!?
  58. springboot异常处理之404
  59. Spring boot exception handling 404
  60. Spring Boot Security 国际化 多语言 i18n 趟过巨坑