Netty decoder

Black hole code 2021-01-14 16:27:42

Part1

1

Netty: delimiter-based and fixed-length decoders

TCP transmits data as a byte stream, so an upper-layer application protocol has to mark message boundaries itself. Four approaches are commonly used:

1. Fixed message length. Once the specified number of bytes has been accumulated, a complete message is considered read; the counter is reset and reading of the next message begins.

2. A carriage return and line feed (CRLF) as the end of the message, as in the FTP protocol.

3. A special delimiter as the end of the message. A line break is just one particular kind of delimiter.

4. A length field in the message header that gives the total length of the message.

Netty provides a unified abstraction for these four approaches and supplies a decoder for each of them.
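
The fourth approach (a length field in the header) is not demonstrated in the rest of this article, so here is a minimal plain-Java sketch of the idea. It does not use Netty and is not how Netty implements it; the class name LengthFieldFramingSketch, the 4-byte big-endian header, and the feed method are all assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: frames messages with a 4-byte big-endian length header. */
final class LengthFieldFramingSketch {
    // Bytes received so far that do not yet form a complete frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Prefixes the UTF-8 payload with its length. */
    static byte[] encode(String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    /** Accepts an arbitrary chunk of the stream and returns every frame it completes. */
    List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
        List<String> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative length field: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // body is incomplete: rewind to the header and wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(new String(payload, StandardCharsets.UTF_8));
        }
        // Keep only the bytes that were not consumed.
        pending.reset();
        pending.write(buf.array(), buf.position(), buf.remaining());
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = encode("hello");
        byte[] b = encode("world");
        byte[] stream = new byte[a.length + b.length];
        System.arraycopy(a, 0, stream, 0, a.length);
        System.arraycopy(b, 0, stream, a.length, b.length);

        LengthFieldFramingSketch decoder = new LengthFieldFramingSketch();
        // Split the stream in the middle of the first body, as TCP is free to do.
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 0, 6)));             // []
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 6, stream.length))); // [hello, world]
    }
}
```

The receiver never relies on how the bytes were chunked in transit: it only emits a message once the header and the full body announced by the header are both available.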

2

Decoder Introduction

DelimiterBasedFrameDecoder: automatically decodes messages that end with a given delimiter

FixedLengthFrameDecoder: automatically decodes fixed-length messages
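
To make the delimiter idea concrete, the following plain-Java sketch shows the accumulate-and-split logic that a delimiter-based decoder performs conceptually. It is not Netty code: the class DelimiterFramingSketch and its feed method are hypothetical, and it works on characters rather than bytes to stay short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a character stream into frames at a delimiter. */
final class DelimiterFramingSketch {
    private final int maxFrameLength;
    private final String delimiter;
    // Characters received so far that are not yet terminated by a delimiter.
    private final StringBuilder pending = new StringBuilder();

    DelimiterFramingSketch(int maxFrameLength, String delimiter) {
        this.maxFrameLength = maxFrameLength;
        this.delimiter = delimiter;
    }

    /** Accepts an arbitrary chunk and returns every frame it completes, delimiter stripped. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> frames = new ArrayList<>();
        int end;
        while ((end = pending.indexOf(delimiter)) >= 0) {
            if (end > maxFrameLength) {
                throw new IllegalStateException("frame longer than " + maxFrameLength);
            }
            frames.add(pending.substring(0, end));
            pending.delete(0, end + delimiter.length());
        }
        // Guard against a peer that never sends the delimiter.
        if (pending.length() > maxFrameLength) {
            throw new IllegalStateException("no delimiter within " + maxFrameLength + " characters");
        }
        return frames;
    }

    public static void main(String[] args) {
        DelimiterFramingSketch decoder = new DelimiterFramingSketch(1024, "$_");
        System.out.println(decoder.feed("hello wor"));        // []
        System.out.println(decoder.feed("ld$_hello world$")); // [hello world]
        System.out.println(decoder.feed("_"));                // [hello world]
    }
}
```

The length limit plays the same role as the first constructor argument of DelimiterBasedFrameDecoder in the examples that follow: it bounds how much data is buffered while waiting for a delimiter.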

Part2

1

DelimiterBasedFrameDecoder client

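import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class DelimiterBasedFrameDecoderEchoClient {
    public void connect(int port, String host) {
        // Shut the group down explicitly in finally; try-with-resources on an EventLoopGroup only compiles on JDK 19+
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // "$_" marks the end of every message
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            // 1024 is the maximum frame length allowed before a delimiter is found
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoClientHandler());
                        }
                    });
            // Connect asynchronously and block until the connection is established
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            // Block until the client channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new DelimiterBasedFrameDecoderEchoClient().connect(port, "127.0.0.1");
    }
}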

2

DelimiterBasedFrameDecoder client handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.atomic.AtomicInteger;

// Written against the Netty 5 alpha API; on Netty 4.x extend ChannelInboundHandlerAdapter instead
public class DelimiterBasedFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
    private final AtomicInteger count = new AtomicInteger(0);
    private final byte[] req;

    public DelimiterBasedFrameDecoderEchoClientHandler() {
        // Every request ends with the "$_" delimiter
        req = ("hello world" + "$_").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send 100 messages, flushing after each one; in theory the server receives "hello world" 100 times
        for (int i = 0; i < 100; i++) {
            ByteBuf message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    /**
     * Reads and prints each message received from the server.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // StringDecoder has already converted the frame to a String
        String body = (String) msg;
        System.out.println(" Client page " + count.incrementAndGet() + " The first time I got a message :" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Close the channel when an exception occurs
        ctx.close();
    }
}

3

DelimiterBasedFrameDecoder server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class DelimiterBasedFrameDecoderEchoServer {
    public void bind(int port) {
        // Configure the server-side NIO thread groups.
        // NioEventLoopGroup is a thread group containing a set of NIO threads that handle network events;
        // it is effectively the Reactor thread group.
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is Netty's helper class for starting an NIO server; it reduces the complexity of NIO development
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossLoopGroup, workerGroup)
                    // NioServerSocketChannel plays the role of ServerSocketChannel in JDK NIO
                    .channel(NioServerSocketChannel.class)
                    // Configure the NioServerSocketChannel options
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Register the handlers for each accepted connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            // DelimiterBasedFrameDecoder splits the stream at "$_"
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            // StringDecoder converts each frame to a String
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoServerHandler());
                        }
                    });
            // Bind the port and wait for the bind operation to complete
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            // Block until the server's listening channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release both thread groups
            bossLoopGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new DelimiterBasedFrameDecoderEchoServer().bind(port);
    }
}

4

DelimiterBasedFrameDecoder server handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

// Written against the Netty 5 alpha API; on Netty 4.x extend ChannelInboundHandlerAdapter instead
public class DelimiterBasedFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {
    private final AtomicInteger count = new AtomicInteger(0);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // The decoder has already stripped the "$_" delimiter from the frame
        String body = (String) msg;
        System.out.println(" Server side second " + count.incrementAndGet() + " The first time I got a message :" + body);
        // The reply must end with "$_" so that the client's decoder can frame it
        ByteBuf response = Unpooled.copiedBuffer((" current time :" + new Date() + "$_").getBytes());
        // writeAndFlush queues the response and flushes it to the SocketChannel right away
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // Flush anything still queued once the current batch of reads is done.
        // With write() instead of writeAndFlush() above, this would batch the replies
        // and avoid waking the Selector for every message.
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // On an exception, close the ChannelHandlerContext and release its resources
        ctx.close();
    }
}

5

DelimiterBasedFrameDecoder Execution results

 Server side second 1 The first time I got a message :hello world
Server side second 2 The first time I got a message :hello world
Server side second 3 The first time I got a message :hello world
Server side second 4 The first time I got a message :hello world
Server side second 5 The first time I got a message :hello world
Server side second 6 The first time I got a message :hello world
Server side second 7 The first time I got a message :hello world
Server side second 8 The first time I got a message :hello world
Server side second 9 The first time I got a message :hello world
Server side second 10 The first time I got a message :hello world
······
Server side second 99 The first time I got a message :hello world
Server side second 100 The first time I got a message :hello world
Client page 1 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 2 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 3 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 4 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 5 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 6 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 7 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 8 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 9 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 10 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
·····
Client page 98 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 99 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018
Client page 100 The first time I got a message : current time :Tue Sep 11 16:17:57 CST 2018

Part3

1

FixedLengthFrameDecoder client

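import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class FixedLengthFrameDecoderEchoClient {
    public void connect(int port, String host) {
        // Shut the group down explicitly in finally; try-with-resources on an EventLoopGroup only compiles on JDK 19+
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // The server in this example never replies, so the client has no frame decoder
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoClientHandler());
                        }
                    });
            // Connect asynchronously and block until the connection is established
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            // Block until the client channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new FixedLengthFrameDecoderEchoClient().connect(port, "127.0.0.1");
    }
}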

2

FixedLengthFrameDecoder client handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.atomic.AtomicInteger;

// Written against the Netty 5 alpha API; on Netty 4.x extend ChannelInboundHandlerAdapter instead
public class FixedLengthFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
    private final AtomicInteger count = new AtomicInteger(0);
    private final byte[] req;

    public FixedLengthFrameDecoderEchoClientHandler() {
        // "hello world" is exactly 11 bytes, matching the frame length configured on the server
        req = ("hello world").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send 100 messages, flushing after each one; in theory the server receives "hello world" 100 times
        for (int i = 0; i < 100; i++) {
            ByteBuf message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    /**
     * Reads and prints each message received from the server.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println(" Client page " + count.incrementAndGet() + " The first time I got a message :" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Close the channel when an exception occurs
        ctx.close();
    }
}

3

FixedLengthFrameDecoder server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class FixedLengthFrameDecoderEchoServer {
    public void bind(int port) {
        // Configure the server-side NIO thread groups.
        // NioEventLoopGroup is a thread group containing a set of NIO threads that handle network events;
        // it is effectively the Reactor thread group.
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is Netty's helper class for starting an NIO server; it reduces the complexity of NIO development
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossLoopGroup, workerGroup)
                    // NioServerSocketChannel plays the role of ServerSocketChannel in JDK NIO
                    .channel(NioServerSocketChannel.class)
                    // Configure the NioServerSocketChannel options
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Register the handlers for each accepted connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // FixedLengthFrameDecoder emits one frame per 11 bytes, the length of "hello world"
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(11));
                            // StringDecoder converts each frame to a String
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoServerHandler());
                        }
                    });
            // Bind the port and wait for the bind operation to complete
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            // Block until the server's listening channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release both thread groups
            bossLoopGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new FixedLengthFrameDecoderEchoServer().bind(port);
    }
}

4

FixedLengthFrameDecoder server handler

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

// Written against the Netty 5 alpha API; on Netty 4.x extend ChannelInboundHandlerAdapter instead
public class FixedLengthFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Each msg is one 11-byte frame, already converted to a String by StringDecoder
        String body = (String) msg;
        System.out.println(" The server receives a message :" + body);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // Flush any queued outbound messages to the SocketChannel.
        // This handler writes nothing, so the call is a no-op here.
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // On an exception, close the ChannelHandlerContext and release its resources
        ctx.close();
    }
}

5

FixedLengthFrameDecoder test result

 The server receives a message :hello world
The server receives a message :hello world
The server receives a message :hello world
The server receives a message :hello world
The server receives a message :hello world
The server receives a message :hello world
The server receives a message :hello world
The server receives a message :hello world
The server receives a message :hello world
······
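
The result above does not depend on how TCP happens to chunk the 100 writes. As a rough illustration of why, the following plain-Java sketch cuts a byte stream into 11-byte frames no matter where the chunks break. It is not Netty's implementation; the class FixedLengthFramingSketch and its feed method are hypothetical names chosen for this example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: cuts a byte stream into frames of a fixed length. */
final class FixedLengthFramingSketch {
    private final int frameLength;
    // Bytes received so far that do not yet fill a whole frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    FixedLengthFramingSketch(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Accepts an arbitrary chunk and returns every complete frame now available. */
    List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] all = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int offset = 0;
        while (all.length - offset >= frameLength) {
            frames.add(new String(all, offset, frameLength, StandardCharsets.US_ASCII));
            offset += frameLength;
        }
        // Keep the incomplete tail for the next call.
        pending.reset();
        pending.write(all, offset, all.length - offset);
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthFramingSketch decoder = new FixedLengthFramingSketch(11);
        // Three 11-byte messages sent back to back, 33 bytes in total.
        byte[] stream = "hello worldhello worldhello world".getBytes(StandardCharsets.US_ASCII);
        // The chunk boundaries (15 and 18 bytes) deliberately ignore message boundaries.
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 0, 15)));  // [hello world]
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 15, 33))); // [hello world, hello world]
    }
}
```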

END

This article comes from the WeChat official account The snail flying in the fallen leaves (A_GallopingSnail). Author: The supernatural snail

Original publication time : 2018-09-11

Copyright notice
This article was created by [Black hole code]. Please include a link to the original when reprinting. Thank you.
https://javamana.com/2021/01/20210114162621250t.html
