Kafka producer principles and important parameters

osc_ 15vyay19 2021-01-21 12:37:02


The previous article used a worked example to show how to think about a Kafka cluster deployment. It is only a reference; engineers at different companies will naturally have their own implementation plans.

This time we return to principles. In the same style as the first article, I will lead you through building up the picture step by step.

The Kafka producer principle

First we need a cluster containing several servers. Each server is called a Broker, which is in fact just a Kafka process.

If you remember the first article, it is not hard to guess what comes next: there is one controller and multiple followers, plus a ZooKeeper cluster. On startup, each Broker registers itself with the ZooKeeper cluster.

The controller then watches ZooKeeper for changes in the cluster and updates its metadata whenever the cluster changes. The followers in turn synchronize metadata from the controller, so the metadata on every server in the Kafka cluster stays consistent.

With that groundwork in place, we can officially start on the producer itself.

Term 1: ProducerRecord

Before the producer sends a message to the cluster, each message is wrapped in a ProducerRecord object; this happens inside the producer. The record then goes through serialization. As several earlier articles mentioned, data transmitted over the network must be binary bytes, so serialization is required before transfer.
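As a rough illustration of the serialization step, here is a simplified sketch of what the StringSerializer configured later in this article effectively does: turn the string into UTF-8 bytes (the real class also supports a configurable encoding, so treat this only as the idea, not the implementation):

```java
import java.nio.charset.StandardCharsets;

public class SerializerSketch {
    // Simplified view of string serialization for network transfer:
    // encode the string as UTF-8 bytes.
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("test-value");
        System.out.println(bytes.length); // "test-value" -> 10 bytes
    }
}
```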

This raises a question: we need to send the message to a leader partition under some topic, but how does the producer find out which partition of that topic is the leader?

In case anyone has forgotten, a reminder: the controller can be regarded as the leader among the brokers and manages the cluster's metadata, while leader partitions exist for load balancing and are distributed across different servers. Both producing and consuming data in the cluster operate against leader partitions.

Term 2: Partitioner

How do we know which partition is the leader? Just fetch the metadata.

Getting metadata is easy: ask any server in the cluster, because every server in the cluster holds the same metadata.
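Once the producer knows the partitions, the partitioner decides where a keyed record goes: hash the key, take it modulo the partition count, so the same key always lands on the same partition. A minimal sketch of the idea (Kafka's DefaultPartitioner actually uses murmur2 hashing rather than hashCode, so real partition numbers will differ):

```java
public class PartitionerSketch {
    // Simplified keyed partitioning: same key -> same partition.
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the index is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("order-42", 3);
        int p2 = partitionFor("order-42", 3);
        System.out.println(p1 == p2);          // true: same key, same partition
        System.out.println(p1 >= 0 && p1 < 3); // true: always a valid index
    }
}
```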

Term 3: Buffer

At this point the producer does not rush to send the message out; instead it places the message in a buffer.

Term 4: Sender

After the message is placed in the buffer, a separate Sender thread packages messages into batches. It is easy to see why: if Kafka really transmitted one message at a time, with one network round trip per message, performance would be terrible. Batching was adopted to improve throughput.

Once batches are formed, they are sent to the corresponding hosts, passing through the Kafka network design model mentioned in the first article, then written to the OS cache, and finally to disk.
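The throughput argument for batching can be made concrete with some back-of-the-envelope arithmetic (the round-trip latency and batch capacity below are assumed purely for illustration):

```java
public class BatchingMath {
    public static void main(String[] args) {
        int messages = 10_000;
        double roundTripMs = 2.0;  // assumed network round trip per request
        int perBatch = 16;         // e.g. a 32 KB batch holding 2 KB messages

        double oneByOneMs = messages * roundTripMs;
        double batchedMs = Math.ceil(messages / (double) perBatch) * roundTripMs;

        System.out.println(oneByOneMs); // 20000.0 -> one round trip per message
        System.out.println(batchedMs);  // 1250.0  -> 16x fewer round trips
    }
}
```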

The figure below is the Kafka network design model we explained previously.

Producer code

Setting the parameters

//  Create the config object
Properties props = new Properties();
//  bootstrap.servers is used to fetch the Kafka cluster's metadata;
//  one host would work, but listing several is safer.
//  Host names are used here, matching server.properties;
//  when using host names you must configure the hosts file (important).
props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
//  Serializes the message key from a string to a byte array.
//  A key can be set per message; its effect is explained later.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//  Serializes the actual message value from a string to a byte array.
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//  The following are tuning parameters, explained later.
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 32384);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);

Create a producer instance

//  Create a Producer instance: this allocates thread resources and
//  establishes a socket connection to each broker.
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

Create a message

ProducerRecord<String, String> record = new ProducerRecord<>(
    "test-topic", "test-value");

Of course, you can also specify a key; its effect will be explained later:

ProducerRecord<String, String> record = new ProducerRecord<>(
    "test-topic", "test-key", "test-value");

Send a message

We send with a callback: if the callback receives no exception, the message was sent successfully.

//  Asynchronous send mode
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            //  Message sent successfully
            System.out.println("Message sent successfully");
        } else {
            //  Message delivery failed and needs to be resent
        }
    }
});
Thread.sleep(10 * 1000);

//  Synchronous send mode (rarely used in practice because of its poor
//  performance, but fine for testing): you must wait for the whole pipeline
//  to complete and a response for the message to come back before this
//  call returns.
producer.send(record).get();

Close the connection

producer.close();

Now for the good stuff: the tuning parameters

What separates a thoughtful engineer from someone who merely types the code is precisely the tuning parameters that were set above but not yet explained. Let's go through them one by one.

props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 32384);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);

acks: message acknowledgment

props.put("acks", "-1");

acks value    When a send counts as successful
-1            The leader and all follower replicas have received the message
1             The leader alone has received the message
0             The message was merely sent out; no acknowledgment is awaited

The acks parameter has three possible values, -1, 0, and 1, and the value determines when Kafka judges a message to have been sent successfully. Partitions in Kafka have replicas. With acks = -1, after the message is written to the leader partition, it must also be synchronized by all the other replicas of that partition before the send counts as successful (in the code above, that is when System.out.println("Message sent successfully") runs). Send throughput is lower in this mode.

If you set acks to 1, the send counts as successful once the message is written to the leader partition. This carries a risk of data loss: if the leader partition crashes right after receiving the message, whichever follower is then elected leader will not have the message that was just sent.

If you set acks to 0, the message counts as sent as soon as it goes out; there is no acknowledgment at all.

retries: retry count (important)

This parameter is very important; in a production environment you must set it, because it controls how many times a failed send is retried.

props.put("retries", 3);

In Kafka we may encounter all kinds of exceptions (you can jump ahead to the supplementary exception types below). Whatever the exception, it means there was a problem sending messages, especially when the network has a sudden hiccup. The cluster cannot surface every such exception to us; the network may well recover in the next second, so we need a retry mechanism.

One more point: once retries is set, roughly 95% of the exceptions in the cluster will resolve themselves on retry. I'm not kidding!

I configured 3 retries in the code; in practice anywhere from 5 to 10 is also reasonable. One addition: the interval between retries can also be set, via the retry.backoff.ms parameter. Here I set it to 100 milliseconds, i.e. retry every 0.1 seconds.

props.put("retry.backoff.ms", 100);
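One caveat not covered above: when ordering within a partition matters, retries can reorder messages, because a failed batch may be retried after a later batch has already succeeded. Two commonly recommended guards, assuming a Kafka version that supports them (idempotence requires 0.11+), are sketched below as a config fragment:

```java
// Either cap in-flight requests so a retried batch cannot be overtaken...
props.put("max.in.flight.requests.per.connection", 1);
// ...or enable the idempotent producer, which preserves ordering under
// retries and also deduplicates messages that were retried.
props.put("enable.idempotence", "true");
```

Capping in-flight requests costs some throughput; idempotence is usually the gentler option where available.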

batch.size: batch size

The default batch size is 16 KB; here I set it to about 32 KB. A larger setting can slightly improve throughput. The right batch size also depends on message size: if a single message is 16 KB and the batch is also 16 KB, batching becomes meaningless. So estimate the typical message size in your cluster in advance; normally the batch should be several times that size.

props.put("batch.size", 32384);

linger.ms: send time limit

For example, suppose I set the batch size to 32 KB and each message is 2 KB. Three messages arrive, 6 KB in total, and then the producer goes quiet. Should the batch never be sent just because it hasn't reached 32 KB? Obviously not. linger.ms sets how long to wait: even if the batch isn't full, it is still sent after that time. With linger.ms set to 100, the batch is sent to the cluster after 100 milliseconds even though it hasn't reached 32 KB.

props.put("linger.ms", 100);
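The flush condition just described can be sketched in a few lines. This is a simplified model with hypothetical names, not the real accumulator logic, which has more triggers:

```java
public class LingerSketch {
    // A batch is sent when it is full OR it has waited linger.ms.
    static boolean shouldFlush(int batchBytes, int batchSizeLimit,
                               long ageMs, long lingerMs) {
        return batchBytes >= batchSizeLimit || ageMs >= lingerMs;
    }

    public static void main(String[] args) {
        // Three 2 KB messages = 6 KB in a 32 KB batch, linger.ms = 100
        System.out.println(shouldFlush(6 * 1024, 32 * 1024, 50, 100));  // false: not full, not old enough
        System.out.println(shouldFlush(6 * 1024, 32 * 1024, 100, 100)); // true: linger.ms reached
    }
}
```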

buffer.memory: buffer size

When the Sender thread is processing slowly while data is being produced quickly, an undersized buffer means the producer can no longer accept new data. In that case increase the buffer memory a little. The default buffer size is 32 MB, which is basically reasonable.

props.put("buffer.memory", 33554432);

How do we verify whether the buffer size should be increased? We can time the send call with plain Java: end time minus start time. If the difference exceeds 100 ms, the Sender thread is falling behind (the buffer filled up and send() blocked), and the buffer should be enlarged.

Of course, in general you don't need to change this parameter; 32 MB is usually enough.

long startTime = System.currentTimeMillis();
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            //  Message sent successfully
            System.out.println("Message sent successfully");
        } else {
            //  Message delivery failed and needs to be resent
        }
    }
});
long endTime = System.currentTimeMillis();
if (endTime - startTime > 100) {
    //  send() blocked for over 100 ms: the buffer is probably full
}

compression.type: compression

compression.type defaults to none, i.e. no compression, but lz4, for example, works quite well. Compression reduces the amount of data transferred and improves throughput, at the cost of extra CPU overhead on the producer side.

props.put("compression.type", "lz4");

max.block.ms

We'll leave the details for source-code analysis; this parameter bounds how long certain blocking methods (such as send() when the buffer is full) may wait.

props.put("max.block.ms", 3000);

max.request.size: maximum message size

max.request.size controls the maximum size of a message sent. The default is 1048576 bytes, i.e. 1 MB, which is often too small; many messages may exceed 1 MB, so tune it yourself and set it larger (enterprises often use 10 MB). Otherwise the program runs fine until a 2 MB message arrives and the system throws an error, which isn't worth it.

props.put("max.request.size", 10485760); // 10 MB, raised from the 1 MB default

request.timeout.ms: request timeout

request.timeout.ms: after a request is sent out, there is a time limit for the response, 30 seconds by default. If there is no response within 30 seconds (i.e. the callback above never fires), the request is treated as failed and a TimeoutException is thrown for us to handle. If your company's network is poor, raise this parameter appropriately.

props.put("request.timeout.ms", 30000);

Supplement: Kafka exceptions

Whether you send asynchronously or synchronously, you may have to handle exceptions. Common ones are as follows:

  • LeaderNotAvailableException: if a machine goes down, the leader replica becomes unavailable and your writes fail. You must wait for one of the follower replicas to be switched to leader before writing can continue, so retry the send. Restarting a Kafka broker process in normal operations will certainly trigger leader switches and produce exactly this exception.

  • NotControllerException: the same story. If the Broker hosting the Controller goes down, there is a problem until a new Controller is elected; again, the answer is to retry.

  • NetworkException: a network glitch; just retry. The retries parameter we configured earlier makes the client retry automatically, but if several retries still fail, the Exception is handed to us to deal with.

Parameter: retries, the retry count (set to 3 above)

Parameter: retry.backoff.ms, the interval between retries
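The retries / retry.backoff.ms behavior described above can be sketched independently of the Kafka client. This is a hypothetical helper for illustration, not the client's internal code:

```java
public class RetrySketch {
    interface Op { void run() throws Exception; }

    // Attempt the operation once, then retry up to `retries` more times,
    // sleeping backoffMs between attempts -- mirroring retries / retry.backoff.ms.
    static boolean sendWithRetries(Op op, int retries, long backoffMs)
            throws InterruptedException {
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                op.run();
                return true;
            } catch (Exception transientError) {
                if (attempt == retries) return false; // retries exhausted
                Thread.sleep(backoffMs);
            }
        }
        return false; // unreachable
    }

    public static void main(String[] args) throws InterruptedException {
        int[] failuresLeft = {2}; // simulate a network that fails twice, then recovers
        boolean ok = sendWithRetries(() -> {
            if (failuresLeft[0]-- > 0) throw new RuntimeException("simulated NetworkException");
        }, 3, 10);
        System.out.println(ok); // true: succeeded on the third attempt
    }
}
```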

Summary

We have walked through the whole process from the producer creating a message to sending it, and used that flow to motivate the various parameter settings along the way. If you get these fundamentals clear, I believe they will serve you well.


Copyright notice
This article was created by [osc_ 15vyay19]. Please include the original link when reposting.
https://javamana.com/2021/01/20210121123606213b.html
