Talking about Kafka: Producer Source Code Analysis

Lao Zhou chat architecture 2021-09-15 06:49:56

Welcome to my WeChat official account, [Lao Zhou Talks About Architecture]: principles of the mainstream Java back-end technology stack, source code analysis, architecture, and all kinds of solutions for high-concurrency, high-performance, and high-availability Internet systems.

1. Preface

In the previous articles we covered Kafka's infrastructure and cluster setup; starting with this article, we will analyze the source code. The Kafka version used here is 2.7.0; its client side is implemented in Java and its server side in Scala. When using Kafka, the client is the first part users touch, so we start from the client side, and within the client from the producer: today we analyze the Producer source code.

2. Producer Usage

First, let's show how KafkaProducer is used through a piece of code. In the following example, we use KafkaProducer to send messages to Kafka. The example program first writes the configuration KafkaProducer uses into a Properties object (the specific meaning of each setting is explained in the comments), then constructs a KafkaProducer with that Properties object as the parameter, and finally completes the send through the send method. The code covers both synchronous and asynchronous sending.
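Since the example code appears only as an image in the original post, here is a minimal reconstruction of its configuration half, using nothing but java.util.Properties. The property keys are standard Kafka producer settings; the broker address is a placeholder, and the commented lines in main merely indicate what the send calls would look like with the kafka-clients jar on the classpath.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Build the Properties a KafkaProducer would be constructed from.
    public static Properties buildProps() {
        Properties props = new Properties();
        // Broker addresses used to bootstrap cluster metadata (placeholder host)
        props.put("bootstrap.servers", "localhost:9092");
        // How many replica acknowledgements the broker must collect; "all" is the safest
        props.put("acks", "all");
        // Serializers for record keys and values
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Batching knobs discussed later in this article
        props.put("batch.size", "16384");       // 16 KB per-partition batch
        props.put("buffer.memory", "33554432"); // 32 MB accumulator buffer
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        // With kafka-clients on the classpath, the two send styles would be:
        //   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //   producer.send(new ProducerRecord<>("topic", "key", "value")).get();          // synchronous
        //   producer.send(new ProducerRecord<>("topic", "key", "value"), (md, e) -> {}); // asynchronous
        System.out.println(props.getProperty("batch.size"));
    }
}
```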

As the example shows, Kafka provides users with a very simple and convenient API; using it takes only two steps:

  • Initialize a KafkaProducer instance
  • Call the send interface to send data

This article focuses on how the KafkaProducer instance is initialized and how the send interface sends data.

3. KafkaProducer Instantiation

Having understood the basic use of KafkaProducer, let's take a closer look at the core logic of the constructor:

public KafkaProducer(Properties properties) {
    this(Utils.propsToMap(properties), (Serializer) null, (Serializer) null,
         (ProducerMetadata) null, (KafkaClient) null, (ProducerInterceptors) null, Time.SYSTEM);
}


4. Message Sending Process

Users send data directly with producer.send(). Let's first look at the implementation of the send() interface:

// Send data to a topic asynchronously
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, (Callback) null);
}

// Send data to a topic asynchronously; the callback is invoked once the send is acknowledged
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}

The actual sending of data ultimately goes through the producer's doSend() method.

4.1 Interceptor

The method first passes through the interceptor collection, ProducerInterceptors, whose onSend method traverses the interceptors and calls each one's onSend method. The purpose of interceptors is to process the data in flight. Kafka does not ship a default interceptor implementation; if you need interceptor functionality, you must implement the interface yourself.

4.1.1 Interceptor code

(The interceptor source is shown as an image in the original post.)

4.1.2 Interceptor core logic

The ProducerInterceptor interface consists of three methods:

  • onSend(ProducerRecord<K, V> record): this method is wrapped into KafkaProducer.send, so it runs in the user's main thread. It is guaranteed to be called before the message is serialized and before the partition is computed. The user can do anything to the message in this method, but it is best not to modify the topic or the partition, otherwise the target-partition calculation will be affected.
  • onAcknowledgement(RecordMetadata metadata, Exception exception): this method is called when the message is acknowledged or when the send fails, and it normally runs before the producer's callback logic. onAcknowledgement runs on the producer's I/O thread, so do not put heavy logic into it, otherwise it will slow down the producer's message sending.
  • close(): closes the interceptor; mainly used for resource cleanup.

Interceptors may run in multiple threads, so in a concrete implementation the user must ensure thread safety. In addition, if multiple interceptors are configured, the producer calls them in the specified order, and it merely catches the exceptions each interceptor may throw and records them in the error log instead of propagating them upward.
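To illustrate that contract without depending on kafka-clients, here is a simplified stand-in for the interceptor chain (all names are hypothetical). Note how the chain mimics the behavior described above: each interceptor's exception is caught and logged rather than propagated, so one faulty interceptor cannot break the send path.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

public class InterceptorChainSketch {
    // Simplified stand-in for ProducerInterceptors.onSend: each interceptor transforms
    // the record value in order; exceptions are swallowed and logged, mirroring Kafka.
    public static String onSend(List<UnaryOperator<String>> interceptors, String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Kafka logs the error and continues with the last good record
                System.err.println("Error executing interceptor onSend: " + e.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = Arrays.asList(
            v -> "[audited]" + v,
            v -> { throw new RuntimeException("boom"); }, // faulty interceptor: logged, not fatal
            v -> v + "[signed]");
        System.out.println(onSend(chain, "msg")); // prints [audited]msg[signed]
    }
}
```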

4.2 The producer's doSend implementation

In the doSend() method (shown as an image in the original post), sending a single record is mainly divided into the following five steps:

  • Confirm that the metadata of the target topic is available (that is, the partition's leader exists and is available, and, if authorization is enabled, the client holds the corresponding permissions); if no metadata for the topic is present, fetch it first;
  • Serialize the record's key and value;
  • Determine the partition the record will be sent to (it can be specified explicitly or computed by an algorithm);
  • Append the record to the accumulator, where it is cached first;
  • If after appending the corresponding RecordBatch has reached batch.size (or the batch does not have enough room left for the next record), wake up the sender thread to send the data.

The data-sending flow can be summarized in the five points above; the concrete implementation of each part is analyzed in detail below.
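The five steps can also be sketched end to end in plain Java. Everything below is a simplified stand-in (hypothetical names, a plain map in place of the real metadata, serializers, partitioner, and RecordAccumulator); it is not Kafka's actual code, only the skeleton of the flow.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DoSendSketch {
    static final int BATCH_SIZE = 16; // tiny stand-in for batch.size

    // topic-partition -> serialized records, standing in for the RecordAccumulator
    static final Map<String, List<byte[]>> accumulator = new HashMap<>();

    // Returns true when the appended-to batch is full and the sender should be woken.
    public static boolean doSend(String topic, int numPartitions, String key, String value) {
        // 1. confirm metadata is available (stand-in: a known topic has partitions > 0)
        if (numPartitions <= 0) throw new IllegalStateException("no metadata for " + topic);
        // 2. serialize key and value
        byte[] serializedKey = key.getBytes(StandardCharsets.UTF_8);
        byte[] serializedValue = value.getBytes(StandardCharsets.UTF_8);
        // 3. pick the partition (keyed records: hash modulo partition count)
        int partition = Math.abs(key.hashCode() % numPartitions);
        // 4. append to the accumulator, keyed by topic-partition
        List<byte[]> batch = accumulator.computeIfAbsent(topic + "-" + partition, k -> new ArrayList<>());
        batch.add(serializedValue);
        // 5. if the batch reached batch.size bytes, the sender thread would be woken
        int bytes = batch.stream().mapToInt(b -> b.length).sum();
        return bytes >= BATCH_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(doSend("demo", 3, "k", "hello"));       // batch not full yet
        System.out.println(doSend("demo", 3, "k", "hello again")); // 16 bytes reached: wake sender
    }
}
```

The real doSend additionally handles callbacks, transactions, and error paths, but the order of the steps is the same.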

5. The Sending Steps in Detail

5.1 Obtaining the topic's metadata

The producer obtains the metadata of the corresponding topic through the waitOnMetadata() method; we will cover that method next time.

5.2 Serializing the key and value

On the producer side, the record's key and value are serialized; on the consumer side they are deserialized correspondingly. Kafka ships serialization and deserialization implementations for the common types out of the box. Of course, we can also provide a custom serialization implementation, but in general the ones Kafka provides are sufficient.
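As a concrete illustration, here is what a String serializer/deserializer pair boils down to. This is a simplified sketch using only the JDK; Kafka's real StringSerializer also supports configurable encodings.

```java
import java.nio.charset.StandardCharsets;

public class StringSerdeSketch {
    // Producer side: turn the value into bytes before it is appended to a batch
    public static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: the mirror-image operation
    public static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("kafka");
        System.out.println(deserialize(wire)); // round-trips to the original string
    }
}
```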

5.3 Determining the record's target partition

Obtaining the partition value falls into the following three cases:

  • When a partition is specified, the specified value is used directly as the partition value;
  • When no partition is specified but a key is present, the partition value is obtained by taking the key's hash modulo the topic's partition count;
  • When neither a partition nor a key is present, the first call randomly generates an integer (each subsequent call increments it), and that value modulo the number of available partitions of the topic yields the partition value; this is the so-called round-robin algorithm. (Note: since Kafka 2.4 the keyless case actually goes through the sticky partitioner shown in the code below, which sticks to one partition per batch rather than rotating on every record.)

The specific implementation is as follows :

// When the record carries a partition value, return it directly; otherwise call the
// partitioner's partition method to compute it (KafkaProducer.class)
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

The producer's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner; users can also customize the partitioning strategy. The following is the concrete implementation of the default strategy:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

The core of the default algorithm above is the sticky partition cache.
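The default partitioner's behavior can be sketched in plain Java as follows. All types here are simplified stand-ins: Kafka's real implementation hashes keys with murmur2 (not Arrays.hashCode), and its sticky cache changes partition when a batch completes or becomes full.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

public class PartitionerSketch {
    // topic -> currently "stuck" partition for keyless records
    private final Map<String, Integer> stickyCache = new ConcurrentHashMap<>();

    public int partition(String topic, byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // sticky path: pick a random partition once and keep using it until the batch flushes
            return stickyCache.computeIfAbsent(topic,
                t -> ThreadLocalRandom.current().nextInt(numPartitions));
        }
        // keyed path: stable hash modulo the partition count (Kafka uses murmur2 here)
        int hash = Arrays.hashCode(keyBytes) & 0x7fffffff; // force non-negative
        return hash % numPartitions;
    }

    // Called when a batch for the topic completes: the next keyless record picks a new partition
    public void onNewBatch(String topic) {
        stickyCache.remove(topic);
    }

    public static void main(String[] args) {
        PartitionerSketch p = new PartitionerSketch();
        int first = p.partition("demo", null, 4);
        // keyless records stick to the same partition until onNewBatch is called
        System.out.println(first == p.partition("demo", null, 4)); // prints true
    }
}
```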

5.4 Appending the record to the RecordAccumulator

Before diving into the RecordAccumulator, it helps to keep the overall sending flow in mind (the original post includes a diagram of it here).

The RecordAccumulator plays the role of a buffer; its default size is 32 MB (the buffer.memory setting).

In the Kafka producer, messages are not sent to the broker one by one. Instead, multiple messages are grouped into a ProducerBatch, which the Sender then ships in one go. batch.size here is not a message count ("send however many have accumulated") but a size in bytes; the default is 16 KB, and it can be tuned for your workload.

In the RecordAccumulator, the core field is:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

It is a ConcurrentMap whose key is a TopicPartition, representing one partition of one topic, and whose value is a double-ended queue of ProducerBatch objects waiting for the Sender thread to ship them to the broker.
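A stripped-down model of that structure makes the layout concrete. The types below are stand-ins (the real value type is Deque<ProducerBatch>, and the real append logic lives in RecordAccumulator.append):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class BatchesMapSketch {
    // Stand-in for TopicPartition: one partition of one topic
    static class TopicPartition {
        final String topic; final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public boolean equals(Object o) {
            return o instanceof TopicPartition && ((TopicPartition) o).topic.equals(topic)
                && ((TopicPartition) o).partition == partition;
        }
        @Override public int hashCode() { return Objects.hash(topic, partition); }
    }

    // Stand-in for ProducerBatch: just collects serialized records
    static class Batch { final Deque<byte[]> records = new ArrayDeque<>(); }

    // The core field of RecordAccumulator, modeled directly
    static final ConcurrentMap<TopicPartition, Deque<Batch>> batches = new ConcurrentHashMap<>();

    public static void append(String topic, int partition, byte[] record) {
        TopicPartition tp = new TopicPartition(topic, partition);
        Deque<Batch> dq = batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
        synchronized (dq) { // each partition's deque is guarded by its own lock
            if (dq.isEmpty()) dq.addLast(new Batch());
            dq.peekLast().records.addLast(record); // always append to the last (open) batch
        }
    }

    public static void main(String[] args) {
        append("demo", 0, new byte[] {1});
        append("demo", 0, new byte[] {2});
        System.out.println(batches.get(new TopicPartition("demo", 0)).peekLast().records.size());
    }
}
```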

Do you have any questions about the append code? Why is the memory allocation not placed inside the synchronized block? And, related to that, why does the following synchronized block contain another tryAppend call?

Because by that point another thread may already have created a new RecordBatch, which would make the just-allocated memory an extra, wasted request.

And what would be wrong with doing the memory allocation inside the synchronized block?

If the memory request cannot be satisfied at once, the thread blocks waiting for memory; if that happened inside the synchronized block, the lock on the Deque would never be released, and no other thread could perform its thread-safe operations on that Deque.
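The pattern being described, allocate outside the lock and then re-check under the lock, can be sketched like this. These are simplified stand-ins: the real code uses BufferPool.allocate, which may block until memory is freed, which is exactly why it must not run while holding the deque lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AppendSketch {
    static class Batch {
        final byte[] buffer;
        int used;
        Batch(byte[] buffer) { this.buffer = buffer; }
        // Returns false when there is not enough room left for the record
        boolean tryAppend(byte[] record) {
            if (used + record.length > buffer.length) return false;
            System.arraycopy(record, 0, buffer, used, record.length);
            used += record.length;
            return true;
        }
    }

    static int allocations = 0; // counts allocations, to show the double-check saving one

    static byte[] allocate(int size) { allocations++; return new byte[size]; } // may block in Kafka

    public static void append(Deque<Batch> dq, byte[] record, int batchSize) {
        synchronized (dq) { // first attempt: append to the last open batch
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) return;
        }
        // Allocate OUTSIDE the lock: if allocation blocked while holding the lock,
        // no other thread could touch this deque in the meantime.
        byte[] buffer = allocate(batchSize);
        synchronized (dq) {
            // Second attempt: another thread may have created a batch while we allocated
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) return; // buffer wasted: why we re-check
            Batch b = new Batch(buffer);
            b.tryAppend(record);
            dq.addLast(b);
        }
    }

    public static void main(String[] args) {
        Deque<Batch> dq = new ArrayDeque<>();
        append(dq, new byte[10], 16);
        append(dq, new byte[10], 16); // does not fit in the first batch, so a second one is opened
        System.out.println(dq.size() + " batches, " + allocations + " allocations");
    }
}
```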

Now follow me into the tryAppend() method; it is simpler.


5.5 Waking up the sender thread to send the RecordBatch

After a record is written successfully, if the conditions for sending a RecordBatch are met (usually the queue holds multiple batches, so the batches added earliest are certainly ready to be sent), the sender thread is woken up to send the RecordBatch.

The sender thread handles the RecordBatch in its run() method (the implementation is shown as images in the original post).

The core work of run() is done in the method org.apache.kafka.clients.producer.internals.Sender#sendProducerData.

Here pollTimeout means the longest time to block until at least one channel becomes ready for the events you registered; a pollTimeout of 0 means the call returns immediately.

Let's keep following into org.apache.kafka.clients.producer.internals.RecordAccumulator#ready. Finally, look at org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which fetches the data to be sent from the accumulator's buffer, sending at most max.request.size bytes at a time.
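drain's core idea, pull ready batches until max.request.size would be exceeded, can be modeled like this. This is a simplified sketch: the real method works per broker node and rotates the starting partition to avoid starving any one partition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DrainSketch {
    static final int MAX_REQUEST_SIZE = 32; // tiny stand-in for max.request.size (default 1 MB)

    // Pull the oldest batch from each partition's deque until the size budget is used up.
    public static List<byte[]> drain(List<Deque<byte[]>> partitionQueues) {
        List<byte[]> ready = new ArrayList<>();
        int size = 0;
        for (Deque<byte[]> dq : partitionQueues) {
            byte[] first = dq.peekFirst();
            if (first == null) continue;
            if (!ready.isEmpty() && size + first.length > MAX_REQUEST_SIZE) break; // budget spent
            ready.add(dq.pollFirst()); // the oldest batch of this partition joins the request
            size += first.length;
        }
        return ready;
    }

    public static void main(String[] args) {
        Deque<byte[]> p0 = new ArrayDeque<>(); p0.add(new byte[20]);
        Deque<byte[]> p1 = new ArrayDeque<>(); p1.add(new byte[20]);
        // 20 + 20 exceeds the 32-byte budget, so only one batch is drained this round
        System.out.println(drain(List.of(p0, p1)).size());
    }
}
```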


6. Summary

Finally, to give you a macro-level, structural understanding of the Kafka producer, a brief description:

  • new KafkaProducer() creates a background thread, KafkaThread (the thread that actually runs is the Sender; KafkaThread is a wrapper around it), which scans the RecordAccumulator for messages to send.
  • Calling KafkaProducer.send() actually saves the message into the RecordAccumulator, that is, into a Map (ConcurrentMap<TopicPartition, Deque<ProducerBatch>>). The message is recorded into the corresponding record batch (same topic and same partition means the same batch), and all messages of one batch will be sent to the same topic and partition.
  • When the background thread scans the RecordAccumulator and finds messages, it sends them to the Kafka cluster (not as soon as a message arrives; it depends on whether the messages are ready).
  • If the send succeeds (the message is written to Kafka), a RecordMetadata object is returned, containing the topic and partition information as well as the record's offset within the partition.
  • If the write fails, an error is returned; on receiving the error the producer retries the send (if retries are allowed, the message is saved into the RecordAccumulator again), and if it still fails after several attempts the error is returned to the user.

That wraps up our analysis of the Kafka producer source code. The next article will explain metadata in detail, along with the metadata update mechanism on the producer side. Stay tuned!

This article was created by [Lao Zhou chat architecture]. Please include a link to the original when reprinting. Thanks.
