Talking about Kafka: Producer Source Code Analysis

Lao Zhou chat architecture 2021-09-15 06:49:56

Welcome to my WeChat official account, [Lao Zhou Talks About Architecture]: principles of the mainstream Java back-end technology stack, source code analysis, architecture, and all kinds of solutions for high-concurrency, high-performance, and high-availability Internet systems.

1. Preface

In the previous articles we covered Kafka's infrastructure and cluster setup; starting with this article, we will analyze the source code. The Kafka version used here is 2.7.0; its client side is implemented in Java and its server side in Scala. When using Kafka, the client is the first part users touch, so we start from the client side, and within the client from the producer: today we analyze the Producer source code.

2. Producer Usage

First, let's show how KafkaProducer is used through a piece of code. In the following example, we use KafkaProducer to send messages to Kafka. The example program first writes the configuration KafkaProducer uses into a Properties object (the specific meaning of each setting is explained in the comments), then constructs a KafkaProducer with that Properties object as the parameter, and finally completes the send through the send method. The code covers both synchronous and asynchronous sending.
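Since the example code appears only as an image in the original post, here is a minimal reconstruction of its configuration half, using nothing but java.util.Properties. The property keys are standard Kafka producer settings; the broker address is a placeholder, and the commented lines in main merely indicate what the send calls would look like with the kafka-clients jar on the classpath.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Build the Properties a KafkaProducer would be constructed from.
    public static Properties buildProps() {
        Properties props = new Properties();
        // Broker addresses used to bootstrap cluster metadata (placeholder host)
        props.put("bootstrap.servers", "localhost:9092");
        // How many replica acknowledgements the broker must collect; "all" is the safest
        props.put("acks", "all");
        // Serializers for record keys and values
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Batching knobs discussed later in this article
        props.put("batch.size", "16384");       // 16 KB per-partition batch
        props.put("buffer.memory", "33554432"); // 32 MB accumulator buffer
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        // With kafka-clients on the classpath, the two send styles would be:
        //   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //   producer.send(new ProducerRecord<>("topic", "key", "value")).get();          // synchronous
        //   producer.send(new ProducerRecord<>("topic", "key", "value"), (md, e) -> {}); // asynchronous
        System.out.println(props.getProperty("batch.size"));
    }
}
```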

As the example shows, Kafka provides users with a very simple and convenient API; using it takes only two steps:

  • Initialize a KafkaProducer instance
  • Call the send interface to send data

This article focuses on how the KafkaProducer instance is initialized and how the send interface sends data.

3. KafkaProducer Instantiation

Having understood the basic use of KafkaProducer, let's take a closer look at the core logic of the constructor:

public KafkaProducer(Properties properties) {
    this(Utils.propsToMap(properties), (Serializer) null, (Serializer) null,
         (ProducerMetadata) null, (KafkaClient) null, (ProducerInterceptors) null, Time.SYSTEM);
}


4. Message Sending Process

Users send data directly with producer.send(). Let's first look at the implementation of the send() interface:

// Send data to a topic asynchronously
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, (Callback) null);
}

// Send data to a topic asynchronously; the callback is invoked once the send is acknowledged
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}

The actual sending of data ultimately goes through the producer's doSend() method.

4.1 Interceptor

The method first passes through the interceptor collection, ProducerInterceptors, whose onSend method traverses the interceptors and calls each one's onSend method. The purpose of interceptors is to process the data in flight. Kafka does not ship a default interceptor implementation; if you need interceptor functionality, you must implement the interface yourself.

4.1.1 Interceptor code

(The interceptor source is shown as an image in the original post.)

4.1.2 Interceptor core logic

The ProducerInterceptor interface consists of three methods:

  • onSend(ProducerRecord<K, V> record): this method is wrapped into KafkaProducer.send, so it runs in the user's main thread. It is guaranteed to be called before the message is serialized and before the partition is computed. The user can do anything to the message in this method, but it is best not to modify the topic or the partition, otherwise the target-partition calculation will be affected.
  • onAcknowledgement(RecordMetadata metadata, Exception exception): this method is called when the message is acknowledged or when the send fails, and it normally runs before the producer's callback logic. onAcknowledgement runs on the producer's I/O thread, so do not put heavy logic into it, otherwise it will slow down the producer's message sending.
  • close(): closes the interceptor; mainly used for resource cleanup.

Interceptors may run in multiple threads, so in a concrete implementation the user must ensure thread safety. In addition, if multiple interceptors are configured, the producer calls them in the specified order, and it merely catches the exceptions each interceptor may throw and records them in the error log instead of propagating them upward.
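To illustrate that contract without depending on kafka-clients, here is a simplified stand-in for the interceptor chain (all names are hypothetical). Note how the chain mimics the behavior described above: each interceptor's exception is caught and logged rather than propagated, so one faulty interceptor cannot break the send path.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

public class InterceptorChainSketch {
    // Simplified stand-in for ProducerInterceptors.onSend: each interceptor transforms
    // the record value in order; exceptions are swallowed and logged, mirroring Kafka.
    public static String onSend(List<UnaryOperator<String>> interceptors, String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Kafka logs the error and continues with the last good record
                System.err.println("Error executing interceptor onSend: " + e.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = Arrays.asList(
            v -> "[audited]" + v,
            v -> { throw new RuntimeException("boom"); }, // faulty interceptor: logged, not fatal
            v -> v + "[signed]");
        System.out.println(onSend(chain, "msg")); // prints [audited]msg[signed]
    }
}
```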

4.2 The producer's doSend implementation

In the doSend() method (shown as an image in the original post), sending a single record is mainly divided into the following five steps:

  • Confirm that the metadata of the target topic is available (that is, the partition's leader exists and is available, and, if authorization is enabled, the client holds the corresponding permissions); if no metadata for the topic is present, fetch it first;
  • Serialize the record's key and value;
  • Determine the partition the record will be sent to (it can be specified explicitly or computed by an algorithm);
  • Append the record to the accumulator, where it is cached first;
  • If after appending the corresponding RecordBatch has reached batch.size (or the batch does not have enough room left for the next record), wake up the sender thread to send the data.

The data-sending flow can be summarized in the five points above; the concrete implementation of each part is analyzed in detail below.
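The five steps can also be sketched end to end in plain Java. Everything below is a simplified stand-in (hypothetical names, a plain map in place of the real metadata, serializers, partitioner, and RecordAccumulator); it is not Kafka's actual code, only the skeleton of the flow.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DoSendSketch {
    static final int BATCH_SIZE = 16; // tiny stand-in for batch.size

    // topic-partition -> serialized records, standing in for the RecordAccumulator
    static final Map<String, List<byte[]>> accumulator = new HashMap<>();

    // Returns true when the appended-to batch is full and the sender should be woken.
    public static boolean doSend(String topic, int numPartitions, String key, String value) {
        // 1. confirm metadata is available (stand-in: a known topic has partitions > 0)
        if (numPartitions <= 0) throw new IllegalStateException("no metadata for " + topic);
        // 2. serialize key and value
        byte[] serializedKey = key.getBytes(StandardCharsets.UTF_8);
        byte[] serializedValue = value.getBytes(StandardCharsets.UTF_8);
        // 3. pick the partition (keyed records: hash modulo partition count)
        int partition = Math.abs(key.hashCode() % numPartitions);
        // 4. append to the accumulator, keyed by topic-partition
        List<byte[]> batch = accumulator.computeIfAbsent(topic + "-" + partition, k -> new ArrayList<>());
        batch.add(serializedValue);
        // 5. if the batch reached batch.size bytes, the sender thread would be woken
        int bytes = batch.stream().mapToInt(b -> b.length).sum();
        return bytes >= BATCH_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(doSend("demo", 3, "k", "hello"));       // batch not full yet
        System.out.println(doSend("demo", 3, "k", "hello again")); // 16 bytes reached: wake sender
    }
}
```

The real doSend additionally handles callbacks, transactions, and error paths, but the order of the steps is the same.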

5. The Sending Steps in Detail

5.1 Obtaining the topic's metadata

The producer obtains the metadata of the corresponding topic through the waitOnMetadata() method; we will cover that method next time.

5.2 Serializing the key and value

On the producer side, the record's key and value are serialized; on the consumer side they are deserialized correspondingly. Kafka ships serialization and deserialization implementations for the common types out of the box. Of course, we can also provide a custom serialization implementation, but in general the ones Kafka provides are sufficient.
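As a concrete illustration, here is what a String serializer/deserializer pair boils down to. This is a simplified sketch using only the JDK; Kafka's real StringSerializer also supports configurable encodings.

```java
import java.nio.charset.StandardCharsets;

public class StringSerdeSketch {
    // Producer side: turn the value into bytes before it is appended to a batch
    public static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: the mirror-image operation
    public static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("kafka");
        System.out.println(deserialize(wire)); // round-trips to the original string
    }
}
```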

5.3 Determining the record's target partition

Obtaining the partition value falls into the following three cases:

  • When a partition is specified, the specified value is used directly as the partition value;
  • When no partition is specified but a key is present, the partition value is obtained by taking the key's hash modulo the topic's partition count;
  • When neither a partition nor a key is present, the first call randomly generates an integer (each subsequent call increments it), and that value modulo the number of available partitions of the topic yields the partition value; this is the so-called round-robin algorithm. (Note: since Kafka 2.4 the keyless case actually goes through the sticky partitioner shown in the code below, which sticks to one partition per batch rather than rotating on every record.)

The specific implementation is as follows :

// When the record carries a partition value, return it directly; otherwise call the
// partitioner's partition method to compute it (KafkaProducer.class)
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

The producer's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner; users can also customize the partitioning strategy. The following is the concrete implementation of the default strategy:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

The core of the default algorithm above is the sticky partition cache.
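The default partitioner's behavior can be sketched in plain Java as follows. All types here are simplified stand-ins: Kafka's real implementation hashes keys with murmur2 (not Arrays.hashCode), and its sticky cache changes partition when a batch completes or becomes full.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

public class PartitionerSketch {
    // topic -> currently "stuck" partition for keyless records
    private final Map<String, Integer> stickyCache = new ConcurrentHashMap<>();

    public int partition(String topic, byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // sticky path: pick a random partition once and keep using it until the batch flushes
            return stickyCache.computeIfAbsent(topic,
                t -> ThreadLocalRandom.current().nextInt(numPartitions));
        }
        // keyed path: stable hash modulo the partition count (Kafka uses murmur2 here)
        int hash = Arrays.hashCode(keyBytes) & 0x7fffffff; // force non-negative
        return hash % numPartitions;
    }

    // Called when a batch for the topic completes: the next keyless record picks a new partition
    public void onNewBatch(String topic) {
        stickyCache.remove(topic);
    }

    public static void main(String[] args) {
        PartitionerSketch p = new PartitionerSketch();
        int first = p.partition("demo", null, 4);
        // keyless records stick to the same partition until onNewBatch is called
        System.out.println(first == p.partition("demo", null, 4)); // prints true
    }
}
```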

5.4 Appending the record to the RecordAccumulator

Before diving into the RecordAccumulator, it helps to keep the overall sending flow in mind (the original post includes a diagram of it here).

The RecordAccumulator plays the role of a buffer; its default size is 32 MB (the buffer.memory setting).

In the Kafka producer, messages are not sent to the broker one by one. Instead, multiple messages are grouped into a ProducerBatch, which the Sender then ships in one go. batch.size here is not a message count ("send however many have accumulated") but a size in bytes; the default is 16 KB, and it can be tuned for your workload.

In the RecordAccumulator, the core field is:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

It is a ConcurrentMap whose key is a TopicPartition, representing one partition of one topic, and whose value is a double-ended queue of ProducerBatch objects waiting for the Sender thread to ship them to the broker.
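A stripped-down model of that structure makes the layout concrete. The types below are stand-ins (the real value type is Deque<ProducerBatch>, and the real append logic lives in RecordAccumulator.append):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class BatchesMapSketch {
    // Stand-in for TopicPartition: one partition of one topic
    static class TopicPartition {
        final String topic; final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public boolean equals(Object o) {
            return o instanceof TopicPartition && ((TopicPartition) o).topic.equals(topic)
                && ((TopicPartition) o).partition == partition;
        }
        @Override public int hashCode() { return Objects.hash(topic, partition); }
    }

    // Stand-in for ProducerBatch: just collects serialized records
    static class Batch { final Deque<byte[]> records = new ArrayDeque<>(); }

    // The core field of RecordAccumulator, modeled directly
    static final ConcurrentMap<TopicPartition, Deque<Batch>> batches = new ConcurrentHashMap<>();

    public static void append(String topic, int partition, byte[] record) {
        TopicPartition tp = new TopicPartition(topic, partition);
        Deque<Batch> dq = batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
        synchronized (dq) { // each partition's deque is guarded by its own lock
            if (dq.isEmpty()) dq.addLast(new Batch());
            dq.peekLast().records.addLast(record); // always append to the last (open) batch
        }
    }

    public static void main(String[] args) {
        append("demo", 0, new byte[] {1});
        append("demo", 0, new byte[] {2});
        System.out.println(batches.get(new TopicPartition("demo", 0)).peekLast().records.size());
    }
}
```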

Do you have any questions about the append code? Why is the memory allocation not placed inside the synchronized block? And, related to that, why does the following synchronized block contain another tryAppend call?

Because by that point another thread may already have created a new RecordBatch, which would make the just-allocated memory an extra, wasted request.

And what would be wrong with doing the memory allocation inside the synchronized block?

If the memory request cannot be satisfied at once, the thread blocks waiting for memory; if that happened inside the synchronized block, the lock on the Deque would never be released, and no other thread could perform its thread-safe operations on that Deque.
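The pattern being described, allocate outside the lock and then re-check under the lock, can be sketched like this. These are simplified stand-ins: the real code uses BufferPool.allocate, which may block until memory is freed, which is exactly why it must not run while holding the deque lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AppendSketch {
    static class Batch {
        final byte[] buffer;
        int used;
        Batch(byte[] buffer) { this.buffer = buffer; }
        // Returns false when there is not enough room left for the record
        boolean tryAppend(byte[] record) {
            if (used + record.length > buffer.length) return false;
            System.arraycopy(record, 0, buffer, used, record.length);
            used += record.length;
            return true;
        }
    }

    static int allocations = 0; // counts allocations, to show the double-check saving one

    static byte[] allocate(int size) { allocations++; return new byte[size]; } // may block in Kafka

    public static void append(Deque<Batch> dq, byte[] record, int batchSize) {
        synchronized (dq) { // first attempt: append to the last open batch
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) return;
        }
        // Allocate OUTSIDE the lock: if allocation blocked while holding the lock,
        // no other thread could touch this deque in the meantime.
        byte[] buffer = allocate(batchSize);
        synchronized (dq) {
            // Second attempt: another thread may have created a batch while we allocated
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) return; // buffer wasted: why we re-check
            Batch b = new Batch(buffer);
            b.tryAppend(record);
            dq.addLast(b);
        }
    }

    public static void main(String[] args) {
        Deque<Batch> dq = new ArrayDeque<>();
        append(dq, new byte[10], 16);
        append(dq, new byte[10], 16); // does not fit in the first batch, so a second one is opened
        System.out.println(dq.size() + " batches, " + allocations + " allocations");
    }
}
```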

Now follow me into the tryAppend() method; it is simpler.


5.5 Waking up the sender thread to send the RecordBatch

After a record is written successfully, if the conditions for sending a RecordBatch are met (usually the queue holds multiple batches, so the batches added earliest are certainly ready to be sent), the sender thread is woken up to send the RecordBatch.

The sender thread handles the RecordBatch in its run() method (the implementation is shown as images in the original post).

The core work of run() is done in the method org.apache.kafka.clients.producer.internals.Sender#sendProducerData.

Here pollTimeout means the longest time to block until at least one channel becomes ready for the events you registered; a pollTimeout of 0 means the call returns immediately.

Let's keep following into org.apache.kafka.clients.producer.internals.RecordAccumulator#ready. Finally, look at org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which fetches the data to be sent from the accumulator's buffer, sending at most max.request.size bytes at a time.
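drain's core idea, pull ready batches until max.request.size would be exceeded, can be modeled like this. This is a simplified sketch: the real method works per broker node and rotates the starting partition to avoid starving any one partition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DrainSketch {
    static final int MAX_REQUEST_SIZE = 32; // tiny stand-in for max.request.size (default 1 MB)

    // Pull the oldest batch from each partition's deque until the size budget is used up.
    public static List<byte[]> drain(List<Deque<byte[]>> partitionQueues) {
        List<byte[]> ready = new ArrayList<>();
        int size = 0;
        for (Deque<byte[]> dq : partitionQueues) {
            byte[] first = dq.peekFirst();
            if (first == null) continue;
            if (!ready.isEmpty() && size + first.length > MAX_REQUEST_SIZE) break; // budget spent
            ready.add(dq.pollFirst()); // the oldest batch of this partition joins the request
            size += first.length;
        }
        return ready;
    }

    public static void main(String[] args) {
        Deque<byte[]> p0 = new ArrayDeque<>(); p0.add(new byte[20]);
        Deque<byte[]> p1 = new ArrayDeque<>(); p1.add(new byte[20]);
        // 20 + 20 exceeds the 32-byte budget, so only one batch is drained this round
        System.out.println(drain(List.of(p0, p1)).size());
    }
}
```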


6. Summary

Finally, to give you a macro-level, structural understanding of the Kafka producer, a brief description:

  • new KafkaProducer() creates a background thread, KafkaThread (the thread that actually runs is the Sender; KafkaThread is a wrapper around it), which scans the RecordAccumulator for messages to send.
  • Calling KafkaProducer.send() actually saves the message into the RecordAccumulator, that is, into a Map (ConcurrentMap<TopicPartition, Deque<ProducerBatch>>). The message is recorded into the corresponding record batch (same topic and same partition means the same batch), and all messages of one batch will be sent to the same topic and partition.
  • When the background thread scans the RecordAccumulator and finds messages, it sends them to the Kafka cluster (not as soon as a message arrives; it depends on whether the messages are ready).
  • If the send succeeds (the message is written to Kafka), a RecordMetadata object is returned, containing the topic and partition information as well as the record's offset within the partition.
  • If the write fails, an error is returned; on receiving the error the producer retries the send (if retries are allowed, the message is saved into the RecordAccumulator again), and if it still fails after several attempts the error is returned to the user.

That wraps up our analysis of the Kafka producer source code. The next article will explain metadata in detail, along with the metadata update mechanism on the producer side. Stay tuned!

This article was created by [Lao Zhou chat architecture]. Please include a link to the original when reprinting. Thanks.
