高性能通讯框架——Netty

IT大咖说 2021-04-08 11:18:49
框架 性能 Netty 高性能 通讯


Netty是什么?

  • Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络IO程序
  • Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用
  • Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景

Netty的应用场景

  • 分布式服务的远程服务调用RPC框架,比如Dubbo就采用Netty框架做RPC
  • Netty作为高性能的基础通信组件,提供了TCP/UDP、HTTP等协议栈,并且能够定制和开发私有协议栈

在学习Netty之前,我们先来看一下为什么Netty能够被广泛使用。

一、IO模型

什么是I/O模型?

简单理解就是用什么样的通道进行数据的发送和接收,并且很大程序上决定了程序通信的性能。

Java中支持的3种网络编程模型/IO模式

  • BIO同步且阻塞

服务器实现模式为一个连接一个线程,即客户端有连接请求时服务端就需要启动一个线程进行处理。适用于连接数较小且固定的机构,对服务器资源要求比较高,如果这个连接不做任何事情就会造成不必要的线程开销。

  • NIO同步非阻塞

服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。选择器Selector来维护连接通道channel。Netty框架基于NIO实现。

  • AIO异步非阻塞

AIO引入异步通道的概念,采用了Proactor模式,简化了编程,有效的请求才启动线程。由操作系统完成后才通知服务端程序启动线程去处理,一般应用于连接数较多且连接时间较长的应用。

二、BIO模型

每次读写请求都会创建一个线程去处理。

BIO模型

2.1 BIO编程流程

  1. 服务端启动一个ServerSocket
  2. 客户端发送请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
  3. 如果有响应,客户端线程会等待请求结束后,再继续执行

服务端

public class BIOMain {
public static void main(String[] args) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(6666));
System.out.println("服务器已启动,端口号:6666");
while (true){
System.out.println("等待客户端连接...");
// 等待客户端连接,当没有客户端连接时,会阻塞
Socket socket = serverSocket.accept();
System.out.println("客户端:" + socket.getLocalAddress() + "连接成功");
// 每当有客户端连接进来,就启动一个线程进行处理
new BioServer(socket).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(serverSocket !=null) {
System.out.println("服务器关闭了");
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

客户端:

public class BioServer extends Thread {
private Socket socket;
public BioServer(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
while (true) {
BufferedInputStream bufferedInputStream =
new BufferedInputStream(socket.getInputStream());
byte[] bytes = new byte[1024];
System.out.println("等待数据发送...");
// 当没有数据的时候,这个地方会阻塞
int read = bufferedInputStream.read(bytes, 0, 1024);
String result = new String(bytes, 0, read);
System.out.println(">>> " + result);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

缺点:每来一个连接都会创建一个线程,消耗CPU资源,如果加上线程池也效果不好,因为它在处理连接Accept和Read地方会造成线程阻塞,浪费资源。

三、NIO模型

我们知道BIO模型主要问题就在线程阻塞的地方,因此,NIO引入Selector就解决了线程阻塞的问题。

public class NioServer {
public static void main(String[] args) {
try {
// 1. 创建一个ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2. 获取绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
// 3. 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 4. 获取Selector
Selector selector = Selector.open();
// 5. 将serverSocketChannel注册到selector上, 并且设置selector对客户端Accept事件感兴趣
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6. 循环等待客户端连接
while (true) {
// 当没有事件注册到selector时,继续下一次循环
if (selector.select(1000) == 0) {
//System.out.println("当前没有事件发生,继续下一次循环");
continue;
}
// 获取相关的SelectionKey集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey selectionKey = it.next();
// 基于事件处理的handler
handler(selectionKey);
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 基于事件处理的,根据key对应的通道发生的事件做相应的处理
* @param selectionKey
* @throws IOException
*/
private static void handler(SelectionKey selectionKey) throws IOException {
if (selectionKey.isAcceptable()) { // 如果是OP_ACCEPT事件,则表示有新的客户端连接
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
// 给客户端生成相应的Channel
SocketChannel socketChannel = channel.accept();
// 将socketChannel设置为非阻塞
socketChannel.configureBlocking(false);
System.out.println("客户端连接成功...生成socketChannel");
// 将当前的socketChannel注册到selector上, 关注事件:读, 同时给socketChannel关联一个Buffer
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024));
} else if (selectionKey.isReadable()) { // 如果是读取事件
// 通过key反向获取Channel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 获取该channel关联的buffer
//ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
ByteBuffer buffer = ByteBuffer.allocate(512);
// 把当前channel数据读到buffer里面去
socketChannel.read(buffer);
System.out.println("从客户端读取数据:"+new String(buffer.array()));
//
ByteBuffer buffer1 = ByteBuffer.wrap("hello client".getBytes());
socketChannel.write(buffer1);
selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
} else if (selectionKey.isWritable()){ // 如果是写事件
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
System.out.println("写事件");
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
}

为了方便大家清晰地认识NIO架构,下面以一个总体流程图来展示:

NIO

说起NIO,就必须要知道其三大核心模块:

NIO三大核心部分:

  • Channel通道:客户端与服务端之间的双工连接通道。所以在请求的过程中,客户端与服务端中间的Channel就在不停地执行“连接、询问、断开”的过程。直到数据准备好,再通过Channel传回来。Channel主要有4个类型:FileChannel(从文件读取数据)、DatagramChannel(读写UDP网络协议数据)、SocketChannel(读写TCP网络协议数据)、ServerSocketChannel(可以监听TCP连接)
  • Buffer缓冲区:客户端存放服务端信息的一个缓冲区容器,服务端如果把数据准备好了,就会通过Channel往Buffer缓冲区里面传。Buffer有7个类型:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer。
  • Selector选择器:服务端选择Channel的一个复用器。Selector有两个核心任务:监控数据是否准备好,应答Channel。

NIO工作原理:

NIO是面向缓冲区编程的。它是将数据读取到缓冲区中,需要时可在缓冲区前后移动。

NIO工作模式——非阻塞模式:

Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能获得目前可用的数据,如果目前没有数据可用,就什么都不会获取,而不是保持线程阻塞。

NIO特点:

一个线程维护一个Selector, Selector维护多个Channel, 当channel有事件时,则该线程进行处理。

BIO和NIO的对比

  • BIO以流的方式处理数据,NIO以块的方式处理数据,块的方式处理数据比流的效率高
  • BIO是阻塞的,而NIO是非阻塞的
  • BIO是基于字节流和字符流进行操作,而NIO是基于channel和buffer进行操作,数据从通道读到缓冲区或者从缓冲区写到通道中,selector用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

NIO缺点:

编程复杂,缓冲区Buffer要考虑读写指针切换。而Netty把它封装之后,进行优化并提供了一个易于操作的使用模式和接口,因此Netty就被广泛使用于通信框架。

三、Netty

Netty是一个异步的、基于事件驱动的网络应用框架,它底层封装了NIO。

Netty框架:

Netty执行流程

Netty与NIO服务端和客户端的区别

Netty

NIO

服务端

NioServerSocketChannel

ServerSocketChannel

客户端

NioSocketChannel

SocketChanel

线程模型

基于主从Reactor多线程模型,它维护两个线程池,一个是处理Accept连接,另一个是处理读写事件。

服务端:

@Slf4j
public class TcpServer extends Thread {
private Integer port;
public TcpServer(Integer port){
this.port = port;
}
@Override
public void run() {
// 根据主机名和端口号创建ip套接字地址(ip地址+端口号)
InetSocketAddress socketAddress = new InetSocketAddress(port);
// 主线程组,处理Accept连接事件的线程,这里线程数设置为1即可,netty处理链接事件默认为单线程,过度设置反而浪费cpu资源
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作线程,处理hadnler的工作线程,其实也就是处理IO读写,线程数据默认为 CPU 核心数乘以2
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建ServerBootstrap实例
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup) //初始化ServerBootstrap的线程组
.channel(NioServerSocketChannel.class) // 设置将要被实例化的ServerChannel类
.childHandler(new ServerChannelInitializer()) // 初始化ChannelPipeline责任链
.localAddress(socketAddress)
.option(ChannelOption.SO_BACKLOG, 1024) //设置队列大小
.childOption(ChannelOption.SO_KEEPALIVE, true); // 是否启动心跳保活机制
try {
// 绑定端口,开始接收进来的连接,异步连接
ChannelFuture channelFuture = serverBootstrap.bind(socketAddress).sync();
log.info("TCP服务器开始监听端口:{}", socketAddress.getPort());
if (channelFuture.isSuccess()) {
log.info("TCP服务启动成功-------------------");
}
// 主线程执行到这里就 wait 子线程结束,子线程才是真正监听和接受请求的,
// closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,
// 如果监听到channel关闭了,子线程才会释放,syncUninterruptibly()让主线程同步等待子线程结果
channelFuture.channel().closeFuture().sync();
log.info("TCP服务已关闭");
} catch (InterruptedException e) {
log.error("tcp server exception: {}", e.getMessage());
} finally {
// 关闭主线程组
bossGroup.shutdownGracefully();
// 关闭工作组
workerGroup.shutdownGracefully();
}
}
}

自定义Handler

@Slf4j
public class TCPServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接标识
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端已连接:{}", ctx.channel().localAddress().toString());
// 获取当前客户端的唯一标识
String uuid = ctx.channel().id().asLongText();
log.info("当前连接的客户端id:{}", uuid);
// 将其对应的标识和channel存入到map中
CLIENT_MAP.put(uuid, ctx.channel());
}
/**
* 读取客户端发送的消息
* @param ctx
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 使用netty提供的ByteBuf生成字节Buffer,里面维护一个字节数组,注意不是JDK自带的ByteBuffer
ByteBuf byteBuf = (ByteBuf) msg;
// 读取byteBuf
// 业务处理
// 回消息给客户端
}
/**
* 客户端断开连接时触发
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("断开前,CLIENT_MAP:{}", CLIENT_MAP);
//当客户端断开连接时,清除map缓存的客户端信息
CLIENT_MAP.clear();
log.info(ctx.channel().localAddress().toString() + " 通道不活跃!并且关闭。");
log.info("断开后,CLIENT_MAP:{}", CLIENT_MAP);
// 关闭流
ctx.close();
}
/**
* 发生异常时触发
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("异常情况: {}", cause.toString());
}
/**
* channelRead方法执行完成后调用,发送消息给客户端
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// writeAndFlush = write + flush:将数据写入缓存,并刷新
// 需要对发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("收到消息,返回ok!"));
}
}

客户端:

public class NettyClient {
public void run(){
// 一个事件循环组
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 客户端启动helper
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyClientHandler());
}
});
System.out.println("客户端准备就绪,即将连接服务端...");
// 连接服务端,并返回channelFuture对象,它用来进来异步通知
// 一般在Socket编程中,等待响应结果都是同步阻塞的,而Netty则不会造成阻塞,因为ChannelFuture是采取类似观察者模式的形式进行获取结果
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",6666).sync();
// 对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭netty
eventLoopGroup.shutdownGracefully();
}
}
}

客户端自定义的Handler

public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端已连接..");
ctx.writeAndFlush(Unpooled.copiedBuffer("msg", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 读取服务器发送的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("接收到服务器:" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
}
}

至此,一个Netty客户端服务器就搭建完成,启动两个服务。

客户端控制台打印结果:

客户端准备就绪,即将连接服务端... 客户端已连接.. 接收到服务器:/127.0.0.1:6666的消息:服务器收到了你的消息,并给你发送一个ok

服务端控制台打印结果:

服务端已经准备就绪 客户端连接地址:/127.0.0.1:6666, 收到的消息:msg

来源:

https://www.toutiao.com/i6930944968446362123/

本文分享自微信公众号 - IT大咖说(itdakashuo)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间: 2021-03-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

版权声明
本文为[IT大咖说]所创,转载请带上原文链接,感谢
https://cloud.tencent.com/developer/article/1810390

  1. Java 学生成绩管理系统课程设计,附源码!
  2. Java arbitrary audio to MP3
  3. DNS of docker
  4. Docker - build log monitoring system
  5. SSM + MySQL + Maven + Shiro WMS
  6. Top 10 reasons to fall in love with java!
  7. 一本关于HTTP的恋爱日记
  8. 【RocketMQ源码分析】深入消息存储(3)
  9. SpringCloud+Nacos实现服务配置中心(Hoxton版本)
  10. SCIP:构造数据抽象--数据结构中队列与树的解释
  11. SCIP:构造过程抽象--面向对象的解释
  12. 使用 docker 进行 ElasticSearch + Kibana 集群搭建
  13. Spring IOC 特性有哪些,不会读不懂源码!
  14. [DB Bao 41] use of PMM -- monitoring mysql, PG, mongodb, proxysql, etc
  15. Spring Cloud 升级之路 - 2020.0.x - 3. Undertow 的 accesslog 配置
  16. [DB Bao 42] MySQL high availability architecture MHA + proxysql realizes read-write separation and load balancing
  17. [DataGuard] recovery of physical DG in case of losing archive files in main database (7)
  18. MyBatis(3)Map和模糊查询拓展
  19. 【TTS】AIX-&gt;Linux--基于RMAN(真实环境)
  20. 【TTS】传输表空间AIX-&gt;linux基于rman
  21. 【TTS】传输表空间AIX asm -&gt; linux asm
  22. 【TTS】传输表空间Linux asm -&gt; AIX asm
  23. 【DB宝40】MySQL高可用管理工具Orchestrator简介及测试
  24. 【TTS】传输表空间Linux -&gt;AIX 基于rman
  25. 一本关于HTTP的恋爱日记
  26. 【RocketMQ源码分析】深入消息存储(3)
  27. SpringCloud+Nacos实现服务配置中心(Hoxton版本)
  28. SICP:构造过程抽象--面向对象的解释
  29. 3w 字长文爆肝 Java 基础面试题!太顶了!!!
  30. Spring Cloud 升级之路 - 2020.0.x - 3. Undertow 的 accesslog 配置
  31. win10卸载mysql5.7
  32. MySQL 批量插入,如何不插入重复数据?
  33. k8s cronjob应用示例
  34. 非常规方法,轻松应对Oracle数据库危急异常
  35. Oracle hang 之sqlplus -prelim使用方法
  36. 如何全文搜索oracle官方文档
  37. Java student achievement management system course design, with source code!
  38. win10安装mysql8.0
  39. 手把手教你写一个spring IOC容器
  40. JAVA 中的异常(1)- 基本概念
  41. A love diary about http
  42. navicat连接win10 mysql8.0 报错2059
  43. [rocketmq source code analysis] in depth message storage (3)
  44. Implementation of service configuration center with spring cloud + Nacos (Hoxton version)
  45. SCIP: constructing data abstraction -- Explanation of queue and tree in data structure
  46. SCIP: abstraction of construction process -- object oriented explanation
  47. Using docker to build elasticsearch + kibana cluster
  48. What are the spring IOC features? I can't understand the source code!
  49. Spring cloud upgrade road - 2020.0. X - 3. Accesslog configuration of undertow
  50. 导致Oracle性能抖动的参数提醒
  51. 风险提醒之Oracle RAC高可用失效
  52. 小机上运行Oracle需要注意的进程调度bug
  53. Oracle内存过度消耗风险提醒
  54. Oracle SQL monitor
  55. 使用Bifrost实现Mysql的数据同步
  56. 揭秘Oracle数据库truncate原理
  57. 看了此文,Oracle SQL优化文章不必再看!
  58. Mybatis (3) map and fuzzy query expansion
  59. Kafka性能篇:为何这么“快”?
  60. 两个高频设计类面试题:如何设计HashMap和线程池