Part1
1
Netty——分隔符和定长解码器
TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用一下4种方式:
1.消息长度固定。累计读取到指定长度的消息后就认为读取了一个完整的消息;将计数器置位,重新开始读取下一个数据报
2.将回车符作为消息结束符。如FTP协议
3.将特殊的分隔符作为消息结束的标志。换行符就是一种特殊的结束分隔符
4.通过在消息头中长度字段来表示消息的总长度
Netty对上述4种方式提供了统一的抽象,提供4种解码器来解决对应的问。
2
解码器介绍
DelimiterBasedFrameDecoder:自动完成以分隔符作为标识符的消息接码
FixedLengthFrameDecoder:自动完成对定长消息的接码
Part2
1
DelimiterBasedFrameDecoder客户端
public class DelimiterBasedFrameDecoderEchoClient { public void connect(int port, String host) { try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoClientHandler()); } }); //异步链接操作 ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); //等待客户端 channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { int port = 8888; new DelimiterBasedFrameDecoderEchoClient().connect(port, "127.0.0.1"); } }
2
DelimiterBasedFrameDecoder客户端处理类
public class DelimiterBasedFrameDecoderEchoClientHandler extends ChannelHandlerAdapter { private AtomicInteger count = new AtomicInteger(0); private byte[] req; public DelimiterBasedFrameDecoderEchoClientHandler() { req = ("hello world" + "$_").getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message = null; //循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } /** * 读取并打印消息 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
3
DelimiterBasedFrameDecoder服务器端
public class DelimiterBasedFrameDecoderEchoServer { public void bind(int port) { //配置服务器端NIO线程组 //NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组 try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup()){ //netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度 ServerBootstrap bootstrap = new ServerBootstrap(); //功能类似于NIO中的ServerSocketChannel bootstrap.group(bossLoopGroup, workerGroup) .channel(NioServerSocketChannel.class) //配置NioServerSocketChannel的参数 .option(ChannelOption.SO_BACKLOG, 1024) .handler(new LoggingHandler(LogLevel.INFO)) //绑定事件的处理类ChildChannelHandler .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); //DelimiterBasedFrameDecoder解码器 $_ 作为分隔符 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter)); //StringDecoder解码器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoServerHandler()); } }); //绑定端口,同步等待绑定操作完成 ChannelFuture channelFuture = bootstrap.bind(port).sync(); //等待服务器监听端口关闭 channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { int port = 8888; new DelimiterBasedFrameDecoderEchoServer().bind(port); } }
4
DelimiterBasedFrameDecoder服务器端处理类
public class DelimiterBasedFrameDecoderEchoServerHandler extends ChannelHandlerAdapter { private AtomicInteger count = new AtomicInteger(0); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { String body = (String) msg; System.out.println("服务器端第" + count.incrementAndGet() + "次收到消息:" + body); ByteBuf response = Unpooled.copiedBuffer(("当前时间:" + new Date() + "$_").getBytes()); //并不是直接把消息发送到SocketChannel中,只是把消息发送到缓冲数组,通过flush方法将消息发到SocketChannel ctx.writeAndFlush(response); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { //将消息发送队列中的消息写入SocketChannel中,发送到对方 //防止频繁的唤醒Selector进行消息发送 ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //发生异常关闭ChannelHandlerContext等资源 ctx.close(); } }
5
DelimiterBasedFrameDecoder执行结果
服务器端第1次收到消息:hello world 服务器端第2次收到消息:hello world 服务器端第3次收到消息:hello world 服务器端第4次收到消息:hello world 服务器端第5次收到消息:hello world 服务器端第6次收到消息:hello world 服务器端第7次收到消息:hello world 服务器端第8次收到消息:hello world 服务器端第9次收到消息:hello world 服务器端第10次收到消息:hello world ······ 服务器端第99次收到消息:hello world 服务器端第100次收到消息:hello world 客户端第1次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第2次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第3次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第4次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第5次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第6次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第7次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第8次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第9次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第10次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 ····· 客户端第98次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第99次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018 客户端第100次收到消息:当前时间:Tue Sep 11 16:17:57 CST 2018
Part3
1
FixedLengthFrameDecoder客户端
public class FixedLengthFrameDecoderEchoClient { public void connect(int port, String host) { try (EventLoopGroup eventLoopGroup = new NioEventLoopGroup()){ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoClientHandler()); } }); //异步链接操作 ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); //等待客户端 channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { int port = 8888; new FixedLengthFrameDecoderEchoClient().connect(port, "127.0.0.1"); } }
2
FixedLengthFrameDecoder客户端处理类
public class FixedLengthFrameDecoderEchoClientHandler extends ChannelHandlerAdapter { private AtomicInteger count = new AtomicInteger(0); private byte[] req; public FixedLengthFrameDecoderEchoClientHandler() { req = ("hello world").getBytes(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf message = null; //循环发送100条消息,每发送一条就刷新一次,理论上服务器端会收到100条hello world for (int i = 0; i < 100; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); } } /** * 读取并打印消息 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg; System.out.println("客户端第" + count.incrementAndGet() + "次收到消息:" + body); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
3
FixedLengthFrameDecoder服务器端
public class FixedLengthFrameDecoderEchoServer { public void bind(int port) { //配置服务器端NIO线程组 //NioEventLoopGroup是个线程组,包含了一组NIO线程,处理网络事件,实际上就是Reactor线程组 try (EventLoopGroup bossLoopGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup()){ //netty用于启动NIO服务端的启动类,目的是降低NIO开发的复杂度 ServerBootstrap bootstrap = new ServerBootstrap(); //功能类似于NIO中的ServerSocketChannel bootstrap.group(bossLoopGroup, workerGroup) .channel(NioServerSocketChannel.class) //配置NioServerSocketChannel的参数 .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) //绑定事件的处理类ChildChannelHandler .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //FixedLengthFrameDecoder解码器 socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(11)); //StringDecoder解码器 socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoServerHandler()); } }); //绑定端口,同步等待绑定操作完成 ChannelFuture channelFuture = bootstrap.bind(port).sync(); //等待服务器监听端口关闭 channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { int port = 8888; new FixedLengthFrameDecoderEchoServer().bind(port); } }
4
FixedLengthFrameDecoder服务器端处理类
public class FixedLengthFrameDecoderEchoServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { String body = (String) msg; System.out.println("服务器端收到消息:" + body); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { //将消息发送队列中的消息写入SocketChannel中,发送到对方 //防止频繁的唤醒Selector进行消息发送 ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //发生异常关闭ChannelHandlerContext等资源 ctx.close(); } }
5
FixedLengthFrameDecoder测试结果
服务器端收到消息:hello world 服务器端收到消息:hello world 服务器端收到消息:hello world 服务器端收到消息:hello world 服务器端收到消息:hello world 服务器端收到消息:hello world 服务器端收到消息:hello world 服务器端收到消息:hello world 服务器端收到消息:hello world ······
END
本文分享自微信公众号 - 落叶飞翔的蜗牛(A_GallopingSnail) ,作者:超神的蜗牛
原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。
原始发表时间: 2018-09-11
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。