Netty: a high performance communication framework

IT tycoon says 2021-04-08 12:02:00
netty high performance communication framework


Netty What is it? ?

  • Netty It's an asynchronous 、 Network application framework based on event driven , For rapid development of high performance 、 A highly reliable network IO Program
  • Netty Mainly for TCP Under the agreement , oriented Clients High concurrency applications at the end , perhaps Peer-to-Peer The application of large amount of data continuous transmission under the scenario
  • Netty The essence is a NIO frame , It is suitable for a variety of application scenarios related to server communication

Netty Application scenarios of

  • Remote service invocation of distributed services RPC frame , such as Dubbo Just use Netty Frame work RPC
  • Netty As a basic communication component with high performance , Provides TCP/UDP、HTTP Equal protocol stack , And can customize and develop private protocol stack

I'm learning Netty Before , Let's first look at why Netty Can be widely used .

One 、IO Model

What is? I/O Model ?

Simple understanding is to use what kind of channel to send and receive data , And it largely determines the performance of program communication .

Java Supported by 3 Network programming model /IO Pattern

  • BIO Synchronized and blocked

The server implements a pattern of one thread per connection , That is, when the client has a connection request, the server needs to start a thread to process it . It is suitable for the fixed mechanism with small number of connections , The requirements for server resources are relatively high , If this connection doesn't do anything, it will cause unnecessary thread overhead .

  • NIO Synchronous nonblocking

The server implementation mode deals with multiple requests for one thread ( Connect ), That is, the connection requests sent by the client will be registered on the multiplexer , The multiplexer polls the connection to have I/O The request is processed . Selectors Selector To maintain the connection channel channel.Netty Based on the framework of NIO Realization .

  • AIO Asynchronous non-blocking

AIO Introduce the concept of asynchronous channel , Adopted Proactor Pattern , Simplified programming , Valid request to start thread . After the completion of the operating system, the server program is informed to start the thread to process , It is generally used in applications with more connections and longer connection time .

Two 、BIO Model

Each read-write request creates a thread to process .

BIO Model

2.1 BIO Programming flow

  1. The server starts a ServerSocket
  2. After the client sends the request , First, ask the server if there is thread response , If not, they will wait , Or be rejected
  3. If there is a response , The client thread will wait for the end of the request , And go on with it

Server side

public class BIOMain {
public static void main(String[] args) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(6666));
System.out.println(" The server has started , Port number :6666");
while (true){
System.out.println(" Wait for the client to connect ...");
// Wait for the client to connect , When there is no client connection , It will block
Socket socket = serverSocket.accept();
System.out.println(" client :" + socket.getLocalAddress() + " Successful connection ");
// Whenever a client comes in , Start a thread to process
new BioServer(socket).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(serverSocket !=null) {
System.out.println(" The server is down ");
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

client :

public class BioServer extends Thread {
private Socket socket;
public BioServer(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
while (true) {
BufferedInputStream bufferedInputStream =
new BufferedInputStream(socket.getInputStream());
byte[] bytes = new byte[1024];
System.out.println(" Waiting for data to be sent ...");
// When there is no data , This place is going to block
int read = bufferedInputStream.read(bytes, 0, 1024);
String result = new String(bytes, 0, read);
System.out.println(">>> " + result);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

shortcoming : Each connection creates a thread , Consume CPU resources , If the process pool is added, the effect is not good , Because it's dealing with connections Accept and Read Places can cause thread blocking , Waste resources .

3、 ... and 、NIO Model

We know BIO The main problem with the model is where the thread is blocked , therefore ,NIO introduce Selector It solves the problem of thread blocking .

public class NioServer {
public static void main(String[] args) {
try {
// 1. Create a ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2. Get the binding port
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
// 3. Set to non-blocking mode
serverSocketChannel.configureBlocking(false);
// 4. obtain Selector
Selector selector = Selector.open();
// 5. take serverSocketChannel Sign up to selector On , And set up selector Client side Accept Events of interest
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6. Loop waiting for client connection
while (true) {
// When no events are registered to selector when , So let's go to the next loop
if (selector.select(1000) == 0) {
//System.out.println(" There are no events happening right now , So let's go to the next loop ");
continue;
}
// Get relevant SelectionKey aggregate
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
// Based on event processing handler
handler(selectionKey);
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Based on event processing , according to key Handle the corresponding channel events
* @param selectionKey
* @throws IOException
*/
private static void handler(SelectionKey selectionKey) throws IOException {
if (selectionKey.isAcceptable()) { // If it is OP_ACCEPT event , Indicates that there is a new client connection
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
// Generate the corresponding Channel
SocketChannel socketChannel = channel.accept();
// take socketChannel Set to non blocking
socketChannel.configureBlocking(false);
System.out.println(" Client connection successful ... Generate socketChannel");
// Change the current socketChannel Sign up to selector On , Events of concern : read , At the same time socketChannel Associated with a Buffer
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024));
} else if (selectionKey.isReadable()) { // If it's a read event
// adopt key Reverse access Channel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// Get the channel The associated buffer
//ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
ByteBuffer buffer = ByteBuffer.allocate(512);
// Put the present channel The data reads buffer Go inside
socketChannel.read(buffer);
System.out.println(" Read data from client :"+new String(buffer.array()));
//
ByteBuffer buffer1 = ByteBuffer.wrap("hello client".getBytes());
socketChannel.write(buffer1);
selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
} else if (selectionKey.isWritable()){ // If it's writing events
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
System.out.println(" Write events ");
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
}

In order to make it easy for everyone to have a clear understanding of NIO framework , Here's an overall flow chart to show :

NIO

Speaking of NIO, You have to know its three core modules :

NIO Three core parts :

  • Channel passageway : Duplex connection channel between client and server . So in the process of the request , Between client and server Channel It's just going on and on “ Connect 、 inquiry 、 To break off ” The process of . Until the data is ready , Re pass Channel Send it back .Channel There are mainly 4 A type of :FileChannel( Read data from file )、DatagramChannel( Reading and writing UDP Network protocol data )、SocketChannel( Reading and writing TCP Network protocol data )、ServerSocketChannel( Can monitor TCP Connect )
  • Buffer buffer : A buffer container for client to store server information , If the server has the data ready , Would pass Channel Go to Buffer In the buffer .Buffer Yes 7 A type of :ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer.
  • Selector Selectors : Server choice Channel A multiplexer for .Selector There are two core tasks : Whether the monitoring data is ready , The reply Channel.

NIO working principle :

NIO It's buffer oriented programming . It's reading data into a buffer , Move back and forth in the buffer if needed .

NIO Working mode —— Non-blocking mode :

Java NIO Non blocking mode of , To cause a thread to send a request or read data from a channel , But it can only get the data that is currently available , If no data is currently available , You will get nothing , Instead of keeping threads blocked .

NIO characteristic :

One thread maintains one Selector, Selector Maintain multiple Channel, When channel When there is an incident , Then the thread processes .

BIO and NIO Comparison of

  • BIO Processing data as a stream ,NIO Processing data in blocks , Block processing is more efficient than stream processing
  • BIO It's blocked , and NIO It's non blocking
  • BIO Operation is based on byte stream and character stream , and NIO Is based on channel and buffer To operate , Data is read from the channel to the buffer or written from the buffer to the channel ,selector Used to listen for multiple channel events ( such as : Connection request , Data arrival, etc ), So a single thread can listen to multiple client channels

NIO shortcoming :

Programming complexity , buffer Buffer Consider the read-write pointer switch . and Netty After sealing it , Optimize and provide an easy to use mode and interface , therefore Netty It is widely used in communication framework .

3、 ... and 、Netty

Netty It's an asynchronous 、 Network application framework based on event driven , It encapsulates NIO.

Netty frame :

Netty Execute the process

Netty And NIO The difference between server and client

Netty

NIO

Server side

NioServerSocketChannel

ServerSocketChannel

client

NioSocketChannel

SocketChanel

Threading model

Based on master-slave Reactor Multithreading model , It maintains two thread pools , One is to deal with Accept Connect , The other is dealing with read-write Events .

Server side :

@Slf4j
public class TcpServer extends Thread {
private Integer port;
public TcpServer(Integer port){
this.port = port;
}
@Override
public void run() {
// Create... Based on the host name and port number ip Socket address (ip Address + Port number )
InetSocketAddress socketAddress = new InetSocketAddress(port);
// Main thread group , Handle Accept The thread connecting the event , Here, the number of threads is set to 1 that will do ,netty Processing link events is single threaded by default , Over setting is a waste of cpu resources
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// The worker thread , Handle hadnler Worker thread , In fact, it's dealing with IO Reading and writing , The default thread data is CPU Multiply the number of cores by 2
EventLoopGroup workerGroup = new NioEventLoopGroup();
// establish ServerBootstrap example
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup) // initialization ServerBootstrap Thread group
.channel(NioServerSocketChannel.class) // Set the... To be instantiated ServerChannel class
.childHandler(new ServerChannelInitializer()) // initialization ChannelPipeline Responsibility chain
.localAddress(socketAddress)
.option(ChannelOption.SO_BACKLOG, 1024) // Set queue size
.childOption(ChannelOption.SO_KEEPALIVE, true); // Whether to start the heartbeat keeping alive mechanism
try {
// Binding port , Start receiving incoming connections , Asynchronous connection
ChannelFuture channelFuture = serverBootstrap.bind(socketAddress).sync();
log.info("TCP The server starts listening on the port :{}", socketAddress.getPort());
if (channelFuture.isSuccess()) {
log.info("TCP Service started successfully -------------------");
}
// The main thread executes here wait Child thread end , The subthread is really listening and accepting requests ,
// closeFuture() It's opening up a channel The monitor for , Responsible for monitoring channel Whether it is closed or not ,
// If it detects channel Shut down the , The child thread will be released ,syncUninterruptibly() Let the main thread wait for the result of the sub thread synchronously
channelFuture.channel().closeFuture().sync();
log.info("TCP The service has been shut down ");
} catch (InterruptedException e) {
log.error("tcp server exception: {}", e.getMessage());
} finally {
// Close the main thread group
bossGroup.shutdownGracefully();
// Close the team
workerGroup.shutdownGracefully();
}
}
}

Customize Handler

@Slf4j
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
/**
* Client connection ID
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(" Client connected :{}", ctx.channel().localAddress().toString());
// Get the unique identity of the current client
String uuid = ctx.channel().id().asLongText();
log.info(" The currently connected client id:{}", uuid);
// The corresponding identification and channel Deposit to map in
CLIENT_MAP.put(uuid, ctx.channel());
}
/**
* Read messages sent by clients
* @param ctx
* @param msg Data sent by the client
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Use netty Provided ByteBuf Generate bytes Buffer, It maintains a byte array , Notice that it's not JDK Self contained ByteBuffer
ByteBuf byteBuf = (ByteBuf) msg;
// Read byteBuf
// Business processing
// Send a message back to the client
}
/**
* Triggered when client disconnects
* When the client actively breaks the link of the server , This channel is inactive . That is to say, the communication channel between client and server is closed and data can not be transmitted
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info(" Before disconnection ,CLIENT_MAP:{}", CLIENT_MAP);
// When the client disconnects , eliminate map Cached client information
CLIENT_MAP.clear();
log.info(ctx.channel().localAddress().toString() + " The channel is not active ! And close .");
log.info(" After disconnection ,CLIENT_MAP:{}", CLIENT_MAP);
// Closed flow
ctx.close();
}
/**
* Trigger when an exception occurs
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error(" Abnormal situation : {}", cause.toString());
}
/**
* channelRead Call after execution of method , Send a message to the client
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// writeAndFlush = write + flush: Write data to cache , And refresh
// You need to encode the data you send
ctx.writeAndFlush(Unpooled.copiedBuffer(" Received a message , return ok!"));
}
}

client :

public class NettyClient {
public void run(){
// An event loop group
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// The client starts helper
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyClientHandler());
}
});
System.out.println(" The client is ready , About to connect to the server ...");
// Connect server , And back to channelFuture object , It's used for asynchronous notification
// Generally in Socket Programming , Waiting for response results are blocked synchronously , and Netty It doesn't cause obstruction , because ChannelFuture It takes the form of an observer model to get the results
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",6666).sync();
// Monitor channel closure
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// close netty
eventLoopGroup.shutdownGracefully();
}
}
}

Client defined Handler

public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" Client connected ..");
ctx.writeAndFlush(Unpooled.copiedBuffer("msg", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Read the message sent by the server
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(" Received server :" + ctx.channel().remoteAddress() + " The news of :" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
}
}

thus , One Netty The client server is set up , Start two services .

The client console prints the results :

The client is ready , About to connect to the server ... Client connected .. Received server :/127.0.0.1:6666 The news of : The server received your message , And send you a ok

Server console print results :

The server is ready Client connection address :/127.0.0.1:6666, Messages received :msg

source :

https://www.toutiao.com/i6930944968446362123/

This article is from WeChat official account. - IT Big guy says (itdakashuo)

The source and reprint of the original text are detailed in the text , If there is any infringement , Please contact the yunjia_community@tencent.com Delete .

Original publication time : 2021-03-28

Participation of this paper Tencent cloud media sharing plan , You are welcome to join us , share .

版权声明
本文为[IT tycoon says]所创,转载请带上原文链接,感谢
https://javamana.com/2021/04/20210408111751717c.html

  1. A love diary about http
  2. navicat连接win10 mysql8.0 报错2059
  3. [rocketmq source code analysis] in depth message storage (3)
  4. Implementation of service configuration center with spring cloud + Nacos (Hoxton version)
  5. SCIP: constructing data abstraction -- Explanation of queue and tree in data structure
  6. SCIP: abstraction of construction process -- object oriented explanation
  7. Using docker to build elasticsearch + kibana cluster
  8. What are the spring IOC features? I can't understand the source code!
  9. Spring cloud upgrade road - 2020.0. X - 3. Accesslog configuration of undertow
  10. 导致Oracle性能抖动的参数提醒
  11. 风险提醒之Oracle RAC高可用失效
  12. 小机上运行Oracle需要注意的进程调度bug
  13. Oracle内存过度消耗风险提醒
  14. Oracle SQL monitor
  15. 使用Bifrost实现Mysql的数据同步
  16. 揭秘Oracle数据库truncate原理
  17. 看了此文,Oracle SQL优化文章不必再看!
  18. Mybatis (3) map and fuzzy query expansion
  19. Kafka性能篇:为何这么“快”?
  20. 两个高频设计类面试题:如何设计HashMap和线程池
  21. [TTS] AIX - & gt; Linux -- Based on RMAN (real environment)
  22. 为什么学编程大部分人选Java编程语言?
  23. Redis 高可用篇:你管这叫 Sentinel 哨兵集群原理
  24. redis 为什么把简单的字符串设计成 SDS?
  25. [TTS] transfer table space AIX - & gt; Linux based on RMAN
  26. Linux 网卡数据收发过程分析
  27. Redis 高可用篇:你管这叫 Sentinel 哨兵集群原
  28. Redis 6.X Cluster 集群搭建
  29. [TTS] transfer table space AIX ASM - & gt; Linux ASM
  30. [TTS] transfer table space Linux ASM - & gt; AIX ASM
  31. 高性能通讯框架——Netty
  32. Brief introduction and test of orchestrator, a high availability management tool for MySQL
  33. [TTS] transfer table space Linux - & gt; AIX based on RMAN
  34. A love diary about http
  35. [rocketmq source code analysis] in depth message storage (3)
  36. Implementation of service configuration center with spring cloud + Nacos (Hoxton version)
  37. SiCp: abstraction of construction process -- object oriented explanation
  38. springboot网上点餐系统
  39. 【SPM】oracle如何固定执行计划
  40. 用好HugePage,告别Linux性能故障
  41. 3 W word long text, java basic interview questions! It's amazing!!!
  42. Spring cloud upgrade road - 2020.0. X - 3. Accesslog configuration of undertow
  43. Win10 uninstall mysql5.7
  44. CentOS下dotnet Core使用HttpWebRequest进行HTTP通讯,系统存在大量CLOSE_WAIT连接问题的分析,已解决。
  45. MySQL batch insert, how not to insert duplicate data?
  46. K8s cronjob application example
  47. Unconventional method, easy to deal with Oracle database critical exception
  48. How to use sqlplus - prelim in Oracle hang
  49. How to search Oracle official documents in full text
  50. Install mysql8.0 on win10
  51. Oracle OCR的备份与恢复
  52. Oracle kill session相关问题
  53. 《Oracle DBA工作笔记》第二章 常用工具和问题分析
  54. Oracle回收站及flashback drop
  55. Hand in hand to teach you to write a spring IOC container
  56. Exception in Java (1) - basic concept
  57. 3w 字长文爆肝 Java 基础面试题!太顶了!!!
  58. Error 2059 when Navicat connects to win10 mysql8.0
  59. Parameter reminder causing Oracle Performance jitter
  60. 「技术分享」Java线程状态间的互相转换看这个就行了