Netty解码器

黑洞代码 2021-01-14 16:26:31
腾讯云 Netty 技术开发 解码 解码器


Part1

1

Netty——分隔符和定长解码器

TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用一下4种方式:

1.消息长度固定。累计读取到指定长度的消息后就认为读取了一个完整的消息;将计数器置位,重新开始读取下一个数据报

2.将回车符作为消息结束符。如FTP协议

3.将特殊的分隔符作为消息结束的标志。换行符就是一种特殊的结束分隔符

4.通过在消息头中长度字段来表示消息的总长度

Netty对上述4种方式提供了统一的抽象,提供4种解码器来解决对应的问。

2

解码器介绍

DelimiterBasedFrameDecoder:自动完成以分隔符作为标识符的消息接码

FixedLengthFrameDecoder:自动完成对定长消息的接码

Part2

1

DelimiterBasedFrameDecoder客户端

public class DelimiterBasedFrameDecoderEchoClient {
public void connect(int port, String host) {
try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoClientHandler());
}
});
//异步链接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//等待客户端
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new DelimiterBasedFrameDecoderEchoClient().connect(port, "127.0.0.1");
}
}

2

DelimiterBasedFrameDecoder客户端处理类

public class DelimiterBasedFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
private AtomicInteger count = new AtomicInteger(0);
private byte[] req;
public DelimiterBasedFrameDecoderEchoClientHandler() {
req = ("hello world" + "$_").getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
//循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* 读取并打印消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

3

DelimiterBasedFrameDecoder服务器端

public class DelimiterBasedFrameDecoderEchoServer {
public void bind(int port) {
//配置服务器端NIO线程组
//NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){
//netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
//功能类似于NIO中的ServerSocketChannel
bootstrap.group(bossLoopGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//配置NioServerSocketChannel的参数
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
//绑定事件的处理类ChildChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
//DelimiterBasedFrameDecoder解码器 $_ 作为分隔符
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
//StringDecoder解码器
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoServerHandler());
}
});
//绑定端口,同步等待绑定操作完成
ChannelFuture channelFuture = bootstrap.bind(port).sync();
//等待服务器监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new DelimiterBasedFrameDecoderEchoServer().bind(port);
}
}

4

DelimiterBasedFrameDecoder服务器端处理类

public class DelimiterBasedFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {
private AtomicInteger count = new AtomicInteger(0);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String body = (String) msg;
System.out.println("服务器端第" + count.incrementAndGet() + "次收到消息:" + body);
ByteBuf response = Unpooled.copiedBuffer(("当前时间:" + new Date() + "$_").getBytes());
//并不是直接把消息发送到SocketChannel中,只是把消息发送到缓冲数组,通过flush方法将消息发到SocketChannel
ctx.writeAndFlush(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//将消息发送队列中的消息写入SocketChannel中,发送到对方
//防止频繁的唤醒Selector进行消息发送
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//发生异常关闭ChannelHandlerContext等资源
ctx.close();
}
}

5

DelimiterBasedFrameDecoder执行结果

服务器端第1次收到消息:hello world
服务器端第2次收到消息:hello world
服务器端第3次收到消息:hello world
服务器端第4次收到消息:hello world
服务器端第5次收到消息:hello world
服务器端第6次收到消息:hello world
服务器端第7次收到消息:hello world
服务器端第8次收到消息:hello world
服务器端第9次收到消息:hello world
服务器端第10次收到消息:hello world
······
服务器端第99次收到消息:hello world
服务器端第100次收到消息:hello world
客户端第1次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第2次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第3次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第4次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第5次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第6次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第7次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第8次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第9次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第10次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
·····
客户端第98次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第99次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
客户端第100次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018

Part3

1

FixedLengthFrameDecoder客户端

public class FixedLengthFrameDecoderEchoClient {
public void connect(int port, String host) {
try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoClientHandler());
}
});
//异步链接操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//等待客户端
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new FixedLengthFrameDecoderEchoClient().connect(port, "127.0.0.1");
}
}

2

FixedLengthFrameDecoder客户端处理类

public class FixedLengthFrameDecoderEchoClientHandler extends ChannelHandlerAdapter {
private AtomicInteger count = new AtomicInteger(0);
private byte[] req;
public FixedLengthFrameDecoderEchoClientHandler() {
req = ("hello world").getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
//循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* 读取并打印消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

3

FixedLengthFrameDecoder服务器端

public class FixedLengthFrameDecoderEchoServer {
public void bind(int port) {
//配置服务器端NIO线程组
//NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组
try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup()){
//netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度
ServerBootstrap bootstrap = new ServerBootstrap();
//功能类似于NIO中的ServerSocketChannel
bootstrap.group(bossLoopGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//配置NioServerSocketChannel的参数
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
//绑定事件的处理类ChildChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//FixedLengthFrameDecoder解码器
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(11));
//StringDecoder解码器
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoServerHandler());
}
});
//绑定端口,同步等待绑定操作完成
ChannelFuture channelFuture = bootstrap.bind(port).sync();
//等待服务器监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 8888;
new FixedLengthFrameDecoderEchoServer().bind(port);
}
}

4

FixedLengthFrameDecoder服务器端处理类

public class FixedLengthFrameDecoderEchoServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String body = (String) msg;
System.out.println("服务器端收到消息:" + body);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//将消息发送队列中的消息写入SocketChannel中,发送到对方
//防止频繁的唤醒Selector进行消息发送
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//发生异常关闭ChannelHandlerContext等资源
ctx.close();
}
}

5

FixedLengthFrameDecoder测试结果

服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
服务器端收到消息:hello world
······

END

本文分享自微信公众号 - 落叶飞翔的蜗牛(A_GallopingSnail) ,作者:超神的蜗牛

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间: 2018-09-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

版权声明
本文为[黑洞代码]所创,转载请带上原文链接,感谢
https://cloud.tencent.com/developer/article/1773778

  1. To what extent can I go out to work?
  2. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  3. Implementation of rocketmq message sending based on JMeter
  4. How to choose the ticket grabbing app in the Spring Festival? We have measured
  5. Implementation of rocketmq message sending based on JMeter
  6. My programmer's Road: self study java
  7. My programmer's Road: self study java
  8. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  9. All in one, one article talks about the use of virtual machine VirtualBox and Linux
  10. Java 使用拦截器无限转发/重定向无限循环/重定向次数过多报错(StackOverflowError) 解决方案
  11. [Java training project] Java ID number recognition system
  12. How does serverless deal with the resource supply demand of k8s in the offline scenario
  13. Detailed explanation of HBase basic principle
  14. Explain the function of thread pool and how to use it in Java
  15. Kubernetes official java client 8: fluent style
  16. 010_MySQL
  17. Vibrant special purchases for the Spring Festival tiktok section, hundreds of good things to make the year more rich flavor.
  18. 010_MySQL
  19. Of the 4 million docker images, 51% have high-risk vulnerabilities
  20. Rocketmq CPP client visual studio 2019 compilation
  21. Rocketmq CPP client visual studio 2019 compilation
  22. Usage of data custom attribute in jquery
  23. Common decompression in Linux
  24. Upload large files in Java
  25. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  26. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  27. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  28. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  29. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  30. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  31. 010_ MySQL
  32. 010_ MySQL
  33. Fast integration of imsdk and Huawei offline push
  34. 消息队列之RabbitMQ
  35. Rabbitmq of message queue
  36. 初学java进制转换方面补充学习
  37. Learn java base conversion supplementary learning
  38. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  39. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  40. 初学java进制转换方面补充学习
  41. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  42. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  43. Learn java base conversion supplementary learning
  44. JDBC测试连接数据库
  45. JDBC test connection database
  46. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  47. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  48. 安卓开发和java开发有什么区别!2021年BATJ30套大厂Android经典高频面试题,面试必问
  49. Spring Security OAuth2.0認證授權四:分散式系統認證授權
  50. What's the difference between Android development and java development! 2021 batj30 Android classic high frequency interview questions
  51. Spring security oauth2.0 authentication and authorization 4: distributed system authentication and authorization
  52. Java微服务 vs Go微服务,究竟谁更强!?
  53. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  54. Who is stronger, Java microservice vs go microservice!?
  55. Java微服务 vs Go微服务,究竟谁更强!?
  56. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  57. Who is stronger, Java microservice vs go microservice!?
  58. springboot异常处理之404
  59. Spring boot exception handling 404
  60. Spring Boot Security 国际化 多语言 i18n 趟过巨坑