Talking About Kafka: Source Code Analysis of the Producer

Lao Zhou Talks Architecture, 2021-09-15 06:49:56

1. Preface

In the previous articles, we covered Kafka's infrastructure and how to set it up. Starting with this article, we will work through the source code. The Kafka version used here is 2.7.0. Its client side is implemented in Java and its server side in Scala. The client is the first part of Kafka that users touch, so we start on the client side, and specifically with the producer. Today we analyze the producer's source code.

2. Using the Producer

First, let's use a piece of code to show how KafkaProducer is used. In the example, KafkaProducer is used to send messages to Kafka. The program first writes the configuration KafkaProducer needs into a Properties object (the meaning of each setting is explained in the comments). It then constructs a KafkaProducer with that Properties object as the argument, and finally sends messages with the send method. The code covers both synchronous and asynchronous sending.

Kafka provides users with a very simple and convenient API. Using it takes just two steps:

  • Initialize a KafkaProducer instance
  • Call the send interface to send data
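The two steps can be sketched as follows. This is a minimal sketch that assumes a broker at localhost:9092 and String keys and values. The configuration keys are standard producer settings, but the class ProducerConfigSketch and the topic name my-topic are hypothetical. The sketch uses only java.util.Properties so that it runs on its own. The KafkaProducer calls that would follow are left as comments because they need the kafka-clients library and a running broker. batch.size and buffer.memory are set to their default values for illustration, and linger.ms=5 is an arbitrary assumption.

```java
import java.util.Properties;

// Hypothetical helper: builds the Properties a KafkaProducer<String, String> would be constructed with.
class ProducerConfigSketch {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Broker list used for the initial connection (address supplied by the caller)
        props.put("bootstrap.servers", bootstrapServers);
        // Serializers for key and value (String keys and values assumed)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each write
        props.put("acks", "all");
        // Batch up to 16 KB per partition, waiting up to 5 ms for more records
        props.put("batch.size", "16384");
        props.put("linger.ms", "5");
        // Total memory the RecordAccumulator may use for buffering (32 MB)
        props.put("buffer.memory", "33554432");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        System.out.println(props.getProperty("acks"));
        // With kafka-clients on the classpath, the two steps would then be:
        // KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();            // synchronous
        // producer.send(new ProducerRecord<>("my-topic", "key", "value"), (md, e) -> { });  // asynchronous
        // producer.close();
    }
}
```

Calling get() on the returned Future is what makes a send synchronous. Passing a Callback instead lets the user thread continue immediately.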

This article focuses on how a KafkaProducer instance is initialized and how the send interface sends data.

3. KafkaProducer Instantiation

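Now that we understand the basic use of KafkaProducer, let's look more closely at how it is constructed. The Properties-based constructor converts the properties into a map and delegates to the full constructor, where the core initialization logic lives: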

public KafkaProducer(Properties properties) {
    // Convert Properties to a Map and delegate; null arguments mean the components are built from the configuration
    this(Utils.propsToMap(properties), (Serializer) null, (Serializer) null, (ProducerMetadata) null, (KafkaClient) null, (ProducerInterceptors) null, Time.SYSTEM);
}

4. Message Sending Process

Users send data by calling producer.send() directly, so let's first look at how send() is implemented:

// Asynchronously send data to a topic
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, null);
}

// Asynchronously send data to a topic and invoke the callback once the send has been acknowledged
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // Let the interceptors see (and possibly modify) the record first
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}

Ultimately, sending is carried out by the producer's doSend() method.

4.1 Interceptor

The record first passes through the interceptor collection, ProducerInterceptors, whose onSend method iterates over the interceptors and calls each one's onSend method. Interceptors exist to process data before it is sent. Kafka ships no default interceptor implementation, so to use this feature you must implement the ProducerInterceptor interface yourself and register it through the interceptor.classes configuration.

4.1.1 Interceptor code

4.1.2 Interceptor core logic

The ProducerInterceptor interface declares three methods (it also inherits configure() from Configurable):

  • onSend(ProducerRecord<K, V> record): called from within KafkaProducer.send, so it runs in the user's main thread. It is guaranteed to run before the key and value are serialized and the partition is computed. You can do anything with the record in this method, but it's best not to modify its topic and partition, because that would affect the calculation of the target partition.
  • onAcknowledgement(RecordMetadata metadata, Exception exception): called when the record has been acknowledged by the server, or when sending it fails. It usually runs before the user's callback logic is triggered. onAcknowledgement runs in the producer's I/O thread, so keep heavy logic out of it. Otherwise it will slow down the producer's sending.
  • close(): closes the interceptor. It is mainly used for resource cleanup.

Interceptors may run in multiple threads, so implementations must ensure their own thread safety. If more than one interceptor is specified, the producer calls them in the specified order. It only catches any exception an interceptor throws and records it in the error log, instead of propagating it upward.
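The ordering and error-handling behaviour just described can be illustrated with a small, self-contained sketch. RecordInterceptor and InterceptorChainSketch are hypothetical simplifications in which records are plain Strings. The loop is modeled on the description above of how ProducerInterceptors.onSend iterates its interceptors; it is not Kafka's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for ProducerInterceptor; records are plain Strings here.
interface RecordInterceptor {
    String onSend(String record);
}

// Runs interceptors in their configured order. An exception from one interceptor is logged
// and swallowed, and the chain continues with the last successfully intercepted record.
class InterceptorChainSketch {
    private static final Logger LOG = Logger.getLogger(InterceptorChainSketch.class.getName());
    private final List<RecordInterceptor> interceptors;

    InterceptorChainSketch(List<RecordInterceptor> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    String onSend(String record) {
        String intercepted = record;
        for (RecordInterceptor interceptor : interceptors) {
            try {
                // Each interceptor receives the output of the previous one
                intercepted = interceptor.onSend(intercepted);
            } catch (Exception e) {
                // Do not propagate: log and move on to the next interceptor
                LOG.log(Level.WARNING, "Interceptor onSend failed", e);
            }
        }
        return intercepted;
    }

    public static void main(String[] args) {
        RecordInterceptor tagA = r -> r + "|a";
        RecordInterceptor failing = r -> {
            throw new IllegalStateException("boom");
        };
        RecordInterceptor tagC = r -> r + "|c";
        InterceptorChainSketch chain = new InterceptorChainSketch(Arrays.asList(tagA, failing, tagC));
        System.out.println(chain.onSend("msg")); // msg|a|c (the failure is only logged)
    }
}
```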

4.2 The Producer's doSend Implementation

Next comes the implementation of doSend(). Inside doSend(), sending one record breaks down into five main steps:

  • Confirm that the metadata of the target topic is available (that is, the leader of the target partition exists and is available, and, if authorization is enabled, the client has the required permissions). If there is no metadata for the topic yet, fetch it;
  • Serialize the record's key and value;
  • Determine the partition the record should go to (it can be specified explicitly or computed by the partitioning algorithm);
  • Append the record to the accumulator, where the data is buffered first;
  • If, after the append, the corresponding RecordBatch has reached batch.size (or the batch does not have enough space left for the next record), wake up the sender thread to send the data.

The sending process can be summarized by the five steps above. The sections below analyze each of them in detail.

5. Message Sending Process in Detail

5.1 Getting the Topic's Metadata

The producer obtains the topic's metadata through the waitOnMetadata() method. This will be covered in the next article.

5.2 Serializing the Key and Value

The producer serializes the record's key and value, and the consumer deserializes them correspondingly. Kafka provides built-in serializers and deserializers for common types in the org.apache.kafka.common.serialization package (for example StringSerializer, IntegerSerializer and ByteArraySerializer). You can also write your own serialization, but in general the built-in implementations are sufficient.
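A custom serializer only has to turn a value into bytes, and its deserializer turns the bytes back. The sketch below uses hypothetical SimpleSerializer/SimpleDeserializer interfaces instead of Kafka's real Serializer/Deserializer, which additionally offer configure() and close() hooks. It implements a UTF-8 String codec that is similar in spirit to the built-in StringSerializer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified versions of a serializer/deserializer pair.
interface SimpleSerializer<T> {
    byte[] serialize(String topic, T data);
}

interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// A UTF-8 string codec: what the producer side writes, the consumer side reads back.
class Utf8StringCodec implements SimpleSerializer<String>, SimpleDeserializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        // null stays null, so a record without a key carries no key bytes
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Utf8StringCodec codec = new Utf8StringCodec();
        byte[] bytes = codec.serialize("my-topic", "h\u00e9llo");
        System.out.println(bytes.length);                         // 6: the accented e takes two bytes in UTF-8
        System.out.println(codec.deserialize("my-topic", bytes)); // round-trips to the original string
    }
}
```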

5.3 Determining the Record's Partition

The partition is determined in one of three ways:

  • If a partition is specified, that value is used directly as the partition;
  • If no partition is specified but a key is present, the partition is the hash of the key modulo the number of partitions of the topic;
  • If neither a partition nor a key is present, older clients generated a random integer on the first call (incrementing it on every subsequent call) and took it modulo the number of available partitions of the topic. This is the so-called round-robin algorithm. Since Kafka 2.4, including the 2.7.0 version analyzed here, the default partitioner instead uses a sticky partition cache for keyless records, as the code below shows.
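The three cases can be sketched as a single method. This is an illustrative sketch, not Kafka's code. The class name is hypothetical. The keyless branch implements the round-robin counter from the list above, not the sticky cache. murmur2 and toPositive are written to mirror Kafka's Utils helpers of the same names, so check them against the 2.7.0 source before relying on exact partition numbers.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the three partition-selection cases; the keyless branch is the classic round-robin scheme.
class PartitionChooserSketch {
    // Random starting point, incremented on every keyless call
    private final AtomicInteger counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());

    int partition(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;                                  // case 1: caller chose the partition
        }
        if (keyBytes != null) {
            return toPositive(murmur2(keyBytes)) % numPartitions;      // case 2: hash of the key
        }
        return toPositive(counter.getAndIncrement()) % numPartitions;  // case 3: round-robin
    }

    // Clears the sign bit (Math.abs would stay negative for Integer.MIN_VALUE)
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // 32-bit murmur2 hash, intended to mirror Kafka's Utils.murmur2
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (the cases intentionally fall through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    public static void main(String[] args) {
        PartitionChooserSketch chooser = new PartitionChooserSketch();
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(chooser.partition(2, key, 6));                                        // 2
        System.out.println(chooser.partition(null, key, 6) == chooser.partition(null, key, 6)); // true
    }
}
```

The key-based branch is deterministic, so all records with the same key land in the same partition as long as the partition count does not change.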

The implementation is as follows:

// If the record carries a partition, return it directly; otherwise ask the partitioner to compute one (KafkaProducer.class)
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null
            ? partition
            : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
The producer's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner. Users can also supply their own partitioning strategy through the partitioner.class configuration. The default strategy is implemented as follows:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    // No key: use the sticky partition cache; with a key: murmur2 hash of the key bytes modulo the partition count
    return keyBytes == null
            ? this.stickyPartitionCache.partition(topic, cluster)
            : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
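For records without a key, the core of the default algorithm is the sticky partition cache (StickyPartitionCache). Rather than spreading consecutive records across partitions, it keeps sending to the same partition until the current batch is completed, and only then switches to another partition. This fills batches faster.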
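The sticky idea can be illustrated with a simplified cache. StickyPartitionSketch is hypothetical. It keeps one partition per topic and switches to a different, randomly chosen partition only when the caller reports that the batch for the current partition has been completed. Unlike the real StickyPartitionCache, it takes a plain partition count instead of consulting cluster metadata for the available partitions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

// Simplified sketch of a sticky partition cache: one cached partition per topic.
class StickyPartitionSketch {
    private final ConcurrentMap<String, Integer> indexCache = new ConcurrentHashMap<>();

    // Keyless records keep going to the cached partition
    int partition(String topic, int numPartitions) {
        Integer cached = indexCache.get(topic);
        return cached != null ? cached : nextPartition(topic, numPartitions, -1);
    }

    // Called when the batch for prevPartition is complete: switch to a different partition
    int nextPartition(String topic, int numPartitions, int prevPartition) {
        Integer old = indexCache.get(topic);
        // Only switch if no other thread has already moved the topic off prevPartition
        if (old == null || old == prevPartition) {
            int next = 0;
            if (numPartitions > 1) {
                do {
                    next = ThreadLocalRandom.current().nextInt(numPartitions);
                } while (next == prevPartition);
            }
            if (old == null) {
                indexCache.putIfAbsent(topic, next);
            } else {
                indexCache.replace(topic, prevPartition, next);
            }
            return indexCache.get(topic);
        }
        return old;
    }

    public static void main(String[] args) {
        StickyPartitionSketch cache = new StickyPartitionSketch();
        int first = cache.partition("logs", 4);
        System.out.println(first == cache.partition("logs", 4)); // true: sticks to one partition
        int next = cache.nextPartition("logs", 4, first);        // the batch for `first` is complete
        System.out.println(next != first);                       // true: moved to another partition
    }
}
```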

5.4 Appending Record Data to the RecordAccumulator

Before discussing RecordAccumulator, it helps to keep the overall sending flow in mind.

RecordAccumulator acts as a buffer. Its total size (buffer.memory) defaults to 32 MB.

In the Kafka producer, messages are not sent to the broker one by one. Instead, multiple messages are grouped into a ProducerBatch, which the Sender then sends in one go. Note that batch.size is not a number of messages (how many fit together depends on their size) but a size in bytes. It defaults to 16 KB and can be tuned to the workload.

In RecordAccumulator, the core field is:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
It is a ConcurrentMap whose key is a TopicPartition, representing one partition of a topic, and whose value is a double-ended queue (Deque) of ProducerBatch objects waiting for the Sender thread to send them to the broker.

You may have a question about RecordAccumulator.append(): why is memory allocated outside the synchronized block rather than inside it? As a consequence of this design, the second synchronized block calls tryAppend once more before creating a new batch.

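The second tryAppend is needed because, while this thread was allocating memory outside the lock, another thread may already have created a new RecordBatch for the same partition. If the record fits into that batch, the memory just requested was not needed after all and is returned to the buffer pool.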

What would go wrong if the memory allocation were placed inside the synchronized block?

When not enough memory is available, the allocating thread blocks and waits (for up to max.block.ms). Inside the synchronized block it would hold the Deque's lock the whole time. No other thread could then perform thread-safe operations on that Deque until the lock was released.
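The pattern discussed above (try under the lock, allocate outside it, re-check under the lock) can be reduced to a small sketch. AppendSketch and its Batch class are hypothetical simplifications. Batches are plain ByteBuffers, partitions are identified by strings, and ByteBuffer.allocate stands in for the BufferPool, which in Kafka may block while waiting for free memory.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical simplification of RecordAccumulator.append().
class AppendSketch {
    // Stand-in for ProducerBatch: a fixed-size buffer that rejects records that do not fit
    static final class Batch {
        private final ByteBuffer buffer;

        Batch(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        boolean tryAppend(byte[] record) {
            if (buffer.remaining() < record.length) {
                return false;
            }
            buffer.put(record);
            return true;
        }
    }

    private final int batchSize;
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    AppendSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    private Deque<Batch> getOrCreateDeque(String topicPartition) {
        return batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
    }

    // Returns true if a new batch had to be created for this record
    boolean append(String topicPartition, byte[] record) {
        Deque<Batch> dq = getOrCreateDeque(topicPartition);
        synchronized (dq) {
            if (tryAppend(dq, record)) {
                return false; // fast path: the last batch had room
            }
        }
        // Allocate outside the lock; in Kafka this call may block waiting for free memory
        ByteBuffer buffer = ByteBuffer.allocate(Math.max(batchSize, record.length));
        synchronized (dq) {
            // Re-check: another thread may have created a batch while we were allocating
            // (Kafka would then return the unused buffer to the pool)
            if (tryAppend(dq, record)) {
                return false;
            }
            Batch batch = new Batch(buffer);
            batch.tryAppend(record); // always fits: the buffer is at least record.length bytes
            dq.addLast(batch);
            return true;
        }
    }

    // Only the last batch in the deque can still accept records
    private static boolean tryAppend(Deque<Batch> dq, byte[] record) {
        Batch last = dq.peekLast();
        return last != null && last.tryAppend(record);
    }

    int batchCount(String topicPartition) {
        Deque<Batch> dq = getOrCreateDeque(topicPartition);
        synchronized (dq) {
            return dq.size();
        }
    }

    public static void main(String[] args) {
        AppendSketch accumulator = new AppendSketch(10);
        System.out.println(accumulator.append("topic-0", new byte[6])); // true: first batch created
        System.out.println(accumulator.append("topic-0", new byte[3])); // false: fits in the same batch
        System.out.println(accumulator.append("topic-0", new byte[3])); // true: only 1 byte was left
        System.out.println(accumulator.batchCount("topic-0"));          // 2
    }
}
```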

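tryAppend() itself is simpler. It takes the last ProducerBatch in the Deque and tries to append the record to it. If the Deque is empty, or the last batch has no room left (in which case that batch is closed for further appends), it returns null, and the caller goes on to create a new batch.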


5.5 Waking Up the Sender Thread to Send the RecordBatch

Once a record has been written successfully, the producer checks whether a RecordBatch now meets the conditions for sending. Typically a queue holds several batches, so the earliest-added ones can certainly be sent. If so, it wakes up the sender thread to send the RecordBatch.
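A minimal sketch of the wake-up decision follows. It assumes the append step reports two flags, batchIsFull and newBatchCreated, which to our understanding match the fields of Kafka's RecordAppendResult. The surrounding classes are hypothetical stand-ins. A newly created batch means any earlier batch for that partition can no longer grow, which is why it also counts as a reason to wake the sender.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the wake-up decision made after appending a record.
class WakeupSketch {
    // Stand-in for the append result; the two flags are the ones the wake-up check looks at
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    // Stand-in for the Sender: it only counts how often it is woken up
    static final class CountingSender {
        final AtomicInteger wakeups = new AtomicInteger();

        void wakeup() {
            wakeups.incrementAndGet();
        }
    }

    // Wake the sender when a batch filled up, or when a new batch was started
    static boolean maybeWakeSender(AppendResult result, CountingSender sender) {
        if (result.batchIsFull || result.newBatchCreated) {
            sender.wakeup();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CountingSender sender = new CountingSender();
        maybeWakeSender(new AppendResult(false, false), sender); // batch still filling: no wake-up
        maybeWakeSender(new AppendResult(true, false), sender);  // batch full: wake up
        maybeWakeSender(new AppendResult(false, true), sender);  // new batch created: wake up
        System.out.println(sender.wakeups.get());                 // 2
    }
}
```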

The sender thread processes RecordBatches in its run() method, which loops until the producer is closed.

The core method called on each iteration of run() is org.apache.kafka.clients.producer.internals.Sender#sendProducerData.

Here pollTimeout is the longest time to block waiting until at least one channel is ready for the registered events. A value of 0 means return immediately without blocking.

Following the code further leads to org.apache.kafka.clients.producer.internals.RecordAccumulator#ready, which works out which nodes have batches ready to send. Finally, there is org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which takes the data to be sent out of the accumulator's buffer, at most max.request.size bytes per node at a time.
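The per-partition readiness test inside ready() can be sketched as a single predicate. The parameter names are hypothetical. The logic is a simplification of RecordAccumulator#ready as we understand it. A partition's first batch becomes sendable when it is full, when linger.ms (or the retry back-off) has elapsed, when buffer memory is exhausted, or when the producer is closing or flushing. The exception is a batch that is still backing off after a failed attempt. The real method also groups the ready partitions by their leader node.

```java
// Hypothetical sketch of the per-partition readiness test in RecordAccumulator#ready.
class ReadyCheckSketch {
    static boolean isSendable(int dequeSize, boolean lastBatchFull, long waitedMs, int attempts,
                              long lingerMs, long retryBackoffMs,
                              boolean memoryExhausted, boolean closingOrFlushing) {
        // More than one batch in the deque means the earlier ones can no longer grow
        boolean full = dequeSize > 1 || lastBatchFull;
        // A batch that is being retried must first wait out the retry back-off
        boolean backingOff = attempts > 0 && waitedMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        // linger.ms (or the back-off) has elapsed
        boolean expired = waitedMs >= timeToWaitMs;
        boolean sendable = full || expired || memoryExhausted || closingOrFlushing;
        return sendable && !backingOff;
    }

    public static void main(String[] args) {
        System.out.println(isSendable(1, false, 0, 0, 5, 100, false, false));  // false: still lingering
        System.out.println(isSendable(1, false, 5, 0, 5, 100, false, false));  // true: linger.ms elapsed
        System.out.println(isSendable(2, false, 0, 0, 5, 100, false, false));  // true: earlier batch is full
        System.out.println(isSendable(2, false, 10, 1, 5, 100, false, false)); // false: backing off
    }
}
```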

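The size limit applied by drain can be illustrated with a per-node sketch. DrainSketch is hypothetical and represents each batch only by its size in bytes. It takes at most one batch per partition and rotates its starting partition between calls. It stops once the next batch would push the request past max.request.size, but it always accepts the first batch so that an oversized batch is not stuck forever. This follows the general shape of the per-node drain loop in RecordAccumulator, but it is not Kafka's actual code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of draining batches for one node; a batch is represented by its size in bytes.
class DrainSketch {
    // Index of the partition to start from on the next call
    private int drainIndex = 0;

    List<Integer> drainOneNode(List<Deque<Integer>> partitionQueues, int maxRequestSize) {
        List<Integer> ready = new ArrayList<>();
        int n = partitionQueues.size();
        if (n == 0) {
            return ready;
        }
        int size = 0;
        int start = drainIndex = drainIndex % n;
        do {
            Deque<Integer> deque = partitionQueues.get(drainIndex);
            drainIndex = (drainIndex + 1) % n;
            Integer first = deque.peekFirst();
            if (first == null) {
                continue; // nothing buffered for this partition
            }
            // Stop once the request would grow too large, but never return an empty request
            if (size + first > maxRequestSize && !ready.isEmpty()) {
                break;
            }
            deque.pollFirst();
            size += first;
            ready.add(first);
        } while (start != drainIndex);
        return ready;
    }

    public static void main(String[] args) {
        List<Deque<Integer>> queues = new ArrayList<>();
        queues.add(new ArrayDeque<>(List.of(6, 6)));
        queues.add(new ArrayDeque<>(List.of(5)));
        DrainSketch drainer = new DrainSketch();
        System.out.println(drainer.drainOneNode(queues, 10));  // [6]: adding the 5-byte batch would exceed 10
        System.out.println(drainer.drainOneNode(queues, 100)); // [6, 5]: one batch from each partition
    }
}
```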

6. Summary

Finally, here is a brief summary to give you a high-level, structural understanding of the Kafka producer:

  • Creating a KafkaProducer starts a background thread, KafkaThread (the logic actually running is the Sender; KafkaThread is a wrapper around it), which scans the RecordAccumulator for pending messages.
  • Calling KafkaProducer.send() actually stores the message in the RecordAccumulator, more precisely in a map (ConcurrentMap<TopicPartition, Deque<ProducerBatch>>). The message is placed into a record batch for its topic and partition, and all messages in that batch are sent to the same topic and partition.
  • When the background thread finds messages in the RecordAccumulator, it sends them to the Kafka cluster. This does not happen as soon as a message arrives; it depends on whether the batch is ready.
  • If the send succeeds (the message was successfully written to Kafka), a RecordMetadata object is returned. It contains the topic and partition information and the record's offset within the partition.
  • If the write fails, an error is returned. On receiving the error, the producer tries to resend the message (if retries are allowed, the message is kept in the RecordAccumulator). If it still fails after several attempts, the error is returned to the caller.

That's it. This article has analyzed the source code of the Kafka producer. The next article will cover metadata in detail, including how metadata is updated on the producer side. Stay tuned!

