Part1
1
Netty: Delimiter-Based and Fixed-Length Decoders
TCP transmits data as a byte stream, so the upper-layer application protocol has to mark message boundaries itself. Four approaches are commonly used:
1. Fixed message length. Once the accumulated bytes reach the specified length, a complete message is considered to have been read; the counter is then reset and reading of the next message begins.
2. A carriage return and line feed (CRLF) as the end-of-message marker, as in the FTP protocol.
3. A special delimiter as the end-of-message marker. CRLF is one particular case of such a delimiter.
4. A length field in the message header that gives the total length of the message.
Netty provides a unified abstraction over these four approaches and ships four decoders, one for each case.
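The fourth approach is not demonstrated later in this article, so here is a minimal sketch of the idea in plain Java. It is not Netty's implementation (in Netty this role is played by LengthFieldBasedFrameDecoder); the class name LengthPrefixedFraming and the 4-byte big-endian header are assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Netty: frames messages with a length header.
public class LengthPrefixedFraming {

    /** Encodes one message as a 4-byte big-endian length followed by the payload. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Returns every complete message in the stream; a trailing partial frame is skipped. */
    static List<String> decode(byte[] stream) {
        List<String> messages = new ArrayList<>();
        ByteBuffer in = ByteBuffer.wrap(stream);
        while (in.remaining() >= 4) {
            int length = in.getInt();
            if (length < 0 || in.remaining() < length) {
                break; // invalid header or body not fully received yet
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        wire.write(encode("hello"));
        wire.write(encode("world"));
        System.out.println(decode(wire.toByteArray())); // [hello, world]
    }
}
```

Because the receiver learns the body length before reading the body, the payload may contain any bytes, including ones that would otherwise be mistaken for a delimiter.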
2
Decoder Introduction
DelimiterBasedFrameDecoder: automatically decodes messages that are terminated by a delimiter.
FixedLengthFrameDecoder: automatically decodes messages of a fixed length.
Part2
1
DelimiterBasedFrameDecoder client
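import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class DelimiterBasedFrameDecoderEchoClient {

    public void connect(int port, String host) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Frames end with "$_"; a frame may be at most 1024 bytes long
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoClientHandler());
                        }
                    });
            // Start the asynchronous connect and wait for it to complete
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            // Wait until the client channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads
            eventLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new DelimiterBasedFrameDecoderEchoClient().connect(port, "127.0.0.1");
    }
}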
2
DelimiterBasedFrameDecoder client handler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;

// Assumes Netty 4.x, where the inbound callbacks live on ChannelInboundHandlerAdapter.
public class DelimiterBasedFrameDecoderEchoClientHandler extends ChannelInboundHandlerAdapter {

    private final AtomicInteger count = new AtomicInteger(0);
    private final byte[] req;

    public DelimiterBasedFrameDecoderEchoClientHandler() {
        // Every request ends with the "$_" delimiter
        req = ("hello world" + "$_").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send 100 messages, flushing after each one.
        // In theory the server receives 100 "hello world" messages.
        for (int i = 0; i < 100; i++) {
            ByteBuf message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    /** Reads and prints each decoded message. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Client received message #" + count.incrementAndGet() + ": " + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
3
DelimiterBasedFrameDecoder server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class DelimiterBasedFrameDecoderEchoServer {

    public void bind(int port) {
        // Configure the server-side NIO thread groups.
        // A NioEventLoopGroup is a group of NIO threads that handle network events;
        // in effect it is the Reactor thread group.
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is Netty's helper class for starting an NIO server;
            // it reduces the complexity of NIO development.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossLoopGroup, workerGroup)
                    // Plays the role of ServerSocketChannel in plain NIO
                    .channel(NioServerSocketChannel.class)
                    // Configure the NioServerSocketChannel
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Register the handlers for each accepted child channel
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            // DelimiterBasedFrameDecoder with "$_" as the delimiter
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            // StringDecoder turns each frame into a String
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoderEchoServerHandler());
                        }
                    });
            // Bind the port and wait for the bind operation to complete
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            // Wait until the server's listening channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads
            bossLoopGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new DelimiterBasedFrameDecoderEchoServer().bind(port);
    }
}
4
DelimiterBasedFrameDecoder server handler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

// Assumes Netty 4.x, where the inbound callbacks live on ChannelInboundHandlerAdapter.
public class DelimiterBasedFrameDecoderEchoServerHandler extends ChannelInboundHandlerAdapter {

    private final AtomicInteger count = new AtomicInteger(0);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String body = (String) msg;
        System.out.println("Server received message #" + count.incrementAndGet() + ": " + body);
        // The decoder strips the delimiter, so append "$_" again to the response
        ByteBuf response = Unpooled.copiedBuffer(("current time: " + new Date() + "$_").getBytes());
        // writeAndFlush queues the response and flushes it to the SocketChannel right away
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // Write any messages still in the send queue to the SocketChannel.
        // Flushing once per read batch avoids waking the Selector for every message.
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // On an exception, close the ChannelHandlerContext and release its resources
        ctx.close();
    }
}
5
DelimiterBasedFrameDecoder execution results
Server received message #1: hello world
Server received message #2: hello world
Server received message #3: hello world
Server received message #4: hello world
Server received message #5: hello world
Server received message #6: hello world
Server received message #7: hello world
Server received message #8: hello world
Server received message #9: hello world
Server received message #10: hello world
......
Server received message #99: hello world
Server received message #100: hello world
Client received message #1: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #2: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #3: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #4: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #5: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #6: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #7: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #8: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #9: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #10: current time: Tue Sep 11 16:17:57 CST 2018
......
Client received message #98: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #99: current time: Tue Sep 11 16:17:57 CST 2018
Client received message #100: current time: Tue Sep 11 16:17:57 CST 2018
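Each side counts exactly 100 messages even though TCP is free to merge or split the bytes of consecutive writes. The following plain-Java sketch illustrates the idea behind delimiter-based framing; it is not Netty's implementation, and the class DelimiterFramer with its feed method is a hypothetical name introduced for this example. Like the decoder configured above, it strips the delimiter and rejects frames that grow beyond a maximum length.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Netty: splits a byte stream on a delimiter.
public class DelimiterFramer {
    private final byte[] delimiter;
    private final int maxFrameLength;
    private byte[] pending = new byte[0]; // bytes received but not yet framed

    DelimiterFramer(String delimiter, int maxFrameLength) {
        this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8);
        this.maxFrameLength = maxFrameLength;
    }

    /** Feeds one chunk as read from the socket and returns the frames it completes. */
    List<String> feed(byte[] chunk) {
        byte[] data = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, data, pending.length, chunk.length);
        List<String> frames = new ArrayList<>();
        int start = 0;
        int end;
        while ((end = indexOf(data, delimiter, start)) >= 0) {
            // The delimiter itself is not part of the frame
            frames.add(new String(data, start, end - start, StandardCharsets.UTF_8));
            start = end + delimiter.length;
        }
        pending = Arrays.copyOfRange(data, start, data.length);
        if (pending.length > maxFrameLength) {
            throw new IllegalStateException("no delimiter within " + maxFrameLength + " bytes");
        }
        return frames;
    }

    private static int indexOf(byte[] data, byte[] pattern, int from) {
        for (int i = from; i <= data.length - pattern.length; i++) {
            int j = 0;
            while (j < pattern.length && data[i + j] == pattern[j]) {
                j++;
            }
            if (j == pattern.length) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        DelimiterFramer framer = new DelimiterFramer("$_", 1024);
        // Two messages arrive in three reads whose boundaries ignore message boundaries
        System.out.println(framer.feed("hello wor".getBytes(StandardCharsets.UTF_8)));      // []
        System.out.println(framer.feed("ld$".getBytes(StandardCharsets.UTF_8)));            // []
        System.out.println(framer.feed("_hello world$_".getBytes(StandardCharsets.UTF_8))); // [hello world, hello world]
    }
}
```

The second read ends in the middle of the delimiter, which is why incomplete bytes have to be kept between reads rather than scanned chunk by chunk.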
Part3
1
FixedLengthFrameDecoder client
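import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class FixedLengthFrameDecoderEchoClient {

    public void connect(int port, String host) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoClientHandler());
                        }
                    });
            // Start the asynchronous connect and wait for it to complete
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            // Wait until the client channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads
            eventLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new FixedLengthFrameDecoderEchoClient().connect(port, "127.0.0.1");
    }
}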
2
FixedLengthFrameDecoder client handler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;

// Assumes Netty 4.x, where the inbound callbacks live on ChannelInboundHandlerAdapter.
public class FixedLengthFrameDecoderEchoClientHandler extends ChannelInboundHandlerAdapter {

    private final AtomicInteger count = new AtomicInteger(0);
    private final byte[] req;

    public FixedLengthFrameDecoderEchoClientHandler() {
        // "hello world" is exactly 11 bytes, matching the frame length used by the server
        req = ("hello world").getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send 100 messages, flushing after each one.
        // In theory the server receives 100 "hello world" messages.
        for (int i = 0; i < 100; i++) {
            ByteBuf message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    /** Reads and prints each decoded message. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Client received message #" + count.incrementAndGet() + ": " + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
3
FixedLengthFrameDecoder server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class FixedLengthFrameDecoderEchoServer {

    public void bind(int port) {
        // Configure the server-side NIO thread groups.
        // A NioEventLoopGroup is a group of NIO threads that handle network events;
        // in effect it is the Reactor thread group.
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is Netty's helper class for starting an NIO server;
            // it reduces the complexity of NIO development.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossLoopGroup, workerGroup)
                    // Plays the role of ServerSocketChannel in plain NIO
                    .channel(NioServerSocketChannel.class)
                    // Configure the NioServerSocketChannel
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Register the handlers for each accepted child channel
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // FixedLengthFrameDecoder: every frame is exactly 11 bytes
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(11));
                            // StringDecoder turns each frame into a String
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoderEchoServerHandler());
                        }
                    });
            // Bind the port and wait for the bind operation to complete
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            // Wait until the server's listening channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads
            bossLoopGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8888;
        new FixedLengthFrameDecoderEchoServer().bind(port);
    }
}
4
FixedLengthFrameDecoder server handler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Assumes Netty 4.x, where the inbound callbacks live on ChannelInboundHandlerAdapter.
public class FixedLengthFrameDecoderEchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String body = (String) msg;
        System.out.println("Server received message: " + body);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // Write any messages still in the send queue to the SocketChannel.
        // Flushing once per read batch avoids waking the Selector for every message.
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // On an exception, close the ChannelHandlerContext and release its resources
        ctx.close();
    }
}
5
FixedLengthFrameDecoder test results
Server received message: hello world
Server received message: hello world
Server received message: hello world
Server received message: hello world
Server received message: hello world
Server received message: hello world
Server received message: hello world
Server received message: hello world
Server received message: hello world
......
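The server prints one clean "hello world" per frame because the frame length (11) equals the length of each request, regardless of how the bytes are grouped on the wire. The plain-Java sketch below illustrates that idea; it is not Netty's implementation, the class name FixedLengthFramer is hypothetical, and the 7-byte read size is an arbitrary assumption chosen so that reads never line up with message boundaries.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Netty: cuts a byte stream into fixed-size frames.
public class FixedLengthFramer {
    private final int frameLength;
    private byte[] pending = new byte[0]; // bytes received but not yet framed

    FixedLengthFramer(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Feeds one chunk as read from the socket and returns the frames it completes. */
    List<String> feed(byte[] chunk) {
        byte[] data = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, data, pending.length, chunk.length);
        List<String> frames = new ArrayList<>();
        int start = 0;
        while (data.length - start >= frameLength) {
            frames.add(new String(data, start, frameLength, StandardCharsets.US_ASCII));
            start += frameLength;
        }
        // Keep the incomplete tail until more bytes arrive
        pending = Arrays.copyOfRange(data, start, data.length);
        return frames;
    }

    public static void main(String[] args) {
        // 100 messages of 11 bytes each, as sent by the client above
        byte[] wire = "hello world".repeat(100).getBytes(StandardCharsets.US_ASCII);
        FixedLengthFramer framer = new FixedLengthFramer(11);
        int total = 0;
        // Deliver the stream in 7-byte reads
        for (int off = 0; off < wire.length; off += 7) {
            byte[] chunk = Arrays.copyOfRange(wire, off, Math.min(off + 7, wire.length));
            total += framer.feed(chunk).size();
        }
        System.out.println(total); // 100
    }
}
```

If the sender's messages were not all exactly 11 bytes long, the frames would drift out of alignment with the messages, which is the main limitation of the fixed-length approach.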
END
This article is from the WeChat official account The snail flying in the fallen leaves (A_GallopingSnail). Author: The supernatural snail
Originally published: 2018-09-11