Encoding and decoding of netty

Black hole code 2021-01-14 16:27:27
encoding decoding netty


Content catalog

MessagePack brief introduction MessagePack SDKMessagePack Encoder development MessagePack Decoder client code, server code POJO test result

MessagePack brief introduction

MessagePack Is an efficient framework for binary serialization and deserialization .

  • Cross language data exchange
  • Faster performance
  • The resulting bit stream is smaller

MessagePack SDK

<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>

MessagePack Encoder development

public class MsgpackEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// establish MessagePack object
MessagePack msgpack = new MessagePack();
// Encode the object as MessagePack Byte array of format
byte[] raw = msgpack.write(msg);
// Write the byte array to ByteBuf in
out.writeBytes(raw);
}
}

MessagePack Decoder writing

public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
// From the datagram msg in ( The data type here is ByteBuf, because Netty Our communication is based on ByteBuf object )
final byte[] array;
final int length = msg.readableBytes();
array = new byte[length];
/**
* What we use here is ByteBuf Of getBytes Methods to ByteBuf Object to byte array , The front is using readBytes, Directly pass in a received byte array parameter
* There are many parameters here , The first parameter is index, About readerIndex, The explanation is as follows :
* ByteBuf It's through readerIndex Follow writerIndex Two position pointers to assist in buffer read and write operations , The specific principle is as follows Netty Learn more about source code analysis
* The second parameter is the received byte array
* The third parameter is dstIndex the first index of the destination
* The fourth parameter is length the number of bytes to transfer
*/
msg.getBytes(msg.readerIndex(), array, 0, length);
// Create a MessagePack object
MessagePack msgpack = new MessagePack();
// Decode and add to the decode list out in
out.add(msgpack.read(array));
}
}

Client code

public class EchoClient {
public void connect(String host, int port, int sendNumber) throws Exception {
// Configure client NIO Thread group
try(EventLoopGroup group = new NioEventLoopGroup();) {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
// Set up TCP Connection timeout
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Add length field
// stay MessagePack Add before decoder LengthFieldBasedFrameDecoder, Used to process half packet messages
// It parses the length field information in the message header , It's like this MsgpackDecoder The message received is always the whole package
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
// add to MesspagePack decoder
ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
// Add length field encoder
// stay MessagePack Encoder before adding LengthFieldPrepender, It will be ByteBuf Before adding 2 Message length field of bytes
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
// add to MessagePack Encoder
ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
// Add business processing handler
ch.pipeline().addLast(new EchoClientHandler(sendNumber));
}
});
// Initiate asynchronous connection operation
ChannelFuture f = b.connect(host, port).sync();
// Wait for the client link to close
f.channel().closeFuture().sync();
}
}
public static void main(String[] args) throws Exception {
int port = 8888;
if(args != null && args.length > 0) {
try {
port = Integer.valueOf(port);
} catch (NumberFormatException e) {
// Use default values
}
}
int sendNumber = 100;
new EchoClient().connect("localhost", port, sendNumber);
}
}
public class EchoClientHandler extends ChannelHandlerAdapter {
// sendNumber Number of objects written to the send buffer
private int sendNumber;
public EchoClientHandler(int sendNumber) {
this.sendNumber = sendNumber;
}
/**
* Build length is userNum Of User An array of objects
* @param userNum
* @return
*/
private User[] getUserArray(int userNum) {
User[] users = new User[userNum];
User user = null;
for(int i = 0; i < userNum; i++) {
user = new User();
user.setName("ABCDEFG --->" + i);
user.setAge(i);
users[i] = user;
}
return users;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
User[] users = getUserArray(sendNumber);
for (User user : users) {
ctx.writeAndFlush(user);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Client receive the msgpack message : " + msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

Server code

public class EchoServer {
public void bind(int port) throws Exception {
// Configure the server NIO Thread group
try(EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup()) {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Add length field
// stay MessagePack Add before decoder LengthFieldBasedFrameDecoder, Used to process half packet messages
// It parses the length field information in the message header , It's like this MsgpackDecoder The message received is always the whole package
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
// add to MesspagePack decoder
ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
// Add length field encoder
// stay MessagePack Encoder before adding LengthFieldPrepender, It will be ByteBuf Before adding 2 Message length field of bytes
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
// add to MessagePack Encoder
ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
// Add business processing handler
// Add business processing handler
ch.pipeline().addLast(new EchoServerHandler());
}
});
// Binding port , Synchronization wait for success
ChannelFuture f = b.bind(port).sync();
// Wait for the server listening port to close
f.channel().closeFuture().sync();
}
}
public static void main(String[] args) throws Exception {
int port = 8888;
if(args != null && args.length > 0) {
try {
port = Integer.valueOf(port);
} catch (NumberFormatException e) {
// TODO: handle exception
}
}
new EchoServer().bind(port);
}
}
public class EchoServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server receive the msgpack message : " + msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Something goes wrong , Turn off the link
ctx.close();
}
}

POJO

@Message
public class User {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User [name=" + name + ", age=" + age + "]";
}
}

test result

 Server-side output
Server receive the msgpack message : ["ABCDEFG --->0",0]
Server receive the msgpack message : ["ABCDEFG --->1",1]
Server receive the msgpack message : ["ABCDEFG --->2",2]
······ Leave out the code ······
Server receive the msgpack message : ["ABCDEFG --->98",98]
Server receive the msgpack message : ["ABCDEFG --->99",99]
Client output
Client receive the msgpack message : ["ABCDEFG --->0",0]
Client receive the msgpack message : ["ABCDEFG --->1",1]
Client receive the msgpack message : ["ABCDEFG --->2",2]
······ Leave out the code ······
Client receive the msgpack message : ["ABCDEFG --->98",98]
Client receive the msgpack message : ["ABCDEFG --->99",99]

This article is from WeChat official account. - The snail flying in the fallen leaves (A_GallopingSnail) , author : The supernatural snail

The source and reprint of the original text are detailed in the text , If there is any infringement , Please contact the yunjia_community@tencent.com Delete .

Original publication time : 2018-09-13

Participation of this paper Tencent cloud media sharing plan , You are welcome to join us , share .

版权声明
本文为[Black hole code]所创,转载请带上原文链接,感谢
https://javamana.com/2021/01/20210114162621242R.html

  1. To what extent can I go out to work?
  2. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  3. Implementation of rocketmq message sending based on JMeter
  4. How to choose the ticket grabbing app in the Spring Festival? We have measured
  5. Implementation of rocketmq message sending based on JMeter
  6. My programmer's Road: self study java
  7. My programmer's Road: self study java
  8. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  9. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  10. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  11. [Java training project] Java ID number recognition system
  12. How does serverless deal with the resource supply demand of k8s in the offline scenario
  13. Detailed explanation of HBase basic principle
  14. Explain the function of thread pool and how to use it in Java
  15. Kubernetes official java client 8: fluent style
  16. 010_MySQL
  17. Vibrant special purchases for the Spring Festival tiktok section, hundreds of good things to make the year more rich flavor.
  18. 010_MySQL
  19. Of the 4 million docker images, 51% have high-risk vulnerabilities
  20. Rocketmq CPP client visual studio 2019 compilation
  21. Rocketmq CPP client visual studio 2019 compilation
  22. Usage of data custom attribute in jquery
  23. Common decompression in Linux
  24. Upload large files in Java
  25. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  26. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  27. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  28. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  29. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  30. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  31. 010_ MySQL
  32. 010_ MySQL
  33. Fast integration of imsdk and Huawei offline push
  34. 消息队列之RabbitMQ
  35. Rabbitmq of message queue
  36. 初学java进制转换方面补充学习
  37. Learn java base conversion supplementary learning
  38. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  39. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  40. 初学java进制转换方面补充学习
  41. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  42. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  43. Learn java base conversion supplementary learning
  44. JDBC测试连接数据库
  45. JDBC test connection database
  46. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  47. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  48. 安卓开发和java开发有什么区别!2021年BATJ30套大厂Android经典高频面试题,面试必问
  49. Spring Security OAuth2.0認證授權四:分散式系統認證授權
  50. What's the difference between Android development and java development! 2021 batj30 Android classic high frequency interview questions
  51. Spring security oauth2.0 authentication and authorization 4: distributed system authentication and authorization
  52. Java微服务 vs Go微服务,究竟谁更强!?
  53. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  54. Who is stronger, Java microservice vs go microservice!?
  55. Java微服务 vs Go微服务,究竟谁更强!?
  56. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  57. Who is stronger, Java microservice vs go microservice!?
  58. springboot异常处理之404
  59. Spring boot exception handling 404
  60. Spring Boot Security 国际化 多语言 i18n 趟过巨坑