Netty之编解码

黑洞代码 2021-01-14 16:26:28
腾讯云 Netty 技术开发 编解码 解码


内容目录

MessagePack 简介MessagePack SDKMessagePack编码器开发MessagePack解码器编写客户端代码服务端代码POJO测试结果

MessagePack 简介

MessagePack是一个高效的二进制序列化和反序列化框架。

  • 跨语言数据交换
  • 性能更快
  • 产生的码流更小

MessagePack SDK

<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>

MessagePack编码器开发

public class MsgpackEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 创建MessagePack对象
MessagePack msgpack = new MessagePack();
// 将对象编码为MessagePack格式的字节数组
byte[] raw = msgpack.write(msg);
// 将字节数组写入到ByteBuf中
out.writeBytes(raw);
}
}

MessagePack解码器编写

public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
// 从数据报msg中(这里的数据类型为ByteBuf,因为Netty的通信基于ByteBuf对象)
final byte[] array;
final int length = msg.readableBytes();
array = new byte[length];
/**
* 这里使用的是ByteBuf的getBytes方法来将ByteBuf对象转换为字节数组,前面是使用readBytes,直接传入一个接收的字节数组参数即可
* 这里的参数比较多,第一个参数是index,关于readerIndex,说明如下:
* ByteBuf是通过readerIndex跟writerIndex两个位置指针来协助缓冲区的读写操作的,具体原理等到Netty源码分析时再详细学习一下
* 第二个参数是接收的字节数组
* 第三个参数是dstIndex the first index of the destination
* 第四个参数是length the number of bytes to transfer
*/
msg.getBytes(msg.readerIndex(), array, 0, length);
// 创建一个MessagePack对象
MessagePack msgpack = new MessagePack();
// 解码并添加到解码列表out中
out.add(msgpack.read(array));
}
}

客户端代码

public class EchoClient {
public void connect(String host, int port, int sendNumber) throws Exception {
// 配置客户端NIO线程组
try(EventLoopGroup group = new NioEventLoopGroup();) {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
// 设置TCP连接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加长度字段解码器
// 在MessagePack解码器之前增加LengthFieldBasedFrameDecoder,用于处理半包消息
// 它会解析消息头部的长度字段信息,这样后面的MsgpackDecoder接收到的永远是整包消息
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
// 添加MesspagePack解码器
ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
// 添加长度字段编码器
// 在MessagePack编码器之前增加LengthFieldPrepender,它将在ByteBuf之前增加2个字节的消息长度字段
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
// 添加MessagePack编码器
ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
// 添加业务处理handler
ch.pipeline().addLast(new EchoClientHandler(sendNumber));
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
}
}
public static void main(String[] args) throws Exception {
int port = 8888;
if(args != null && args.length > 0) {
try {
port = Integer.valueOf(port);
} catch (NumberFormatException e) {
// 采用默认值
}
}
int sendNumber = 100;
new EchoClient().connect("localhost", port, sendNumber);
}
}
public class EchoClientHandler extends ChannelHandlerAdapter {
// sendNumber为写入发送缓冲区的对象数量
private int sendNumber;
public EchoClientHandler(int sendNumber) {
this.sendNumber = sendNumber;
}
/**
* 构建长度为userNum的User对象数组
* @param userNum
* @return
*/
private User[] getUserArray(int userNum) {
User[] users = new User[userNum];
User user = null;
for(int i = 0; i < userNum; i++) {
user = new User();
user.setName("ABCDEFG --->" + i);
user.setAge(i);
users[i] = user;
}
return users;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
User[] users = getUserArray(sendNumber);
for (User user : users) {
ctx.writeAndFlush(user);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Client receive the msgpack message : " + msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

服务端代码

public class EchoServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组
try(EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup()) {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加长度字段解码器
// 在MessagePack解码器之前增加LengthFieldBasedFrameDecoder,用于处理半包消息
// 它会解析消息头部的长度字段信息,这样后面的MsgpackDecoder接收到的永远是整包消息
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
// 添加MesspagePack解码器
ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
// 添加长度字段编码器
// 在MessagePack编码器之前增加LengthFieldPrepender,它将在ByteBuf之前增加2个字节的消息长度字段
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
// 添加MessagePack编码器
ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
// 添加业务处理handler
// 添加业务处理handler
ch.pipeline().addLast(new EchoServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
}
}
public static void main(String[] args) throws Exception {
int port = 8888;
if(args != null && args.length > 0) {
try {
port = Integer.valueOf(port);
} catch (NumberFormatException e) {
// TODO: handle exception
}
}
new EchoServer().bind(port);
}
}
public class EchoServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server receive the msgpack message : " + msg);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 发生异常,关闭链路
ctx.close();
}
}

POJO

@Message
public class User {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User [name=" + name + ", age=" + age + "]";
}
}

测试结果

服务器端输出
Server receive the msgpack message : ["ABCDEFG --->0",0]
Server receive the msgpack message : ["ABCDEFG --->1",1]
Server receive the msgpack message : ["ABCDEFG --->2",2]
······省去代码······
Server receive the msgpack message : ["ABCDEFG --->98",98]
Server receive the msgpack message : ["ABCDEFG --->99",99]
客户端输出
Client receive the msgpack message : ["ABCDEFG --->0",0]
Client receive the msgpack message : ["ABCDEFG --->1",1]
Client receive the msgpack message : ["ABCDEFG --->2",2]
······省去代码······
Client receive the msgpack message : ["ABCDEFG --->98",98]
Client receive the msgpack message : ["ABCDEFG --->99",99]

本文分享自微信公众号 - 落叶飞翔的蜗牛(A_GallopingSnail) ,作者:超神的蜗牛

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间: 2018-09-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

版权声明
本文为[黑洞代码]所创,转载请带上原文链接,感谢
https://cloud.tencent.com/developer/article/1773780

  1. To what extent can I go out to work?
  2. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  3. Implementation of rocketmq message sending based on JMeter
  4. How to choose the ticket grabbing app in the Spring Festival? We have measured
  5. Implementation of rocketmq message sending based on JMeter
  6. My programmer's Road: self study java
  7. My programmer's Road: self study java
  8. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  9. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  10. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  11. [Java training project] Java ID number recognition system
  12. How does serverless deal with the resource supply demand of k8s in the offline scenario
  13. Detailed explanation of HBase basic principle
  14. Explain the function of thread pool and how to use it in Java
  15. Kubernetes official java client 8: fluent style
  16. 010_MySQL
  17. Vibrant special purchases for the Spring Festival tiktok section, hundreds of good things to make the year more rich flavor.
  18. 010_MySQL
  19. Of the 4 million docker images, 51% have high-risk vulnerabilities
  20. Rocketmq CPP client visual studio 2019 compilation
  21. Rocketmq CPP client visual studio 2019 compilation
  22. Usage of data custom attribute in jquery
  23. Common decompression in Linux
  24. Upload large files in Java
  25. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  26. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  27. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  28. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  29. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  30. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  31. 010_ MySQL
  32. 010_ MySQL
  33. Fast integration of imsdk and Huawei offline push
  34. 消息队列之RabbitMQ
  35. Rabbitmq of message queue
  36. 初学java进制转换方面补充学习
  37. Learn java base conversion supplementary learning
  38. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  39. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  40. 初学java进制转换方面补充学习
  41. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  42. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  43. Learn java base conversion supplementary learning
  44. JDBC测试连接数据库
  45. JDBC test connection database
  46. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  47. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  48. 安卓开发和java开发有什么区别!2021年BATJ30套大厂Android经典高频面试题,面试必问
  49. Spring Security OAuth2.0認證授權四:分散式系統認證授權
  50. What's the difference between Android development and java development! 2021 batj30 Android classic high frequency interview questions
  51. Spring security oauth2.0 authentication and authorization 4: distributed system authentication and authorization
  52. Java微服务 vs Go微服务,究竟谁更强!?
  53. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  54. Who is stronger, Java microservice vs go microservice!?
  55. Java微服务 vs Go微服务,究竟谁更强!?
  56. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  57. Who is stronger, Java microservice vs go microservice!?
  58. springboot异常处理之404
  59. Spring boot exception handling 404
  60. Spring Boot Security 国际化 多语言 i18n 趟过巨坑