Netty Source Code Analysis: ChannelOutboundBuffer and the Flush Process

Waste 2020-11-08 23:46:03


As the previous article noted, ChannelHandlerContext#write only caches data in the ChannelOutboundBuffer; the cached data is not written to the Channel until ChannelHandlerContext#flush is called.
This article covers Netty's ChannelOutboundBuffer and the flush process.
The source code analysis is based on Netty 4.1.

Every Channel maintains a ChannelOutboundBuffer in AbstractUnsafe#outboundBuffer.
ChannelOutboundBuffer is the outbound data buffer, responsible for caching the data passed to ChannelHandlerContext#write. It manages the data as a linked list whose nodes are instances of the inner class Entry.

The key fields are:

Entry tailEntry;      // the last node in the list; new nodes are appended after it
Entry unflushedEntry; // the first node that has not yet been flushed
Entry flushedEntry;   // the first node that has been flushed but not yet written out
int flushed;          // the number of flushed nodes whose data has not yet been written out

Before the ChannelHandlerContext#flush operation runs, the pending nodes must first be marked as flushed (this mainly counts how many nodes the current flush operation may write out), starting from unflushedEntry. Afterwards, flushedEntry marks the first node to be written and flushed is the number of nodes to write.

As described in the earlier article on Netty's read/write process, when AbstractUnsafe#write handles a write operation it calls ChannelOutboundBuffer#addMessage to cache the data:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    // #1
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    // #2
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

#1 Construct an Entry. Note that the object pool RECYCLER is used here; it is analyzed in detail in a separate article.
The rest of the method mainly updates tailEntry and unflushedEntry.
#2 If the number of pending outbound bytes now exceeds the WriteBufferWaterMark#high threshold, set the unwritable flag to true and trigger pipeline.fireChannelWritabilityChanged().
Because the ChannelOutboundBuffer linked list has no size limit, data accumulating without bound can lead to an OOM.
To avoid this problem, we can stop caching data while the unwritable flag is true.
Netty only updates the unwritable flag; it does not itself block further caching, so we implement that check ourselves as needed. For example:

if (ctx.channel().isActive() && ctx.channel().isWritable()) {
    ctx.writeAndFlush(responseMessage);
} else {
    ...
}
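The water-mark mechanics behind isWritable() can be sketched in plain Java. This is an illustrative model, not Netty's code; in Netty 4.1 the defaults are a 32 KiB low water mark and a 64 KiB high water mark, configurable via ChannelConfig#setWriteBufferWaterMark.

```java
// Illustrative model of the write-buffer water marks (not Netty's code).
// unwritable flips to true above the high mark and back to false below the
// low mark, giving hysteresis so writability does not flap around one threshold.
class WaterMarkSketch {
    final long low;
    final long high;
    long pendingBytes;
    boolean unwritable;

    WaterMarkSketch(long low, long high) {
        this.low = low;
        this.high = high;
    }

    // mirrors incrementPendingOutboundBytes in the real code
    void increment(long bytes) {
        pendingBytes += bytes;
        if (pendingBytes > high && !unwritable) {
            unwritable = true; // real code fires channelWritabilityChanged here
        }
    }

    // mirrors decrementPendingOutboundBytes in the real code
    void decrement(long bytes) {
        pendingBytes -= bytes;
        if (pendingBytes < low && unwritable) {
            unwritable = false; // and here
        }
    }

    boolean isWritable() { return !unwritable; }
}
```

The gap between the two marks matters: writability is restored only after pending data drops below the low mark, so a producer that backs off while unwritable will not immediately toggle back and forth.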

The addFlush method marks the pending nodes as flushed (it is called before the ChannelHandlerContext#flush operation writes anything, to count how many nodes can be written):

public void addFlush() {
    // #1
    Entry entry = unflushedEntry;
    if (entry != null) {
        // #2
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            // #3
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
            // #4
        } while (entry != null);
        // All flushed so reset unflushedEntry
        // #5
        unflushedEntry = null;
    }
}

#1 Start processing from the unflushedEntry node.
#2 Assign unflushedEntry to flushedEntry.
flushedEntry is cleared again after ChannelHandlerContext#flush finishes writing.
#3 Increment flushed and mark the node's ChannelPromise as uncancellable.
#4 Traverse the remaining nodes, starting from unflushedEntry.
#5 Clear unflushedEntry, indicating that all nodes have been flushed.

The nioBuffers method converts the currently cached ByteBufs into (JDK) ByteBuffers:

public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
    assert maxCount > 0;
    assert maxBytes > 0;
    long nioBufferSize = 0;
    int nioBufferCount = 0;
    // #1
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
    Entry entry = flushedEntry;
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
        if (!entry.cancelled) {
            ByteBuf buf = (ByteBuf) entry.msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;
            if (readableBytes > 0) {
                // #2
                if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                    break;
                }
                nioBufferSize += readableBytes;
                // #3
                int count = entry.count;
                if (count == -1) {
                    //noinspection ConstantValueVariableUse
                    entry.count = count = buf.nioBufferCount();
                }
                int neededSpace = min(maxCount, nioBufferCount + count);
                if (neededSpace > nioBuffers.length) {
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                }
                // #4
                if (count == 1) {
                    ByteBuffer nioBuf = entry.buf;
                    if (nioBuf == null) {
                        // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
                        // derived buffer
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                    }
                    nioBuffers[nioBufferCount++] = nioBuf;
                } else {
                    ...
                }
                if (nioBufferCount == maxCount) {
                    break;
                }
            }
        }
        entry = entry.next;
    }
    this.nioBufferCount = nioBufferCount;
    this.nioBufferSize = nioBufferSize;
    return nioBuffers;
}

#1 Obtain the nioBuffers array from a thread-local cache, which avoids the cost of repeatedly allocating ByteBuffer arrays.
#2 maxBytes is the maximum number of bytes this operation may collect.
maxBytes - readableBytes < nioBufferSize means that including this buffer would exceed maxBytes, so the loop exits.
#3
buf.nioBufferCount() returns the number of underlying ByteBuffers; a CompositeByteBuf may be composed of more than one.
neededSpace is the number of ByteBuffers the nioBuffers array must hold; the array is expanded when its length is insufficient.
#4
buf.internalNioBuffer(readerIndex, readableBytes) constructs a ByteBuffer from readerIndex and readableBytes.
This involves ByteBuf internals, which are analyzed in detail in a separate article.
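A side note on the #2 comparison: it is written as maxBytes - readableBytes < nioBufferSize rather than the more obvious nioBufferSize + readableBytes > maxBytes. One benefit of the subtraction form (an observation, not something stated in the source above) is that the left side can never overflow, whereas the addition could wrap negative for very large accumulated sizes:

```java
// Demonstrates why `max - part < sum` is safer than `sum + part > max`:
// the subtraction never overflows (part is a non-negative int), while the
// addition can wrap around to a negative long and give the wrong answer.
class OverflowCheck {
    static boolean wouldExceedSafe(long max, long sum, int part) {
        return max - part < sum;
    }

    static boolean wouldExceedNaive(long max, long sum, int part) {
        return sum + part > max; // sum + part may overflow
    }
}
```

In practice nioBufferSize is bounded by maxBytesPerGatheringWrite, so the overflow case is unlikely to arise; the subtraction form simply costs nothing and is robust.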

After the ChannelHandlerContext#flush operation completes, the corresponding cache nodes must be removed:

public void removeBytes(long writtenBytes) {
    for (;;) {
        // #1
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }
        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;
        // #2
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            // #3
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}

#1
The current method returns the data cached in the flushedEntry node.
When the result is null, exit the loop.
#2 All of the current node's data has been written:
progress notifies the listeners of the node's ChannelProgressivePromise,
writtenBytes is reduced by the corresponding byte count,
and remove() removes the node, releases its ByteBuf, and moves the flushedEntry pointer forward.
#3 Only part of the current node's data has been written, so this should be the last node of this flush operation:
update the ByteBuf's readerIndex so the next write starts reading from there,
then exit.
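The accounting in removeBytes can be modeled with a queue of remaining readable byte counts. This is an illustrative sketch, not Netty's code: fully written buffers are dropped, and at most the one buffer at the head is advanced by the partially written amount.

```java
import java.util.ArrayDeque;

// Illustrative model of removeBytes (not Netty's code): each int[] holds the
// remaining readable bytes of one cached buffer; the head plays flushedEntry.
class RemoveBytesSketch {
    static void removeBytes(ArrayDeque<int[]> buffers, long writtenBytes) {
        while (!buffers.isEmpty()) {
            int[] buf = buffers.peek();
            if (buf[0] <= writtenBytes) {
                // buffer fully written: drop the node (remove() in the real code)
                writtenBytes -= buf[0];
                buffers.poll();
            } else {
                // buffer partially written: advance its readerIndex and stop
                buf[0] -= (int) writtenBytes;
                break;
            }
        }
    }
}
```

Because the OS may write fewer bytes than requested, a single buffer can straddle several flush rounds; the readerIndex adjustment is what lets the next round resume mid-buffer.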

Removing a data node:

public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;
    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    // #1
    removeEntry(e);
    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        // #2
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }
    // recycle the entry
    // #3
    e.recycle();
    return true;
}

#1
removeEntry decrements flushed by 1.
When flushed reaches 0, flushedEntry is set to null; otherwise flushedEntry points to the next node.
#2 Release the ByteBuf, complete the promise, and decrement the pending outbound byte count.
#3 Return the current node to the object pool for reuse.
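Putting addMessage, addFlush, and remove together, the pointer bookkeeping can be condensed into a small plain-Java sketch. This is an illustrative model only, not Netty's implementation: Entry here holds just a message, and promises, cancellation, and byte accounting are all omitted.

```java
// Illustrative model of ChannelOutboundBuffer's pointer bookkeeping.
// Not Netty's implementation: promises, cancellation, byte accounting omitted.
class OutboundSketch {
    static final class Entry {
        final Object msg;
        Entry next;
        Entry(Object msg) { this.msg = msg; }
    }

    Entry flushedEntry;   // first flushed-but-unwritten node
    Entry unflushedEntry; // first not-yet-flushed node
    Entry tailEntry;      // last node; new nodes are appended here
    int flushed;          // count of flushed-but-unwritten nodes

    // write path: append a node, making it the first unflushed node if needed
    void addMessage(Object msg) {
        Entry entry = new Entry(msg);
        if (tailEntry != null) {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    // flush path: mark every unflushed node as flushed
    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry == null) {
            return;
        }
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        for (; entry != null; entry = entry.next) {
            flushed++;
        }
        unflushedEntry = null;
    }

    // after a successful write: drop the head flushed node
    Object remove() {
        Entry e = flushedEntry;
        if (e == null) {
            return null;
        }
        if (--flushed == 0) {
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
        return e.msg;
    }
}
```

Messages written after addFlush stay behind unflushedEntry and are not returned by remove() until the next addFlush, which is exactly why a flush only writes out the data cached before it.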

Now let's look at the ChannelHandlerContext#flush operation itself. The call chain is:
ChannelHandlerContext#flush -> HeadContext#flush -> AbstractUnsafe#flush

public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    // #1
    outboundBuffer.addFlush();
    // #2
    flush0();
}

#1 Mark the data nodes in outboundBuffer as flushed.
#2 Perform the write.

flush0 -> NioSocketChannel#doWrite

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        // #1
        if (in.isEmpty()) {
            clearOpWrite();
            return;
        }
        // #2
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();
        switch (nioBufferCnt) {
            case 0:
                // #3
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                // #4
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    // #5
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                // #6
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // #7
                ...
            }
        }
    } while (writeSpinCount > 0);
    incompleteWrite(writeSpinCount < 0);
}

#1 Use ChannelOutboundBuffer#isEmpty (backed by flushed) to check whether there is any data left to write; if there is none, clear the OP_WRITE interest and return immediately.
#2 Obtain the (JDK) ByteBuffers maintained by the ByteBufs in the ChannelOutboundBuffer, and record nioBufferSize and nioBufferCount.
#3 There are no ByteBuffers, but there may be other kinds of data (such as a FileRegion); doWrite0 handles these, and we do not go deeper here.
#4 There is exactly one ByteBuffer; call SocketChannel#write to write the data to the Channel.
#5 If the number of bytes written is less than or equal to 0, no data was written out (possibly because the socket send buffer is full). We then register interest in the Channel's OP_WRITE event, so that writing can resume the next time the EventLoop polls the Channel.
#6 Remove the written data nodes from the ChannelOutboundBuffer.
#7 There are multiple ByteBuffers; call SocketChannel#write(ByteBuffer[] srcs, int offset, int length) to write them in one batch, similar to the single-buffer case.
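The batch write in #7 uses the standard java.nio gathering-write API. A minimal self-contained demo using a Pipe (whose sink channel is also a GatheringByteChannel; the class and method names here are illustrative, not from Netty):

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

// Minimal gathering-write demo: several ByteBuffers are written to a channel
// in one call, just like SocketChannel#write(ByteBuffer[], int, int) in doWrite.
class GatheringWriteDemo {
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            ByteBuffer[] bufs = {
                ByteBuffer.wrap("hello ".getBytes(StandardCharsets.US_ASCII)),
                ByteBuffer.wrap("world".getBytes(StandardCharsets.US_ASCII)),
            };
            long written = pipe.sink().write(bufs, 0, bufs.length); // gathering write
            ByteBuffer dst = ByteBuffer.allocate((int) written);
            while (dst.hasRemaining()) {
                pipe.source().read(dst);
            }
            pipe.sink().close();
            pipe.source().close();
            dst.flip();
            return StandardCharsets.US_ASCII.decode(dst).toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

A single syscall covering many buffers is the whole point of nioBuffers: it lets Netty flush a long chain of cached ByteBufs without copying them into one contiguous buffer first.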

Recall the analysis of NioEventLoop#processSelectedKey in the earlier article on the event loop mechanism:

...
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    ch.unsafe().forceFlush();
}

This calls the forceFlush method, which attempts to write the data again.

FlushConsolidationHandler
ChannelHandlerContext#flush is an expensive operation that may trigger a system call, but data cannot be cached for too long either. FlushConsolidationHandler makes it possible to trade off write latency against throughput.
FlushConsolidationHandler maintains an explicitFlushAfterFlushes variable:
when flush is called during ChannelInboundHandler#channelRead, the flush is intercepted (not executed) until it has been called explicitFlushAfterFlushes times;
a flush issued after channelReadComplete is not intercepted.
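The interception logic can be sketched in plain Java. This is an illustrative model only; the real handler also deals with flushes outside a read loop, exceptions, and channel close. In a real pipeline you would simply add the handler, e.g. pipeline.addLast(new FlushConsolidationHandler(256, true)).

```java
// Illustrative model of FlushConsolidationHandler's counting (not Netty's code):
// while a read is in progress, flushes are swallowed until the configured count
// is reached; channelReadComplete always releases any pending flush.
class FlushGateSketch {
    final int explicitFlushAfterFlushes;
    int flushPendingCount;
    boolean readInProgress;
    int realFlushes; // flushes actually propagated to the channel

    FlushGateSketch(int explicitFlushAfterFlushes) {
        this.explicitFlushAfterFlushes = explicitFlushAfterFlushes;
    }

    void channelRead() { readInProgress = true; }

    void flush() {
        if (readInProgress && ++flushPendingCount < explicitFlushAfterFlushes) {
            return; // intercepted: consolidate with later flushes
        }
        flushPendingCount = 0;
        realFlushes++;
    }

    void channelReadComplete() {
        readInProgress = false;
        if (flushPendingCount > 0) {
            flushPendingCount = 0;
            realFlushes++; // release the consolidated flush
        }
    }
}
```

The effect is that a handler calling writeAndFlush per message during a burst of reads produces far fewer real flushes, while latency is bounded because channelReadComplete always flushes whatever is pending.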

This article touched on the ByteBuf component, Netty's memory buffer; it is analyzed in a separate article.

If you found this article helpful, welcome to follow my WeChat official account; the series is still being updated. Your attention is what keeps me going!

Copyright notice
This article was created by [Waste]. Please include a link to the original when reposting. Thanks.
