Netty Source Code Analysis -- The ChannelPipeline Mechanism and the Read/Write Process

Waste 2020-11-07 21:19:19
netty source code analysis channel

This article continues reading the Netty source code. It analyzes how ChannelPipeline propagates events, as well as Netty's read and write process.
The analysis is based on Netty 4.1.


In Netty, ChannelPipeline can be understood as an interceptor chain. It maintains a linked list of ChannelHandlers; each ChannelHandler is a concrete interceptor that can act on reads and writes and process the data.
ChannelHandler falls into two categories.
ChannelInboundHandler listens for Channel state changes such as channelActive and channelRegistered. Read data is usually processed by overriding ChannelInboundHandler#channelRead; for example, HttpObjectDecoder parses the bytes read into a (Netty) HttpRequest.
ChannelOutboundHandler intercepts IO operations such as bind, connect, read and write. Data about to be written to the Channel is usually processed by overriding ChannelOutboundHandler#write; for example, HttpResponseEncoder converts the data to be written into HTTP format.

The default implementation of ChannelPipeline is DefaultChannelPipeline. It keeps two special ChannelHandlers at the head and tail of the linked list: HeadContext and TailContext.
HeadContext forwards IO operations to the corresponding Unsafe, for example the register, bind and read operations mentioned in the previous article.
TailContext mainly does fallback work, such as releasing the ByteBuf in its channelRead method.

Event propagation

ChannelOutboundInvoker triggers the methods of ChannelOutboundHandler. The method names are the same; the ChannelOutboundInvoker methods simply lack the ChannelHandlerContext parameter.
Likewise, ChannelInboundInvoker triggers the methods of ChannelInboundHandler, but its method names carry a fire prefix: ChannelInboundInvoker#fireChannelRead triggers ChannelInboundHandler#channelRead.
Both ChannelPipeline and ChannelHandlerContext extend these two interfaces.
They play different roles, though. ChannelPipeline is the interceptor chain as a whole, and it delegates the actual work to ChannelHandlerContext.
The ChannelHandlerContext interface (the context of a ChannelHandler) holds the previous and next nodes of the linked list. It is passed to ChannelHandler methods as a parameter and is responsible for interacting with the ChannelPipeline and the other ChannelHandlers. Through it, a handler can modify Channel attributes dynamically, submit tasks to the EventLoop, and propagate events to the next (or previous) ChannelHandler.
For example, after ChannelInboundHandler#channelRead has processed the data, it can write data to the Channel through ChannelHandlerContext#write.
ChannelHandlerContext#handler returns the real ChannelHandler, and that ChannelHandler performs the actual operation.
When a ChannelHandler is added through DefaultChannelPipeline#addFirst or a similar method, Netty constructs a DefaultChannelHandlerContext for it, whose handler method returns that ChannelHandler.
HeadContext and TailContext also extend AbstractChannelHandlerContext; their handler method returns this.

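We can also submit asynchronous tasks to the EventLoop through ChannelHandlerContext:

ctx.channel().eventLoop().execute(new Runnable() {
    public void run() {
        // ... the task to run asynchronously
    }
});

For time-consuming operations, completing them as asynchronous tasks is a good choice. Note that a task submitted to the Channel's own EventLoop still runs on that IO thread, so work that truly blocks is better handed to a separate executor.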

Let's take DefaultChannelPipeline#fireChannelRead as an example and look at how events are propagated.

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
With HeadContext as the start node, it calls AbstractChannelHandlerContext#invokeChannelRead to start invoking the interceptor chain.


static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // #1
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            // ... exception handling omitted
        }
    } else {
        fireChannelRead(msg);
    }
}

#1 The handler method returns the real handler of this AbstractChannelHandlerContext, and its ChannelInboundHandler#channelRead method is then triggered.
Because invokeChannelRead is executing on HeadContext at this point, handler() returns the HeadContext itself, so HeadContext#channelRead is triggered.


public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

HeadContext#channelRead calls ctx.fireChannelRead(msg), which propagates the event to the next ChannelInboundHandler.


public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

AbstractChannelHandlerContext#fireChannelRead(final Object msg) is mainly responsible for finding the next ChannelInboundHandler and triggering its channelRead method.

Starting from DefaultChannelPipeline#fireChannelRead, the complete call chain looks like this:
#1 DefaultChannelPipeline starts the call at HeadContext.
#2 After a ChannelInboundHandler finishes its own logic, it calls ctx.fireChannelRead(msg) to propagate the event onward.
#3 AbstractChannelHandlerContext finds the next ChannelInboundHandler and triggers its channelRead, so that the interceptor chain keeps executing.

Note: for the methods of ChannelOutboundHandler, DefaultChannelPipeline starts the call at TailContext and propagates the event towards the head, the opposite direction to ChannelInboundHandler.
When reading the Netty source, for each DefaultChannelPipeline method pay attention to whether it ultimately calls a ChannelInboundHandler method or a ChannelOutboundHandler method, and to the direction in which the event travels.
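
The inbound propagation described above can be sketched with a small stand-alone model. This is not Netty code: MiniPipeline, Ctx and InboundHandler are hypothetical names, only channelRead is modelled, and a null handler stands in for "this node does not handle inbound events". It shows that each handler has to call fireChannelRead itself for the event to reach the next inbound node.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of inbound event propagation; these are not Netty's real classes. */
final class MiniPipeline {
    /** Stand-in for ChannelInboundHandler: only channelRead is modelled. */
    interface InboundHandler {
        void channelRead(Ctx ctx, Object msg);
    }

    /** Stand-in for AbstractChannelHandlerContext: one node of the linked list. */
    static final class Ctx {
        final String name;
        final InboundHandler handler; // null means "not an inbound handler"
        Ctx next;

        Ctx(String name, InboundHandler handler) {
            this.name = name;
            this.handler = handler;
        }

        /** Like ctx.fireChannelRead: find the next inbound node and invoke it. */
        void fireChannelRead(Object msg) {
            Ctx c = next;
            while (c != null && c.handler == null) {
                c = c.next; // skip nodes that do not handle inbound events
            }
            if (c != null) {
                c.handler.channelRead(c, msg);
            }
        }
    }

    /** Builds head -> decoder -> encoder -> echo -> tail and fires one read. */
    static List<String> run(boolean echoPropagates) {
        List<String> trace = new ArrayList<>();
        InboundHandler passThrough = (ctx, msg) -> {
            trace.add(ctx.name);
            ctx.fireChannelRead(msg);
        };
        Ctx head = new Ctx("head", passThrough);
        Ctx decoder = new Ctx("decoder", passThrough);
        Ctx encoder = new Ctx("encoder", null); // outbound only, skipped on reads
        Ctx echo = new Ctx("echo", (ctx, msg) -> {
            trace.add(ctx.name);
            if (echoPropagates) {
                ctx.fireChannelRead(msg);
            }
        });
        Ctx tail = new Ctx("tail", (ctx, msg) -> trace.add(ctx.name));
        head.next = decoder;
        decoder.next = encoder;
        encoder.next = echo;
        echo.next = tail;

        // Like DefaultChannelPipeline#fireChannelRead: start at head.
        head.handler.channelRead(head, "msg");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [head, decoder, echo, tail]
        System.out.println(run(false)); // [head, decoder, echo]
    }
}
```

When the echo handler does not propagate the event, the tail node is never reached, which matters for the fallback work TailContext does.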

Suppose we define an HTTP echo program. The schematic code is as follows:

new ServerBootstrap().group(parentGroup, childGroup)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            p.addLast(new HttpRequestDecoder());
            p.addLast(new HttpResponseEncoder());
            p.addLast(new LoggingHandler(LogLevel.INFO));
            p.addLast(new HttpEchoHandler());
        }
    });

Here HttpEchoHandler implements ChannelInboundHandler and, in its channelRead method, calls ChannelHandlerContext#write to send the data back.
The data then flows as follows.
Read: head#channelRead -> HttpRequestDecoder#channelRead -> LoggingHandler#channelRead -> HttpEchoHandler#channelRead
Write: Socket.write() <- head#write <- HttpResponseEncoder#write <- LoggingHandler#write <- ChannelHandlerContext#write

ChannelHandlerContext#write differs from DefaultChannelPipeline#write: the former starts at the first ChannelOutboundHandler found by searching from the current node towards head, while the latter starts at tail.
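
To make the difference concrete, here is a minimal sketch that only computes which outbound handlers a write would pass through. It is a model, not Netty code: WritePathSketch and Node are hypothetical names, and the outbound flags are my assumption about each handler in the echo example (the encoder and LoggingHandler handle writes, the decoder and echo handler do not).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of where an outbound write starts; these are not Netty's real classes. */
final class WritePathSketch {
    static final class Node {
        final String name;
        final boolean outbound; // does this node handle write events?

        Node(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // Same order as the echo example, plus the implicit head and tail nodes.
    static final List<Node> PIPELINE = Arrays.asList(
            new Node("head", true),
            new Node("HttpRequestDecoder", false),
            new Node("HttpResponseEncoder", true),
            new Node("LoggingHandler", true),
            new Node("HttpEchoHandler", false),
            new Node("tail", false));

    static int indexOf(String name) {
        for (int i = 0; i < PIPELINE.size(); i++) {
            if (PIPELINE.get(i).name.equals(name)) {
                return i;
            }
        }
        throw new IllegalArgumentException("no such node: " + name);
    }

    /** Outbound nodes visited when the named node calls ctx.write. */
    static List<String> ctxWrite(String from) {
        List<String> path = new ArrayList<>();
        for (int i = indexOf(from) - 1; i >= 0; i--) {
            if (PIPELINE.get(i).outbound) {
                path.add(PIPELINE.get(i).name);
            }
        }
        return path;
    }

    /** pipeline.write behaves like a write issued from tail. */
    static List<String> pipelineWrite() {
        return ctxWrite("tail");
    }

    public static void main(String[] args) {
        System.out.println(ctxWrite("HttpEchoHandler"));    // [LoggingHandler, HttpResponseEncoder, head]
        System.out.println(pipelineWrite());                // [LoggingHandler, HttpResponseEncoder, head]
        System.out.println(ctxWrite("HttpRequestDecoder")); // [head]
    }
}
```

In this pipeline the two calls coincide for HttpEchoHandler because it is the last handler. A handler placed before the encoder would bypass the encoder when it calls ctx.write, which is the case the last line illustrates.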


The previous article, 《The implementation principle of the event loop mechanism》, explained that NioEventLoop#processSelectedKey handles accept and read events through NioUnsafe#read. Let's look at how a read event is handled, in NioByteUnsafe#read.

public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // #1
            byteBuf = allocHandle.allocate(allocator);
            // #2
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            // #3
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            // #4
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        // #5
        } while (allocHandle.continueReading());

        // #6
        allocHandle.readComplete();
        // #7
        pipeline.fireChannelReadComplete();

        if (close) {
            // #8
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

#1 Allocate memory for the ByteBuf.
#2 Read data from the Socket into the ByteBuf. By default this attempts to read 1024 bytes.
#3 If nothing was read, release the ByteBuf and stop the loop. If lastBytesRead returns -1, the Channel has been closed, so also prepare to close the Channel.
#4 Trigger ChannelPipeline#fireChannelRead with the data read. This is usually where our handlers process the data.
#5 Decide whether to continue reading.
By default, reading continues if the number of bytes actually read equals the number of bytes attempted (1024 bytes), that is, the buffer was filled, up to a limit on the number of reads per loop (maxMessagesPerRead).
#6 A hook method that lets RecvByteBufAllocator do some extra work after the read loop.
#7 Trigger ChannelPipeline#fireChannelReadComplete, which can be used, for example, to combine data from several reads into one object.
#8 Close the Channel.
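
The loop condition in #3 and #5 can be sketched as a small simulation. This is a simplified model, not Netty code: ReadLoopSketch is a hypothetical name, the socket is replaced by a count of available bytes, and the fixed 1024-byte buffer and the cap of 16 reads are assumptions standing in for what the RecvByteBufAllocator handle decides in the real implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy simulation of the read loop; the sizes below are assumptions, not Netty constants. */
final class ReadLoopSketch {
    static final int BUFFER_SIZE = 1024;         // assumed bytes attempted per read
    static final int MAX_MESSAGES_PER_READ = 16; // assumed cap on reads per loop

    /** Returns the size of each chunk that would be passed to fireChannelRead. */
    static List<Integer> readAll(int availableBytes) {
        List<Integer> chunks = new ArrayList<>();
        int remaining = availableBytes;
        int lastBytesRead;
        do {
            // #1 and #2: allocate a buffer and read as much as fits into it
            lastBytesRead = Math.min(BUFFER_SIZE, remaining);
            // #3: nothing was read, so stop looping
            if (lastBytesRead <= 0) {
                break;
            }
            remaining -= lastBytesRead;
            // #4: hand the chunk to the pipeline
            chunks.add(lastBytesRead);
            // #5: continue only if the buffer was filled and the cap is not reached
        } while (lastBytesRead == BUFFER_SIZE && chunks.size() < MAX_MESSAGES_PER_READ);
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(readAll(2500)); // [1024, 1024, 452]
        System.out.println(readAll(2048)); // [1024, 1024]
    }
}
```

With 2048 bytes available, the second read fills the buffer, so a third read is attempted; it returns nothing and the loop ends through the #3 branch.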

Note: if a handler does not keep propagating the channelRead event after ChannelPipeline#fireChannelRead, TailContext#channelRead is never executed. In that case we have to release the ByteBuf ourselves.
Alternatively, implement the handler by extending SimpleChannelInboundHandler: SimpleChannelInboundHandler#channelRead guarantees that the ByteBuf is released in the end.
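
The release guarantee can be pictured as a template method with a finally block. The sketch below is a stand-alone model written in that spirit, not the actual SimpleChannelInboundHandler source: ReleaseSketch, Buf and AutoReleaseHandler are hypothetical names, and Buf is a bare reference counter standing in for ByteBuf.

```java
/** Toy model of release-in-finally; these are not Netty's real classes. */
final class ReleaseSketch {
    /** Minimal reference-counted message, standing in for ByteBuf. */
    static final class Buf {
        int refCnt = 1;

        void release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    /** Template method: subclasses implement channelRead0, the base class releases. */
    abstract static class AutoReleaseHandler {
        final void channelRead(Buf msg) throws Exception {
            try {
                channelRead0(msg);
            } finally {
                msg.release(); // runs even if channelRead0 throws
            }
        }

        abstract void channelRead0(Buf msg) throws Exception;
    }

    /** Runs one read and returns the buffer's reference count afterwards. */
    static int refCntAfterRead(final boolean fail) {
        Buf buf = new Buf();
        AutoReleaseHandler handler = new AutoReleaseHandler() {
            @Override
            void channelRead0(Buf msg) {
                if (fail) {
                    throw new IllegalStateException("simulated handler failure");
                }
            }
        };
        try {
            handler.channelRead(buf);
        } catch (Exception expected) {
            // The failure is simulated; only the reference count matters here.
        }
        return buf.refCnt;
    }

    public static void main(String[] args) {
        System.out.println(refCntAfterRead(false)); // 0
        System.out.println(refCntAfterRead(true));  // 0
    }
}
```

The design point is that the subclass never has to remember to release: the buffer ends up released whether its logic succeeds or throws.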


A write operation is triggered by calling ChannelHandlerContext#write. The call path is:
ChannelHandlerContext#write -> HeadContext#write -> AbstractUnsafe#write

public final void write(Object msg, ChannelPromise promise) {
    // #1
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    // ... handling of a null outboundBuffer omitted
    int size;
    try {
        // #2
        msg = filterOutboundMessage(msg);
        // #3
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    // #4
    outboundBuffer.addMessage(msg, size, promise);
}

#1 Get the ChannelOutboundBuffer maintained by AbstractUnsafe. This class caches the data passed to write and only really writes it when flush is called.
#2 An extension method that AbstractChannel provides to subclasses, where they can check or convert the ByteBuf.
#3 Estimate the size of the data to be written.
#4 Add the data to the ChannelOutboundBuffer cache.
As you can see, write does not actually write any data; it only puts the data into the buffering object ChannelOutboundBuffer.
The data in ChannelOutboundBuffer is not written out until ChannelHandlerContext#flush is called.
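
The write-then-flush behaviour can be sketched with a queue. This is only an illustration of the buffering idea under simplifying assumptions, not how ChannelOutboundBuffer is implemented: OutboundBufferSketch is a hypothetical name, messages are plain strings, and a list stands in for the socket.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of buffered writes; this is not Netty's ChannelOutboundBuffer. */
final class OutboundBufferSketch {
    private final Queue<String> pending = new ArrayDeque<>(); // written but not yet flushed
    final List<String> socket = new ArrayList<>();            // stands in for the real socket

    /** Like write: only queues the message. */
    void write(String msg) {
        pending.add(msg);
    }

    /** Like flush: drains the queue to the "socket" in write order. */
    void flush() {
        String msg;
        while ((msg = pending.poll()) != null) {
            socket.add(msg);
        }
    }

    public static void main(String[] args) {
        OutboundBufferSketch buffer = new OutboundBufferSketch();
        buffer.write("a");
        buffer.write("b");
        System.out.println(buffer.socket); // []
        buffer.flush();
        System.out.println(buffer.socket); // [a, b]
    }
}
```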

ByteBuf is the memory buffer Netty uses to interact with the Channel, while ByteBufAllocator and RecvByteBufAllocator are mainly responsible for allocating memory for ByteBuf. A later article will analyze them.
ChannelOutboundBuffer mainly caches the data to be written and writes it to the Channel only on flush. A later article will analyze it as well.



