Message queues: Kafka

pluto-charon 2021-01-23 23:45:16
kafka message queue

Message queues: ActiveMQ

Message queues: RabbitMQ

1. Kafka introduction

Kafka is a distributed publish-subscribe messaging system written in Scala. It supports multiple partitions and multiple replicas and is coordinated by ZooKeeper. It offers high throughput, persistence, horizontal scalability, and support for stream processing. It can carry massive volumes of data, persists messages to disk, and replicates them to keep the data safe. While keeping processing speed high, Kafka also keeps latency low and, when configured appropriately, can avoid losing data.

Characteristics of Kafka:

  1. High throughput, low latency: Kafka can process hundreds of thousands of messages per second with latency as low as a few milliseconds. Each topic can be divided into several partitions, and a consumer group consumes from those partitions
  2. Extensibility: the cluster supports hot expansion
  3. Persistence, reliability: messages are persisted to the local disk, and data replication is supported
  4. Fault tolerance: nodes in the cluster are allowed to fail; if the number of replicas is n, up to n-1 nodes may fail
  5. High concurrency: thousands of clients can read and write at the same time
  6. Scalability: Kafka can easily be expanded or shrunk while it is running; a Kafka topic can be expanded to include more partitions

Main application scenarios of Kafka:

  • Messaging
  • Website activity tracking
  • Metrics storage
  • Log aggregation
  • Stream processing
  • Event sourcing

The basic flow :

Key roles in Kafka:

  • Producer: the publisher of data; this role publishes messages to Kafka topics
  • Consumer: reads data from brokers
  • Consumer Group: every consumer belongs to a particular consumer group (a group name can be specified for each consumer; if no group name is specified, the consumer belongs to the default group)
  • Topic: the category to which a message belongs
  • Partition: the data in a topic is divided into one or more partitions; every topic contains at least one partition
  • Partition offset: every message has a 64-bit offset that is unique within its partition; it marks the position of the message in the partition
  • Replicas of Partition: a replica is a backup of a partition
  • Broker: a Kafka cluster contains one or more servers; each server node is called a broker
  • Leader: every partition has multiple replicas, and exactly one of them is the leader; the leader is the replica currently responsible for reading and writing the partition's data
  • Follower: a follower follows the leader. All write requests are routed through the leader, data changes are propagated to all followers, and followers keep their data in sync with the leader
  • AR (Assigned Replicas): all replicas of a partition
  • ISR (In-Sync Replicas): the replicas, including the leader, that stay sufficiently in sync with the leader
  • OSR (Out-of-Sync Replicas): the replicas that lag too far behind the leader
  • HW: high watermark; it identifies a specific offset, and consumers can only pull messages before this offset
  • LEO: log end offset; the offset of the next message to be written to the replica's underlying log
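
To make the relationship between ISR, LEO and HW concrete, here is a minimal sketch in plain Java. It assumes the usual rule that the HW of a partition is the smallest LEO among the replicas in the ISR; the class and method names (HighWatermarkSketch, highWatermark, visibleToConsumer) are made up for illustration and are not part of the Kafka API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Toy model (not Kafka code): HW is taken as the smallest LEO among the in-sync replicas. */
final class HighWatermarkSketch {

    /** Returns the HW of a partition, given the LEO of every replica in the ISR. */
    static long highWatermark(Collection<Long> isrLogEndOffsets) {
        if (isrLogEndOffsets.isEmpty()) {
            throw new IllegalArgumentException("the ISR must contain at least the leader");
        }
        long hw = Long.MAX_VALUE;
        for (long leo : isrLogEndOffsets) {
            hw = Math.min(hw, leo);
        }
        return hw;
    }

    /** A consumer can only read offsets strictly below the HW. */
    static boolean visibleToConsumer(long offset, long hw) {
        return offset < hw;
    }

    public static void main(String[] args) {
        // Hypothetical ISR: the leader has LEO 10, the two followers lag slightly behind.
        List<Long> leos = Arrays.asList(10L, 8L, 9L);
        long hw = highWatermark(leos);
        System.out.println("HW = " + hw);                                    // HW = 8
        System.out.println("offset 7 visible: " + visibleToConsumer(7, hw)); // true
        System.out.println("offset 8 visible: " + visibleToConsumer(8, hw)); // false
    }
}
```

In this model the messages at offsets 8 and 9 already exist on the leader but stay invisible to consumers until the slowest in-sync follower has caught up.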

2. Kafka installation

Installing Kafka requires ZooKeeper and a JDK to be installed first. The versions I installed here are jdk1.8.0_20, kafka_2.11-1.0.0 and zookeeper-3.4.14. The Kafka and JDK versions must be compatible with each other. I previously tried kafka_2.12_2.3.0 and it did not work.

1. Upload the Kafka archive to the home directory and extract it to the /usr/local directory

[root@localhost home]# tar -xvzf kafka_2.11-1.0.0.tgz -C /usr/local

2. Enter Kafka's config directory

[root@localhost local]# cd /usr/local/kafka_2.11-1.0.0/config

3. Edit the server.properties file

# In a cluster environment, each broker.id must be set to a different value
# Uncomment the next line; this is the address on which Kafka serves external clients
# Log storage location: change log.dirs from /tmp/kafka_logs to another directory
# Change the ZooKeeper address (zookeeper.connect)
# Change the ZooKeeper connection timeout (zookeeper.connection.timeout.ms); the default is 6000 ms, which may time out

3. Start ZooKeeper

Because I configured a ZooKeeper cluster, all three ZooKeeper servers need to be started. If only a single server is started, the leader election cannot complete (when more than half of the servers in the cluster are down, ZooKeeper considers the cluster unavailable).

[root@localhost ~]# zkServer.sh start
# Check the status
[root@localhost ~]# zkServer.sh status

4. Start Kafka

[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh config/server.properties
# You can also start it in the background. If you start it in the foreground, you need to open a new window for the following steps
[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh -daemon config/server.properties

5. Create a topic

# --zookeeper: the address of the ZooKeeper service that Kafka is connected to
# --partitions: the number of partitions
# --replication-factor: the replication factor
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic charon --partitions 2 --replication-factor 1
Created topic "charon".

6. List all topics (to verify that the topic was created correctly)

[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list

7. View the details of a topic

[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic charon
Topic:charon PartitionCount:2 ReplicationFactor:1 Configs:
Topic: charon Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: charon Partition: 1 Leader: 0 Replicas: 0 Isr: 0

8. Open a new window and start a consumer to receive messages.

--bootstrap-server: specifies the address of the Kafka cluster to connect to; 9092 is the port of the Kafka service. Because a specific address is configured in my configuration file, the same specific address has to be given here. Otherwise the following error is reported: [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available.

[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server --topic charon

9. Open a new window and start a producer to produce messages

--broker-list: specifies the address of the Kafka cluster to connect to; 9092 is the port of the Kafka service. As above, use the address configured in my configuration file.

[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list --topic charon

10. Produce and consume messages

# The producer produces a message
>hello charon good evening
# The message received by the consumer
hello charon good evening

Of course, this setup only works within the same network segment.

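3. Producers and consumers

Kafka production process:

1) The producer first finds the leader of the partition. This information is stored in the "/brokers/.../state" node in ZooKeeper; the Java client obtains it from the brokers listed in bootstrap.servers rather than reading ZooKeeper directly

2) The producer sends the message to this leader

3) The leader writes the message to its local log

4) The followers pull the message from the leader, write it to their local logs, and then send an ACK to the leader

5) After the leader has received the ACKs from all replicas in the ISR, it advances the HW (high watermark, the offset of the last committed message) and sends an ACK to the producer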

Consumer groups:

Kafka consumers belong to consumer groups. When multiple consumers form a group to consume a topic, each consumer receives messages from different partitions. If all consumers are in the same consumer group, this is the work-queue model. If the consumers are in different groups, it is the publish-subscribe model.

When a single consumer cannot keep up with the rate at which data is produced, you can add more consumers to share the load. Each consumer then processes the messages of only some of the partitions, which lets a single application scale horizontally. But do not let the number of consumers exceed the number of partitions, because the extra consumers will sit idle.

When several applications need to get messages from Kafka, give each application its own consumer group, so that each application receives all the messages of one or more topics. Each consumer corresponds to one thread: if you want to run multiple consumers of the same group in one application, each consumer needs to run in its own thread.
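
The effect of the consumer count on partition ownership can be illustrated with a small stand-alone sketch. It is a simplified round-robin model that I made up for illustration (the class GroupAssignmentSketch and its assign method are hypothetical); Kafka's real assignment strategies are more elaborate, but the outcome for surplus consumers is the same.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Kafka code): spread the partitions of one topic over the consumers of one group. */
final class GroupAssignmentSketch {

    /** Assigns partitions 0..partitionCount-1 to the consumers in round-robin order. */
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One consumer owns both partitions of the topic.
        System.out.println(assign(2, Arrays.asList("c1")));             // {c1=[0, 1]}
        // Three consumers, two partitions: the third consumer gets nothing and stays idle.
        System.out.println(assign(2, Arrays.asList("c1", "c2", "c3"))); // {c1=[0], c2=[1], c3=[]}
    }
}
```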

4. Code practice

1. Add the dependency:

<!-- Add the Kafka dependency -->

Producer code:

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * @className: Producer
 * @description: Kafka producer
 * @author: charon
 * @create: 2021-01-18 08:52
 */
public class Producer {
    private static final String topic = "charon";

    public static void main(String[] args) {
        // Configure the Kafka properties
        Properties properties = new Properties();
        // Set the broker address (the original value is not shown; localhost:9092 is an assumed placeholder)
        properties.put("bootstrap.servers", "localhost:9092");
        // Set the acknowledgement mode; the default in this Kafka version is 1 ("all" is an example value).
        //   0: the producer does not wait for a response from Kafka;
        //   1: the leader writes the message to its local log but does not wait for the other replicas;
        //   -1 (all): the leader waits until all in-sync followers have replicated the message,
        //   so the message is not lost unless every machine in the cluster goes down
        properties.put("acks", "all");
        // Number of retries; if greater than 0, the client resends a message that failed to send (example value)
        properties.put("retries", 3);
        // Batch size in bytes; when several messages go to the same partition, the producer
        // tries to merge them into one network request to improve efficiency (example value)
        properties.put("batch.size", 16384);
        // Serializers for keys and values (both settings are required)
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            String message = "hello,charon message " + i;
            producer.send(new ProducerRecord<>(topic, message));
            System.out.println("Producer sent message: " + message);
        }
        // Flush buffered messages and release resources
        producer.close();
    }
}

Consumer code :

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;

/**
 * @className: Consumer
 * @description: Kafka consumer
 * @author: charon
 * @create: 2021-01-18 08:53
 */
public class Consumer implements Runnable {
    private static final String topic = "charon";
    /** Kafka consumer */
    private static KafkaConsumer<String, String> kafkaConsumer;
    /** Messages returned by the latest poll */
    private static ConsumerRecords<String, String> msgList;

    public static void main(String[] args) {
        // Configure the Kafka properties
        Properties properties = new Properties();
        // Set the broker address (the original value is not shown; localhost:9092 is an assumed placeholder)
        properties.put("bootstrap.servers", "localhost:9092");
        // Deserializers for keys and values
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        // Set the consumer group (example name)
        properties.put("group.id", "charon-group");
        // Allow automatic offset commits
        properties.put("enable.auto.commit", "true");
        // Interval between automatic commits (example value)
        properties.put("auto.commit.interval.ms", "1000");
        // Session timeout (example value)
        properties.put("session.timeout.ms", "30000");
        // Create the consumer
        kafkaConsumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        kafkaConsumer.subscribe(Arrays.asList(topic));
        Consumer consumer = new Consumer();
        new Thread(consumer).start();
        // kafkaConsumer.close();
    }

    @Override
    public void run() {
        for (;;) {
            // Fetch data, waiting up to 1000 ms (clients 2.0 and later prefer poll(Duration.ofMillis(1000)))
            msgList = kafkaConsumer.poll(1000);
            if (null != msgList && msgList.count() > 0) {
                for (ConsumerRecord<String, String> consumerRecord : msgList) {
                    System.out.println("Consumer received a message, start consuming: " + consumerRecord);
                    System.out.println("topic= " + consumerRecord.topic() + " ,partition= " + consumerRecord.partition() + " ,offset= " + consumerRecord.offset() + " ,value=" + consumerRecord.value() + "\n");
                }
            } else {
                // If no data was fetched, block for a while
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

5. Commits and offsets

Unlike ActiveMQ, Kafka does not require consumers to acknowledge messages, so consumers need to track how far they have consumed in each partition. This position is called the offset, and the operation of updating the current position in a partition is called a commit. If a consumer crashes or a new consumer joins the group, a rebalance is triggered. After the rebalance, each consumer may be assigned new partitions rather than the ones it handled before. To continue the previous work, the consumer reads the last committed offset of each partition and continues processing from the position given by that offset.

This can lead to the following two situations:

1. The committed offset is less than the offset processed by the client

If the committed offset is less than the offset of the last message processed by the client, the messages between the two offsets are processed again.

2. The committed offset is greater than the offset processed by the client

If the committed offset is greater than the offset of the last message processed by the client, the messages between the two offsets are lost.
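
The two situations above can be replayed with a small stand-alone simulation. This is only a sketch under simplifying assumptions: one partition, one crash, and the committed offset means "the next offset to read". The class and method names (CommitOffsetSketch, processCounts, offsetsWithCount) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Kafka code): what a restart does when the committed offset and the processed offset differ. */
final class CommitOffsetSketch {

    /**
     * Before the crash the consumer processed offsets [0, processedUpTo) and had committed
     * `committed` as the next offset to read. After the restart it resumes at `committed`
     * and processes up to logEnd. Returns how often each offset was processed in total.
     */
    static int[] processCounts(int logEnd, int processedUpTo, int committed) {
        int[] counts = new int[logEnd];
        for (int offset = 0; offset < processedUpTo; offset++) {
            counts[offset]++; // processed before the crash
        }
        for (int offset = committed; offset < logEnd; offset++) {
            counts[offset]++; // processed after the restart
        }
        return counts;
    }

    /** Returns the offsets that were processed exactly `wanted` times. */
    static List<Integer> offsetsWithCount(int[] counts, int wanted) {
        List<Integer> result = new ArrayList<>();
        for (int offset = 0; offset < counts.length; offset++) {
            if (counts[offset] == wanted) {
                result.add(offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Case 1: committed (4) < processed (6): offsets 4 and 5 are processed twice.
        System.out.println("reprocessed: " + offsetsWithCount(processCounts(10, 6, 4), 2)); // [4, 5]
        // Case 2: committed (6) > processed (4): offsets 4 and 5 are never processed.
        System.out.println("lost: " + offsetsWithCount(processCounts(10, 4, 6), 0));        // [4, 5]
    }
}
```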

Kafka commit modes:

  • Automatic commit: the consumer commits the offset automatically after pulling the data, regardless of whether the subsequent processing of the messages succeeds. Advantage: fast consumption, suitable for business scenarios with weak data-consistency requirements. Disadvantage: messages are easily lost or consumed repeatedly.

    Set enable.auto.commit to true

  • Manual (synchronous) commit: the consumer does its business processing after pulling the data and commits only once the processing has succeeded. Disadvantage: the application blocks until the broker responds to the commit request, which limits the throughput of the application.

    Set enable.auto.commit to false;

    Call consumer.commitSync() manually after message processing is complete;

  • Asynchronous commit: just send the commit request, without waiting for the broker's response

    Call consumer.commitAsync() manually after message processing is complete. This method also supports a callback that is invoked when the broker responds. The callback is often used to record commit failures together with the error message and the offsets. If you retry the commit, pay attention to the order of the commits.

6. Rebalance listener

When partitions are assigned to a consumer or revoked from it, you can run application code through the consumer API: when calling subscribe(Pattern pattern, ConsumerRebalanceListener listener), you can pass in a rebalance listener.

Two methods need to be implemented:

  • public void onPartitionsRevoked(Collection<TopicPartition> partitions);

    Called before the rebalance starts and after the consumer has stopped reading messages. If you commit the offsets here, the next consumer that takes over the partition knows where to start reading. Note that you should commit the offset of the most recently processed message, not the last offset of the batch that is still being processed.

  • public void onPartitionsAssigned(Collection<TopicPartition> partitions)

    Called after the partitions have been reassigned and before the consumer starts reading messages from them

7. Kafka message duplication and loss analysis

Let's first look at Kafka's acknowledgement modes:

  • acks=0: the producer does not wait for the broker's acknowledgement and goes on to send the next batch of messages (highest transmission efficiency, but lowest reliability)
  • acks=1: the producer waits until the leader in the ISR has received the data and written it to its local log, but does not wait for a successful response from the followers in the cluster
  • acks=-1: the producer waits until all followers in the ISR have finished synchronizing, so messages are not lost unless every machine in the Kafka cluster goes down (highest reliability, although it still does not fully guarantee that no data is lost)

In a single-broker environment there are no followers to wait for, so acks=1 and acks=-1 behave the same.

Kafka message duplication and loss can occur in three stages:

1. The producer stage. Cause: the producer does not receive a correct response from the broker for a message it sent, which makes the producer retry.

The producer sends a message and the broker writes it to disk, but because of network or other problems the sender gets a send-failure response or the connection is interrupted. The producer then receives a recoverable exception and retries, so the message is written twice.

Retry process:

  1. new KafkaProducer() creates a background thread, KafkaThread, which scans the RecordAccumulator for messages;
  2. Calling KafkaProducer.send() to send a message actually only saves the message in the RecordAccumulator;
  3. When the background thread KafkaThread finds messages in the RecordAccumulator, it sends them to the Kafka cluster;
  4. If the send succeeds, success is returned;
  5. If the send fails, the producer checks whether retrying is allowed. If not, the failure is returned; if it is, the message is saved in the RecordAccumulator again and waits for the background thread KafkaThread to scan and send it again;

Solutions:

1. Enable Kafka idempotence. To enable it, set enable.idempotence=true in the producer configuration; this also requires acks=all and retries > 0. To further improve data reliability, combine it with the min.insync.replicas parameter: if the number of replicas in the ISR is smaller than min.insync.replicas, the write fails with an exception saying that the message was rejected because there are fewer in-sync replicas than required.

How idempotence works:

Every producer has a PID, and the server tracks the state of each producer by its PID. Each message from a producer carries an increasing sequence number (sequence), and the server records the current maximum sequence number for each producer (PID + seq). If the sequence number on a new message is not greater than the current maximum seq, the server rejects the message; when a message is written to disk, the maximum seq is updated at the same time. A retransmitted message is therefore rejected by the server, which avoids duplicates.

2. Set acks=0, that is, no acknowledgement and no retries. Data may be lost, so this suits scenarios where throughput matters more than occasional data loss, for example log collection.
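
The PID + sequence check described under solution 1 can be sketched in a few lines of plain Java. This is a simplified model of the idea, not the broker's actual implementation: as far as I know, real brokers keep this state per producer and partition and also reject gaps in the sequence, which the sketch leaves out. The class IdempotentLogSketch and its methods are made-up names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Kafka code): a log that drops retransmissions by remembering the last seq per PID. */
final class IdempotentLogSketch {
    private final Map<Long, Integer> lastSeqByPid = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Appends the message unless its seq is not greater than the last seq stored for this PID. */
    boolean append(long pid, int seq, String message) {
        Integer lastSeq = lastSeqByPid.get(pid);
        if (lastSeq != null && seq <= lastSeq) {
            return false; // retransmission of an already stored message: reject it
        }
        log.add(message);
        lastSeqByPid.put(pid, seq); // update the maximum seq once the message is stored
        return true;
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        IdempotentLogSketch broker = new IdempotentLogSketch();
        broker.append(1L, 0, "a");
        broker.append(1L, 1, "b");
        broker.append(1L, 1, "b"); // retry after a lost acknowledgement, rejected
        broker.append(2L, 0, "c"); // another producer has its own sequence
        System.out.println(broker.log()); // [a, b, c]
    }
}
```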

2. The producer and broker stage. Causes:

  1. acks=0 and no retries. After the producer sends the message it ignores the outcome, so if the send fails the data is lost.

  2. acks=1 and the leader crashes. The producer sends the message and only waits for the leader's write to succeed; if the leader then goes down before the followers have had time to synchronize, the message is lost.

  3. unclean.leader.election.enable is set to true. This allows a replica outside the ISR to be elected leader, which loses data. The default is false (replicas that are not in the ISR cannot take part in the election).

    The producer sends a message asynchronously and only waits for the leader's write to succeed. The leader goes down, and at that moment there is no follower in the ISR, so the new leader is elected from the OSR. Because the OSR replicas were lagging behind the leader, data is lost.

Solutions:

1. Configure: acks=-1, retries>1, unclean.leader.election.enable=false

The producer sends the message and the send only returns once the followers have finished synchronizing; if it fails, the producer retries. With this setting the number of replicas can affect throughput: use no more than 5, and three is usually enough.

2. Configure: min.insync.replicas > 1

When the producer sets acks to all (or -1), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for it to succeed. If this minimum cannot be met, the producer throws an exception. Used together, min.insync.replicas and acks give stronger durability guarantees.

3. Record failed messages separately

The producer retries automatically when sending a message. If an unrecoverable exception is thrown, you can catch the exception and record the message in a database or cache, then handle it separately.
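
How acks and min.insync.replicas interact (solution 2 above) can be expressed as a small decision function. This is a sketch of the rule as I understand it, not broker code: min.insync.replicas is only enforced for acks=all, while for acks=0 and acks=1 a live leader is enough. MinInsyncSketch, Acks and acceptsWrite are made-up names.

```java
/** Toy model (not Kafka code): would a write be accepted for a given acks setting and ISR size? */
final class MinInsyncSketch {

    enum Acks { NONE, LEADER, ALL }

    /**
     * For acks=all the ISR must contain at least minInsyncReplicas replicas;
     * for the other settings only the leader (ISR size of at least 1) is needed.
     */
    static boolean acceptsWrite(Acks acks, int isrSize, int minInsyncReplicas) {
        if (acks != Acks.ALL) {
            return isrSize >= 1;
        }
        return isrSize >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        // Three replicas configured, min.insync.replicas=2, but only the leader is still in sync.
        System.out.println(acceptsWrite(Acks.ALL, 1, 2));    // false: the producer gets an exception
        System.out.println(acceptsWrite(Acks.ALL, 2, 2));    // true
        System.out.println(acceptsWrite(Acks.LEADER, 1, 2)); // true: the setting is not enforced for acks=1
    }
}
```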

3. The consumption stage. Cause: the offset is not committed to the broker in time after the data has been consumed. If a consumer crashes during consumption before it has committed its offset, another consumer starts consuming from the previously recorded offset. Because that offset lags behind, the newly started client may consume a small number of messages again.

Solutions:

1. Disable automatic commits and commit manually every time consumption finishes or the program exits. This still cannot guarantee that there are no duplicates.

2. Make processing idempotent: try to make the downstream system idempotent, or record the offset of every consumed message. For the few scenarios with strict requirements, it may be necessary to update the offset (or a unique ID) and the downstream state in the same database transaction to guarantee exactly-once updates, or to record the consumed offset in the downstream data table as well. When updating the data, the consumed offset then acts as an optimistic lock that rejects updates coming from an older offset.
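
The "offset as optimistic lock" idea from solution 2 can be sketched as follows. An in-memory map stands in for the downstream table here; in a real system the offset check and the state update would be one database transaction or one conditional UPDATE. All names (OffsetGuardedStoreSketch, apply, balance) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Kafka code): a downstream store that ignores updates from offsets it has already applied. */
final class OffsetGuardedStoreSketch {
    private final Map<Integer, Long> lastAppliedOffset = new HashMap<>(); // keyed by partition
    private long balance = 0;

    /** Applies delta only if offset is newer than the last offset applied for the partition. */
    synchronized boolean apply(int partition, long offset, long delta) {
        Long last = lastAppliedOffset.get(partition);
        if (last != null && offset <= last) {
            return false; // redelivered or stale message: reject the update
        }
        balance += delta;
        lastAppliedOffset.put(partition, offset); // state and offset change together
        return true;
    }

    synchronized long balance() {
        return balance;
    }

    public static void main(String[] args) {
        OffsetGuardedStoreSketch store = new OffsetGuardedStoreSketch();
        store.apply(0, 5, 100);
        store.apply(0, 6, 50);
        store.apply(0, 6, 50);  // redelivered after a rebalance, ignored
        store.apply(0, 5, 100); // older offset, ignored
        System.out.println(store.balance()); // 150
    }
}
```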
