In-depth analysis of RocketMQ source code: the message storage module

Vivo Internet technology 2021-11-25 18:19:33

1. Introduction

RocketMQ is a distributed message middleware open-sourced by Alibaba. Its implementation draws on Kafka, and it supports message subscription and publishing, ordered messages, transactional messages, scheduled messages, message replay, dead-letter queues, and other features. A RocketMQ deployment consists of four parts, as shown in the figure below:

  • Producer: message producer; supports distributed cluster deployment.
  • Consumer: message consumer; supports distributed cluster deployment.
  • NameServer: name service; a very lightweight Topic routing registry that supports dynamic registration and discovery of Brokers. Producers and Consumers discover Broker routing information dynamically through the NameServer.
  • Broker: mainly responsible for storing, forwarding, and querying messages.

This article analyzes how the message storage module inside the Broker is designed, based on Apache RocketMQ 4.9.1.

2. Storage architecture

The layout of RocketMQ's message files is shown in the figure below.

CommitLog

The CommitLog stores the message bodies and metadata written by the Producer; message bodies are variable-length. A single file is 1 GB by default, and the file name is 20 digits long, left-padded with zeros, representing the file's starting offset. For example, 00000000000000000000 is the first file, with starting offset 0 and size 1 GB (1073741824 bytes); when the first file is full, the second file is named 00000000001073741824, with starting offset 1073741824, and so on.
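The naming rule above can be sketched as a small, self-contained routine (illustrative, not RocketMQ code): given any global CommitLog offset, derive the name of the file that contains it.

```java
// Illustrative sketch of the CommitLog file-naming rule described above,
// assuming the default 1 GB file size.
public class CommitLogFileName {
    static final long FILE_SIZE = 1024L * 1024 * 1024; // 1 GB = 1073741824 bytes

    // File names are the file's starting offset, zero-padded to 20 digits.
    static String fileNameFor(long globalOffset) {
        long startOffset = globalOffset - (globalOffset % FILE_SIZE);
        return String.format("%020d", startOffset);
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(0));           // first file
        System.out.println(fileNameFor(1073741824L)); // second file starts here
        System.out.println(fileNameFor(1073741825L)); // still inside the second file
    }
}
```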

ConsumeQueue

The ConsumeQueue is the message consumption queue; a ConsumeQueue file can be viewed as an index file over the CommitLog. ConsumeQueue files use a fixed-length design: each entry is 20 bytes, consisting of an 8-byte CommitLog physical offset, a 4-byte message length, and an 8-byte tag hashcode. A single file holds 300,000 entries, so every entry can be accessed randomly like an array element, and each ConsumeQueue file is about 5.72 MB.
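Because every entry is a fixed 20 bytes, locating an entry is pure arithmetic. A minimal sketch (illustrative, not RocketMQ code) of mapping a logical queue offset to a file and an in-file position:

```java
// Illustrative sketch: locating a ConsumeQueue entry by its logical offset,
// using the fixed 20-byte entry size and 300,000 entries per file from the text.
public class ConsumeQueueAddr {
    static final int UNIT = 20;                  // 8B offset + 4B size + 8B tag hashcode
    static final int ENTRIES_PER_FILE = 300_000;
    static final long FILE_SIZE = (long) UNIT * ENTRIES_PER_FILE; // 6,000,000 bytes

    // Returns {fileStartOffset, positionInFile} for a logical queue offset.
    static long[] locate(long logicalOffset) {
        long byteOffset = logicalOffset * UNIT;
        long fileStart = byteOffset - (byteOffset % FILE_SIZE);
        return new long[] { fileStart, byteOffset - fileStart };
    }

    public static void main(String[] args) {
        long[] loc = locate(300_001); // second entry of the second file
        System.out.println(loc[0] + "," + loc[1]); // prints "6000000,20"
    }
}
```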

IndexFile

The IndexFile provides a way to query messages by key or by time range. A single IndexFile is about 400 MB and can hold 20 million index entries; its underlying storage layout is similar to the JDK's HashMap.
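The "about 400 MB" figure follows directly from the layout described in section 3.2.3: a 40-byte header, 5 million 4-byte hash slots, and 20 million 20-byte index items. A back-of-the-envelope check:

```java
// Size check for the IndexFile figures quoted in the text:
// 40-byte header + 5,000,000 * 4-byte slots + 20,000,000 * 20-byte index items.
public class IndexFileSize {
    public static void main(String[] args) {
        long header = 40;
        long slotTable = 5_000_000L * 4;    // Slot Table
        long indexList = 20_000_000L * 20;  // Index Linked List
        long total = header + slotTable + indexList;
        System.out.println(total); // prints "420000040", i.e. ~400 MB
    }
}
```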

Other files include the config folder, which stores runtime configuration; the abort file, which indicates whether the Broker shut down cleanly; and the checkpoint file, which records the timestamps of the last flush of the CommitLog, ConsumeQueue, and Index files. These are beyond the scope of this article.

Compared with Kafka: in Kafka, each partition of each Topic corresponds to its own file, written sequentially and flushed periodically. But once a single Broker hosts too many Topics, those sequential writes degenerate into random writes. In RocketMQ, all Topics on a single Broker write sequentially into the same CommitLog, guaranteeing strictly sequential writes. The trade-off is that reading a message requires first fetching its physical offset from the ConsumeQueue and then reading the message content from the CommitLog, which introduces random reads.

2.1 Page Cache and mmap

Before introducing how the Broker's message storage module is implemented, let's first explain two concepts: Page Cache and mmap.

The Page Cache is the OS's cache of files, used to speed up reading and writing them. In general, sequential file reads and writes run at close to memory speed, mainly because the OS uses the Page Cache mechanism to optimize read/write performance, dedicating a portion of memory to it. For writes, the OS first writes the data into the cache and then flushes it to the physical disk asynchronously via the pdflush kernel threads. For reads, if a read misses the Page Cache, the OS reads the requested file data from the physical disk and also reads ahead the adjacent blocks sequentially.

mmap maps a physical file on disk directly into user-space memory addresses, avoiding the overhead of traditional I/O, which copies file data back and forth between a kernel-space buffer and a user-space buffer. Java NIO's FileChannel provides a map() method that implements mmap.
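As a concrete illustration of the FileChannel.map() call mentioned above, here is a minimal, self-contained demo (file name and sizes are arbitrary): bytes written through the MappedByteBuffer land in the page cache without an explicit write() call, the same mechanism MappedFile builds on.

```java
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class MmapDemo {
    // Write "hello" through a memory mapping, then read it back from the file.
    static String roundTrip() throws Exception {
        Path file = Files.createTempFile("mmap-demo", ".dat");
        try (FileChannel ch = new RandomAccessFile(file.toFile(), "rw").getChannel()) {
            // Map 4 KB of the file into the process address space.
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            buf.put("hello".getBytes(StandardCharsets.UTF_8));
            buf.force(); // flush the dirty pages to disk, like MappedFile::flush
        }
        byte[] bytes = Files.readAllBytes(file);
        Files.delete(file);
        return new String(bytes, 0, 5, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip()); // prints "hello"
    }
}
```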

2.2 Broker modules

The diagram below shows the Broker's storage architecture and how a request flows through the Broker's modules from receiving a message to returning the response.

Service access layer: RocketMQ implements its underlying communication on Netty's Reactor multi-threading model. The Reactor main thread pool, eventLoopGroupBoss, is responsible for accepting TCP connections and has only one thread by default. Once a connection is established, read/write events are handed off to the Reactor worker thread pool, eventLoopGroupSelector.

defaultEventExecutorGroup handles SSL verification, encoding/decoding, idle checks, and connection management. Then, based on the request code of the RemotingCommand, the corresponding processor is looked up in the processorTable local cache, wrapped into a task, and submitted to that processor's business thread pool for execution. The Broker improves system throughput through this four-level thread-pool design.

Business processing layer: handles the business requests arriving via RPC, where:

  • SendMessageProcessor handles the Producer's send-message requests;
  • PullMessageProcessor handles the Consumer's pull-message requests;
  • QueryMessageProcessor handles requests that query messages by key and the like.

Storage logic layer: DefaultMessageStore is the core class of RocketMQ's storage logic, providing the ability to store, read, and delete messages.

File mapping layer: maps the CommitLog, ConsumeQueue, and IndexFile files to MappedFile storage objects.

Data transfer layer: supports reading and writing messages through mmap memory mapping, and also supports reading messages through mmap while writing them into off-heap memory.

The following sections analyze, from a source-code perspective, how RocketMQ achieves high-performance storage.

3. Message writing

Taking single-message production as an example, the message-write sequence is shown in the figure below; the business logic flows through the layers shown in the Broker storage architecture above.

The core of the low-level write path is in CommitLog's asyncPutMessage method, which consists of three steps: obtaining the MappedFile, writing the message into a buffer, and submitting a flush request. Note that these three steps are surrounded by lock/unlock calls on either a spin lock or a ReentrantLock, guaranteeing that message writes on a single Broker are serialized.

//org.apache.rocketmq.store.CommitLog::asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    ...
    putMessageLock.lock(); // spin lock or ReentrantLock, depending on store config
    try {
        // Get the latest MappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        ...
        // Write the message into the buffer
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        ...
        // Submit the flush request
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        ...
    } finally {
        putMessageLock.unlock();
    }
    ...
}

Let's walk through what each of these three steps does.

3.1 MappedFile initialization

During Broker initialization, an AllocateMappedFileService asynchronous thread is started to manage MappedFile creation. The message-processing thread and the AllocateMappedFileService thread are connected through the requestQueue queue.

When a message is written, AllocateMappedFileService's putRequestAndReturnMappedFile method is called to submit a create-MappedFile request to requestQueue; it actually builds two AllocateRequests and puts both into the queue.

The AllocateMappedFileService thread loops, taking AllocateRequests from requestQueue and creating MappedFiles. The message-processing thread waits on a CountDownLatch for the first MappedFile and returns it once creation succeeds.

The next time the message-processing thread needs a MappedFile, it can directly pick up the pre-created one. By pre-creating MappedFiles in this way, the time spent waiting for file creation is reduced.
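Before looking at the real source, this handshake can be distilled into a runnable sketch (all names are illustrative, not the RocketMQ API): the caller enqueues a request for file N plus a pre-creation request for file N+1, then blocks on a CountDownLatch until a background thread has created file N.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch of the pre-allocation pattern described above.
public class PreAllocSketch {
    static class AllocRequest {
        final String path;
        final CountDownLatch done = new CountDownLatch(1);
        volatile Object mappedFile; // stand-in for a real MappedFile
        AllocRequest(String path) { this.path = path; }
    }

    private final BlockingQueue<AllocRequest> queue = new LinkedBlockingQueue<>();

    public PreAllocSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    // Like mmapOperation(): take a request and create the file
                    AllocRequest req = queue.take();
                    req.mappedFile = "mapped:" + req.path;
                    req.done.countDown();
                }
            } catch (InterruptedException ignored) { }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Like putRequestAndReturnMappedFile: submit two requests, wait only for the first;
    // the second file is created in the background and picked up on the next call.
    public Object request(String nextPath, String nextNextPath) throws InterruptedException {
        AllocRequest next = new AllocRequest(nextPath);
        queue.offer(next);
        queue.offer(new AllocRequest(nextNextPath)); // pre-creation request
        next.done.await();
        return next.mappedFile;
    }

    public static void main(String[] args) throws InterruptedException {
        PreAllocSketch s = new PreAllocSketch();
        System.out.println(s.request("f1", "f2")); // prints "mapped:f1"
    }
}
```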

//org.apache.rocketmq.store.AllocateMappedFileService::putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    // Request creation of the next MappedFile
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    ...
    // Request pre-creation of the MappedFile after that
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    ...
    // Get the created MappedFile
    AllocateRequest result = this.requestTable.get(nextFilePath);
    ...
}

//org.apache.rocketmq.store.AllocateMappedFileService::run
public void run() {
    ...
    while (!this.isStopped() && this.mmapOperation()) {
    }
    ...
}

//org.apache.rocketmq.store.AllocateMappedFileService::mmapOperation
private boolean mmapOperation() {
    ...
    // Take an AllocateRequest from the queue
    req = this.requestQueue.take();
    ...
    // Check whether the off-heap memory pool is enabled
    if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        // MappedFile with off-heap memory enabled
        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    } else {
        // Ordinary MappedFile
        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
    }
    ...
    // MappedFile warm-up
    if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()
        && this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
            this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
    }
    req.setMappedFile(mappedFile);
    ...
}

Each request to create an ordinary MappedFile creates a mappedByteBuffer; the code below shows how mmap is realized in Java.

//org.apache.rocketmq.store.MappedFile::init
private void init(final String fileName, final int fileSize) throws IOException {
    ...
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    ...
}

If off-heap memory is enabled, i.e. transientStorePoolEnable = true, the mappedByteBuffer is used only for reading messages while off-heap memory is used for writing them, separating message reads from writes. The off-heap buffers are not created once per MappedFile; instead, they are initialized at startup according to the size of the off-heap memory pool. Each off-heap DirectByteBuffer has the same size as a CommitLog file and is locked in memory to ensure it is never swapped out to virtual memory.

//org.apache.rocketmq.store.TransientStorePool::init
public void init() {
    for (int i = 0; i < poolSize; i++) {
        // Allocate off-heap memory of the same size as a CommitLog file
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        // Lock the off-heap memory so it is never swapped out to virtual memory
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
        availableBuffers.offer(byteBuffer);
    }
}

The mmapOperation method above contains a block of MappedFile warm-up logic. Why do files need warming up, and how is it done?

Because mmap only establishes the mapping between the process's virtual addresses and physical memory; it does not load the pages into the Page Cache. If a read or write misses the Page Cache, a page fault occurs and the data must be loaded from disk into memory, hurting read/write performance. To avoid page faults, and to keep the operating system from moving the relevant pages out to swap space, RocketMQ warms up each file. File warm-up is implemented as follows.

//org.apache.rocketmq.store.MappedFile::warmMappedFile
public void warmMappedFile(FlushDiskType type, int pages) {
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    // Write a zero byte into every OS page of the 1 GB file so the operating
    // system actually allocates physical memory; without filling in a value the
    // OS would not allocate the pages, and message writes would hit page faults
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        // force flush when flush disk type is sync
        if (type == FlushDiskType.SYNC_FLUSH) {
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }
        // prevent gc
        if (j % 1000 == 0) {
            try {
                Thread.sleep(0);
            } catch (InterruptedException ignored) {
            }
        }
    }
    // force flush when prepare load finished
    if (type == FlushDiskType.SYNC_FLUSH) {
        mappedByteBuffer.force();
    }
    ...
    this.mlock();
}

//org.apache.rocketmq.store.MappedFile::mlock
public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        // mlock syscall: pin the file's Page Cache pages so they are never swapped out
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
    }
    {
        // madvise syscall: advise the OS that the file will be accessed soon
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
    }
}

To sum up: RocketMQ pre-creates the next file on every allocation to reduce file-creation latency, and uses file warm-up to avoid page faults during reads and writes.

3.2 Writing the message

3.2.1 Writing to the CommitLog

The logical layout of each message stored in the CommitLog is shown in the figure below; TOTALSIZE is the total storage space occupied by the message.

The table below describes the fields each message contains, the space each field occupies, and what it means.

Writing a message means calling MappedFile's appendMessagesInner method.

//org.apache.rocketmq.store.MappedFile::appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    // Write through the off-heap DirectBuffer or through the MappedByteBuffer
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    ...
    byteBuffer.position(currentPos);
    AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
        (MessageExtBrokerInner) messageExt, putMessageContext);
    ...
    return result;
}

//org.apache.rocketmq.store.CommitLog::doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
        final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    ...
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    ...
    // This only writes the message into the buffer; no flush has happened yet
    byteBuffer.put(preEncodeBuffer);
    msgInner.setEncodedBuff(null);
    ...
    return result;
}

At this point the message has been written into a ByteBuffer but not yet persisted to disk; when persistence happens is governed by the flushing mechanism covered in the next section. One question remains: how are the ConsumeQueue and IndexFile written?

The answer is the ReputMessageService in the storage logic layer of the architecture diagram. When the MessageStore is initialized, it starts a ReputMessageService asynchronous thread, which repeatedly calls the doReput method in a loop to notify the ConsumeQueue and IndexFile of updates. The ConsumeQueue and IndexFile can be updated asynchronously because the CommitLog contains all the queue and Topic information needed to rebuild them: even if the Broker goes down unexpectedly, after a restart it can restore the ConsumeQueue and IndexFile from the CommitLog.

//org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService::run
public void run() {
    ...
    while (!this.isStopped()) {
        Thread.sleep(1);
        this.doReput();
    }
    ...
}

//org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService::doReput
private void doReput() {
    ...
    // Fetch messages newly stored in the CommitLog
    DispatchRequest dispatchRequest =
        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    if (dispatchRequest.isSuccess()) {
        if (size > 0) {
            // If there are new messages, dispatch to CommitLogDispatcherBuildConsumeQueue
            // and CommitLogDispatcherBuildIndex to build the ConsumeQueue and IndexFile
            DefaultMessageStore.this.doDispatch(dispatchRequest);
        }
    }
    ...
}

3.2.2 Writing to the ConsumeQueue

As shown in the figure below, each ConsumeQueue record is 20 bytes in total: an 8-byte CommitLog physical offset, a 4-byte message length, and an 8-byte tag hashcode.

The logic for persisting a ConsumeQueue record is as follows.

//org.apache.rocketmq.store.ConsumeQueue::putMessagePositionInfo
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {
    ...
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        ...
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    ...
}

3.2.3 Writing to the IndexFile

The logical structure of an IndexFile is shown in the figure below; it resembles the array-plus-linked-list structure of the JDK's HashMap and consists of three parts: the Header, the Slot Table, and the Index Linked List.

Header: the head of the IndexFile, occupying 40 bytes. It mainly contains the following fields:

  • beginTimestamp: the minimum store timestamp of the messages indexed by this IndexFile.
  • endTimestamp: the maximum store timestamp of the messages indexed by this IndexFile.
  • beginPhyoffset: the minimum CommitLog offset of the messages indexed by this IndexFile.
  • endPhyoffset: the maximum CommitLog offset of the messages indexed by this IndexFile.
  • hashSlotcount: the total number of hash slots in this IndexFile.
  • indexCount: the number of index entries used in this IndexFile.

Slot Table: contains 5 million hash slots by default. Each hash slot stores the position of the head IndexItem in the chain of entries sharing that hash value.

Index Linked List: holds at most 20 million IndexItems by default. Each IndexItem consists of:

  • Key Hash: the hash of the message key; when querying by key, the hash is compared first, then the key itself.
  • CommitLog Offset: the physical offset of the message.
  • Timestamp: the difference between this message's store timestamp and that of the first message.
  • Next Index Offset: the position of the next IndexItem in the chain after a hash collision.

Each hash slot in the Slot Table stores the position of an IndexItem in the Index Linked List. On a hash collision, the new IndexItem is inserted at the head of the chain: its Next Index Offset stores the position of the previous head IndexItem, and the hash slot in the Slot Table is overwritten with the position of the new IndexItem. The code is as follows:

//org.apache.rocketmq.store.index.IndexFile::putKey
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    ...
    // Read the position of the current chain head from the Slot Table
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
    ...
    int absIndexPos =
        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
            + this.indexHeader.getIndexCount() * indexSize;
    this.mappedByteBuffer.putInt(absIndexPos, keyHash);
    this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
    // Store the position of the previous chain head
    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
    // Update the hash slot in the Slot Table to point to the new IndexItem
    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
    if (this.indexHeader.getIndexCount() <= 1) {
        this.indexHeader.setBeginPhyOffset(phyOffset);
        this.indexHeader.setBeginTimestamp(storeTimestamp);
    }
    if (invalidIndex == slotValue) {
        this.indexHeader.incHashSlotCount();
    }
    this.indexHeader.incIndexCount();
    this.indexHeader.setEndPhyOffset(phyOffset);
    this.indexHeader.setEndTimestamp(storeTimestamp);
    return true;
    ...
}

To sum up, a complete message write consists of a synchronous write into the CommitLog's file cache, plus asynchronous construction of the ConsumeQueue and IndexFile.

3.3 Message flushing

RocketMQ supports two flushing modes: synchronous flush and asynchronous flush.

(1) Synchronous flush: the Broker returns a successful ACK to the Producer only after the message has actually been persisted to disk. Synchronous flushing gives a strong guarantee of message reliability, but has a large performance impact; it is generally used for financial workloads.

(2) Asynchronous flush: takes full advantage of the OS Page Cache; as soon as the message is written into the Page Cache, a successful ACK is returned to the Producer. Flushing is performed by a background asynchronous thread, reducing read/write latency and improving throughput. Asynchronous flushing works in two modes, with or without off-heap memory enabled.

When the CommitLog submits a flush request, the Broker's configuration determines whether the flush is synchronous or asynchronous.

//org.apache.rocketmq.store.CommitLog::submitFlushRequest
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            // Asynchronous flush with off-heap memory enabled
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

The inheritance relationship among GroupCommitService, FlushRealTimeService, and CommitRealTimeService is shown in the figure.

GroupCommitService: the synchronous flush thread. As shown in the figure below, after the message is written into the Page Cache, GroupCommitService performs the flush while the message-processing thread blocks waiting for the flush result.

//org.apache.rocketmq.store.CommitLog.GroupCommitService::run
public void run() {
    ...
    while (!this.isStopped()) {
        this.waitForRunning(10);
        this.doCommit();
    }
    ...
}

//org.apache.rocketmq.store.CommitLog.GroupCommitService::doCommit
private void doCommit() {
    ...
    for (GroupCommitRequest req : this.requestsRead) {
        boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        for (int i = 0; i < 2 && !flushOK; i++) {
            CommitLog.this.mappedFileQueue.flush(0);
            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        }
        // Wake up the message-processing thread waiting for the flush to complete
        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
    }
    ...
}

//org.apache.rocketmq.store.MappedFile::flush
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        ...
        // Force-flush via the fileChannel when a writeBuffer is in use or the
        // fileChannel's position is not 0
        if (writeBuffer != null || this.fileChannel.position() != 0) {
            this.fileChannel.force(false);
        } else {
            // Otherwise force-flush via the MappedByteBuffer
            this.mappedByteBuffer.force();
        }
        ...
    }
}

FlushRealTimeService: the asynchronous flush thread used when off-heap memory is not enabled. As shown in the figure below, after the message is written into the Page Cache, the message-processing thread returns immediately, and FlushRealTimeService flushes asynchronously.

//org.apache.rocketmq.store.CommitLog.FlushRealTimeService::run
public void run() {
    ...
    // Decide whether to flush at a fixed period
    if (flushCommitLogTimed) {
        // Periodic flush: sleep for a fixed interval
        Thread.sleep(interval);
    } else {
        // Non-periodic flush: flush whenever woken up
        this.waitForRunning(interval);
    }
    ...
    // Uses the same force-flush method as GroupCommitService
    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
    ...
}

CommitRealTimeService: the asynchronous flush thread used when off-heap memory is enabled. As shown in the figure below, the message-processing thread returns immediately after writing the message into off-heap memory. The message is then asynchronously committed from off-heap memory into the Page Cache by CommitRealTimeService, and finally flushed to disk asynchronously by the FlushRealTimeService thread.

Note: once a message has been committed to the Page Cache, the business side can read it through the MappedByteBuffer.

After a message is written into the off-heap writeBuffer, the isAbleToCommit method checks whether at least the minimum number of commit pages (4 by default) has accumulated. If it has, the pages are committed in a batch; otherwise the data stays in off-heap memory, which carries a risk of message loss. Through this batching, reads and writes to the Page Cache stay several pages apart, reducing the probability of Page Cache read/write contention and achieving read/write separation. The implementation logic is as follows:

//org.apache.rocketmq.store.CommitLog.CommitRealTimeService
class CommitRealTimeService extends FlushCommitLogService {
    @Override
    public void run() {
        while (!this.isStopped()) {
            ...
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            ...
            // Commit the message to the in-memory buffer; ultimately calls
            // MappedFile::commit0. The commit succeeds only once the minimum number
            // of pages has accumulated; otherwise the data stays in off-heap memory
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            if (!result) {
                // !result means new data was committed; wake up flushCommitLogService
                // to force-flush it
                flushCommitLogService.wakeup();
            }
            ...
            this.waitForRunning(interval);
        }
    }
}

//org.apache.rocketmq.store.MappedFile::commit0
protected void commit0() {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();
    // Commit the message into the Page Cache; no actual disk flush yet
    if (writePos - lastCommittedPosition > 0) {
        ByteBuffer byteBuffer = writeBuffer.slice();
        byteBuffer.position(lastCommittedPosition);
        byteBuffer.limit(writePos);
        this.fileChannel.position(lastCommittedPosition);
        this.fileChannel.write(byteBuffer);
        this.committedPosition.set(writePos);
    }
}

The table below summarizes the usage scenarios, advantages, and disadvantages of the three flushing mechanisms.
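For reference, the flush mode and off-heap pool are selected through broker configuration. A sketch of the relevant broker.conf entries (values illustrative; flushDiskType accepts SYNC_FLUSH or ASYNC_FLUSH):

```properties
# GroupCommitService path: ACK only after data reaches disk
flushDiskType=SYNC_FLUSH

# FlushRealTimeService path: ACK after Page Cache write
# flushDiskType=ASYNC_FLUSH

# CommitRealTimeService path: ASYNC_FLUSH plus the off-heap memory pool
# flushDiskType=ASYNC_FLUSH
# transientStorePoolEnable=true
```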

4. Message reading

The read logic is much simpler than the write logic. The following sections analyze how querying by offset and querying by key are implemented.

4.1 Querying by offset

Reading a message means first looking up its CommitLog physical offset in the ConsumeQueue, then reading the message content from the CommitLog file.

//org.apache.rocketmq.store.DefaultMessageStore::getMessage
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums, final MessageFilter messageFilter) {
    long nextBeginOffset = offset;
    GetMessageResult getResult = new GetMessageResult();
    final long maxOffsetPy = this.commitLog.getMaxOffset();
    // Find the corresponding ConsumeQueue
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    ...
    // Find the ConsumeQueue's MappedFile for this offset
    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
    status = GetMessageStatus.NO_MATCHED_MESSAGE;
    long maxPhyOffsetPulling = 0;
    int i = 0;
    // Upper bound on the ConsumeQueue bytes scanned per pull:
    // at least 16000 bytes, i.e. 800 entries
    final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
        // CommitLog physical address
        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
        maxPhyOffsetPulling = offsetPy;
        ...
        // Fetch the message from the CommitLog by offset and size
        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
        ...
        // Add the message to the result set
        getResult.addMessage(selectResult);
        status = GetMessageStatus.FOUND;
    }
    // Update the offset
    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    long diff = maxOffsetPy - maxPhyOffsetPulling;
    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    getResult.setSuggestPullingFromSlave(diff > memory);
    ...
    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    return getResult;
}

4.2 Querying by key

Reading a message by key means using the topic and key to find a record in the IndexFile, then reading the message content from the CommitLog using the CommitLog offset stored in that record.

//org.apache.rocketmq.store.DefaultMessageStore::queryMessage
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
QueryMessageResult queryMessageResult = new QueryMessageResult();
long lastQueryMsgTime = end;
// Query at most 3 rounds, moving lastQueryMsgTime backwards to page through older records
for (int i = 0; i < 3; i++) {
// Get the physical CommitLog offsets of the messages recorded in the IndexFile
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
...
for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
long offset = queryOffsetResult.getPhyOffsets().get(m);
...
MessageExt msg = this.lookMessageByOffset(offset);
if (0 == m) {
lastQueryMsgTime = msg.getStoreTimestamp();
}
...
// Read the message content from the CommitLog
SelectMappedBufferResult result = this.commitLog.getData(offset, false);
...
queryMessageResult.addMessage(result);
...
}
}
return queryMessageResult;
}

Looking up the physical CommitLog offsets in the IndexFile is implemented as follows:

//org.apache.rocketmq.store.index.IndexFile::selectPhyOffset
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
// Get the position of the latest IndexItem whose key hashes to this slot, i.e. the head of the linked list
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// Traverse the linked list
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
// Add matching offsets to phyOffsets
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
// Follow the prev pointer to the next (older) node in the list
nextIndexToRead = prevIndexRead;
}
...
}
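The absolute positions computed above follow directly from the IndexFile layout: a 40-byte header, then `hashSlotNum` 4-byte hash slots, then up to `indexNum` 20-byte index items. A small sketch of that arithmetic (the slot count of 5,000,000 is the 4.9.x default; the helper class itself is illustrative, not RocketMQ's API):

```java
public class IndexFileLayout {
    // Layout constants matching the selectPhyOffset code above
    static final int INDEX_HEADER_SIZE = 40; // header: begin/end timestamps, offsets, counts
    static final int HASH_SLOT_SIZE = 4;     // each slot stores an int item position
    static final int INDEX_SIZE = 20;        // keyHash(4) + phyOffset(8) + timeDiff(4) + prevIndex(4)

    final int hashSlotNum;

    IndexFileLayout(int hashSlotNum) {
        this.hashSlotNum = hashSlotNum;
    }

    // Byte position of the slot a key hashes into
    // (indexKeyHashMethod keeps the hash non-negative; masked here for the same effect)
    int absSlotPos(int keyHash) {
        int slotPos = (keyHash & 0x7fffffff) % hashSlotNum;
        return INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
    }

    // Byte position of the n-th index item, stored after all the slots
    int absIndexPos(int indexPos) {
        return INDEX_HEADER_SIZE + hashSlotNum * HASH_SLOT_SIZE + indexPos * INDEX_SIZE;
    }

    public static void main(String[] args) {
        IndexFileLayout layout = new IndexFileLayout(5_000_000);
        System.out.println(layout.absSlotPos(0));   // → 40  (first slot, right after the header)
        System.out.println(layout.absIndexPos(1));  // → 20000060  (40 + 5M*4 + 1*20)
    }
}
```

Each slot holds only the position of the most recently written item for that hash; the items themselves chain backwards via `prevIndex`, which is the linked list the loop above walks.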

Five 、 Summary

This article has walked through the implementation of RocketMQ's storage module from a source-code perspective, covering the storage architecture, message writing, and message reading.

RocketMQ writes the messages of all Topics into a single CommitLog, achieving strictly sequential writes. File preheating prevents the Page Cache from being swapped out to swap space, reducing page faults when the files are read and written. mmap is used to read and write the CommitLog, turning file operations into direct memory operations and greatly improving file I/O efficiency.
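The mmap-based file access described above can be sketched with the JDK's `FileChannel.map`, which is what RocketMQ's MappedFile wraps (a minimal sketch; the file, record format, and sizes are illustrative):

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class MmapSketch {
    // Write one length-prefixed record through a memory mapping, then read it back
    static String roundTrip(File file, String msg) throws Exception {
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        int size = 4 + payload.length; // real CommitLog files default to 1 GB
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel channel = raf.getChannel()) {
            // Writes to the mapping go to the page cache, not through write() syscalls
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
            mapped.putInt(payload.length); // length-prefixed record, like a CommitLog entry
            mapped.put(payload);
            mapped.force(); // flush dirty pages to disk (cf. RocketMQ's flush services)

            // Read it back through a mapping of the same region
            MappedByteBuffer reader = channel.map(FileChannel.MapMode.READ_ONLY, 0, size);
            int len = reader.getInt();
            byte[] out = new byte[len];
            reader.get(out);
            return new String(out, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        File file = File.createTempFile("commitlog-demo", ".bin");
        file.deleteOnExit();
        System.out.println(roundTrip(file, "hello rocketmq")); // → hello rocketmq
    }
}
```

Because the mapping is backed by the page cache, a read that hits already-resident pages never touches the disk, which is why keeping hot CommitLog pages in memory matters so much.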

In scenarios that demand high performance but can tolerate weaker data consistency, you can enable off-heap memory to separate reads from writes and improve disk throughput. In short, studying the storage module requires some understanding of operating-system fundamentals, and the extreme performance optimizations adopted by the authors are well worth learning from.
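The read/write separation mentioned above (TransientStorePool in RocketMQ) stages writes in an off-heap buffer and later commits them to the FileChannel, while reads continue to go through the page cache. A minimal sketch of the idea only, not RocketMQ's actual implementation:

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class TransientStoreSketch {
    // Stage writes in off-heap memory, then commit them to the file in one batch,
    // mimicking the writeBuffer -> commit -> flush pipeline
    static String writeThenRead(File file, String msg) throws Exception {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024); // off-heap staging area
        byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
        writeBuffer.put(payload);
        writeBuffer.flip();
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel channel = raf.getChannel()) {
            channel.write(writeBuffer, 0); // "commit": off-heap buffer -> page cache
            channel.force(false);          // "flush": page cache -> disk
            ByteBuffer readBuffer = ByteBuffer.allocate(payload.length);
            channel.read(readBuffer, 0);   // reads go through the page cache
            return new String(readBuffer.array(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("transient-demo", ".bin");
        f.deleteOnExit();
        System.out.println(writeThenRead(f, "off-heap write")); // → off-heap write
    }
}
```

Keeping writes in a direct buffer decouples them from reads against the mapped file, which is the throughput benefit the paragraph above refers to, at the cost of losing uncommitted data on a crash.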

Six 、 Reference

1. RocketMQ official documentation

author :vivo Internet server team -Zhang Zhenglin
Copyright notice
This article was created by [Vivo Internet technology]. Please include a link to the original when reprinting. Thanks.
https://javamana.com/2021/11/20211109101113009V.html
