
1. Kafka Introduction

Kafka is a multi-partition, multi-replica distributed publish-subscribe messaging system written in Scala and coordinated by ZooKeeper. It offers high throughput, persistence, horizontal scalability, and stream processing support; it can handle massive data transfers, and it persists messages to disk and replicates them to keep the data safe. While sustaining a high processing rate, Kafka also guarantees low processing latency and zero data loss.

Characteristics of Kafka:

  1. High throughput, low latency: Kafka can process hundreds of thousands of messages per second with latency as low as a few milliseconds. Each topic can be divided into several partitions, and a consumer group consumes the partitions in parallel.
  2. Extensibility: supports hot expansion of the cluster.
  3. Persistence and reliability: messages are persisted to local disk, and data replication is supported.
  4. Fault tolerance: nodes in the cluster are allowed to fail (with n replicas, up to n-1 nodes may fail).
  5. High concurrency: thousands of clients can read and write at the same time.
  6. Scalability: Kafka can easily be scaled out or in while running, and a topic can be expanded to include more partitions.

Main application scenarios of Kafka:

  • Messaging
  • Website activity tracking
  • Metrics storage
  • Log aggregation
  • Stream processing
  • Event sourcing

The basic flow: (architecture diagram omitted)

The key roles in Kafka:

  • Producer: the publisher of data; this role publishes messages to a Kafka topic
  • Consumer: the consumer; it reads data from brokers
  • Consumer Group: every consumer belongs to a specific consumer group (a group name can be specified for each consumer; if none is specified, it belongs to the default group)
  • Topic: the category to which the data belongs
  • Partition: the data in a topic is split into one or more partitions; every topic contains at least one partition
  • Partition offset: every message has a unique 64-bit offset within its partition, which marks the position of the message
  • Replicas of partition: replicas, i.e. backups of a partition
  • Broker: the Kafka cluster contains one or more servers; each server node is called a broker
  • Leader: every partition has multiple replicas, exactly one of which is the leader; the leader is the replica currently responsible for reading and writing data for the partition
  • Follower: a follower follows the leader; all write requests are routed through the leader, data changes are broadcast to all followers, and the followers keep their data in sync with the leader
  • AR (Assigned Replicas): all replicas of a partition, collectively
  • ISR (In-Sync Replicas): all replicas (including the leader) that stay sufficiently in sync with the leader
  • OSR (Out-of-Sync Replicas): replicas whose synchronization lags too far behind the leader
  • HW (High Watermark): marks a specific offset; consumers can only pull messages before this offset (the HW is the smallest LEO among the replicas in the ISR)
  • LEO (Log End Offset): records the offset of the next message to be written in the replica's underlying log

2. Kafka Installation

Installing Kafka requires a ZooKeeper and JDK environment. The versions installed here are jdk1.8.0_20, kafka_2.11-1.0.0, and zookeeper-3.4.14. The Kafka version must be compatible with the JDK version; I previously tried kafka_2.12-2.3.0 and it did not work.

1. Upload the Kafka archive to the home directory and extract it to /usr/local

[root@localhost home]# tar -xvzf kafka_2.11-1.0.0.tgz -C /usr/local

2. Enter Kafka's config directory

[root@localhost local]# cd /usr/local/kafka_2.11-1.0.0/config

3. Edit the server.properties file

# In a cluster environment, each broker.id must be set to a different value
broker.id=0
# Uncomment the next line; this is the address through which Kafka serves external clients
listeners=PLAINTEXT://192.168.189.150:9092
# Log storage location: change log.dirs=/tmp/kafka_logs to
log.dirs=/usr/local/kafka_2.11-1.0.0/logs
# Set the ZooKeeper address
zookeeper.connect=192.168.189.150:2181
# Increase the ZooKeeper connection timeout; the default of 6000 ms may time out
zookeeper.connection.timeout.ms=10000

4. Start ZooKeeper

Because I have a ZooKeeper cluster configured, all three ZooKeeper nodes must be started. Starting only a single server will fail during leader election (when more than half of the cluster is down, the ZooKeeper cluster is considered unavailable).

[root@localhost ~]# zkServer.sh start
# Check the status
[root@localhost ~]# zkServer.sh status

5. Start Kafka

[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh config/server.properties
# You can also start it in the background; without background mode, you need to open a new window after starting
[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh -daemon config/server.properties

6. Create a topic

# --zookeeper: the address of the ZooKeeper service Kafka connects to
# --partitions: the number of partitions
# --replication-factor: the replication factor
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic charon --partitions 2 --replication-factor 1
Created topic "charon".

7. List all topics (to verify the topic was created correctly)

[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
charon

8. Describe a topic

[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic charon
Topic:charon PartitionCount:2 ReplicationFactor:1 Configs:
Topic: charon Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: charon Partition: 1 Leader: 0 Replicas: 0 Isr: 0

9. Open a new window and start a consumer to receive messages.

--bootstrap-server: specifies the address of the Kafka cluster; 9092 is the Kafka service port. Because a specific listener address is configured in my config file, that exact address must be written here. Otherwise you will get the error: [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available.

[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.189.150:9092 --topic charon

10. Open a new window and start a producer to send messages

--broker-list: specifies the address of the Kafka cluster; 9092 is the Kafka service port. Again, use the specific address configured in the config file.

[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.189.150:9092 --topic charon

11. Produce and consume messages

# The producer sends a message
>hello charon good evening
# The consumer receives the message
hello charon good evening

Of course, set up this way, this only works within the same network segment.

3. Producers and Consumers

The Kafka production process:

1) The producer first finds the partition's leader from the "/brokers/.../state" node in ZooKeeper

2) The producer sends the message to that leader

3) The leader writes the message to its local log

4) The followers pull the message from the leader, write it to their local logs, and send an ACK back to the leader

5) After the leader has received ACKs from all replicas in the ISR, it advances the HW (high watermark, the offset of the last commit) and sends an ACK to the producer (see the sketch below)
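
To observe this acknowledgment from the client side, here is a minimal sketch (assuming the broker address 192.168.189.150:9092 and the charon topic from above; the class name AckDemo is made up) that sends one record with acks=all and prints the partition and offset assigned once the ACK arrives:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AckDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.189.150:9092");
        // Wait for the full ISR, matching step 5 above
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // The callback fires after the leader has collected the required ACKs
        producer.send(new ProducerRecord<>("charon", "ping"), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("acked: partition=" + metadata.partition() + " offset=" + metadata.offset());
            }
        });
        producer.close(); // flushes the pending send so the callback runs
    }
}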

Consumer groups:

A Kafka consumer is part of a consumer group. When multiple consumers in a group consume a topic, each consumer receives messages from a different set of partitions. If all consumers are in the same group, this is the work-queue model; if the consumers are in different groups, it is the publish-subscribe model.

When a single consumer cannot keep up with the rate of data production, you can add more consumers to share the load; each consumer then processes only a subset of the partitions, achieving horizontal scaling of a single application. But do not let the number of consumers exceed the number of partitions, because the extra consumers will sit idle.

When multiple applications need to read messages from Kafka, give each application its own consumer group, so that each application receives all messages of one or more topics. Each consumer corresponds to one thread; to run multiple consumers of the same group inside one process, run each consumer in its own thread (see the sketch below).
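
A minimal sketch of that threading rule, assuming the charon topic and broker address used throughout this post (the group name demo-group and class name GroupDemo are made up); each thread owns its own KafkaConsumer instance because the client is not thread-safe:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupDemo {
    public static void main(String[] args) {
        // Two consumers in the same group: the two partitions of "charon" are split between them
        for (int i = 0; i < 2; i++) {
            final int id = i;
            new Thread(() -> {
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.189.150:9092");
                props.put("group.id", "demo-group");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                // One KafkaConsumer per thread: the client is not thread-safe
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList("charon"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("consumer-" + id + " got partition=" + record.partition()
                                + " value=" + record.value());
                    }
                }
            }).start();
        }
    }
}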

4. Code Practice

1. Add dependencies:

<!-- Kafka dependencies -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>

Producer code:

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @className: Producer
 * @description: kafka producer
 * @author: charon
 * @create: 2021-01-18 08:52
 */
public class Producer {

    /** topic */
    private static final String topic = "charon";

    public static void main(String[] args) {
        // Configure the Kafka properties
        Properties properties = new Properties();
        // Set the broker address
        properties.put("bootstrap.servers", "192.168.189.150:9092");
        // Set the acknowledgment type; the default is 1.
        // (0: the producer does not wait for Kafka's response;
        //  1: Kafka's leader writes the message to its local log file but does not wait for
        //     successful responses from the other machines in the cluster;
        //  -1 (all): the leader waits for all followers to finish synchronizing, ensuring the
        //     message is not lost unless every machine in the Kafka cluster goes down)
        properties.put("acks", "all");
        // Set the number of retries; if greater than 0, the client resends any message that failed to send
        properties.put("retries", 0);
        // Set the batch size: when several messages need to go to the same partition,
        // the producer merges them into one network request to improve efficiency
        properties.put("batch.size", 10000);
        // Set the serializers; the default is org.apache.kafka.common.serialization.StringSerializer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            String message = "hello,charon message " + i;
            producer.send(new ProducerRecord<>(topic, message));
            System.out.println("Producer sent message: " + message);
        }
        producer.close();
    }
}

Consumer code:

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @className: Consumer
 * @description: kafka consumer
 * @author: charon
 * @create: 2021-01-18 08:53
 */
public class Consumer implements Runnable {

    /** topic */
    private static final String topic = "charon";
    /** kafka consumer */
    private static KafkaConsumer<String, String> kafkaConsumer;
    /** consumed records */
    private static ConsumerRecords<String, String> msgList;

    public static void main(String[] args) {
        // Configure the Kafka properties
        Properties properties = new Properties();
        // Set the broker address
        properties.put("bootstrap.servers", "192.168.189.150:9092");
        // Set the deserializers
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        // Set the consumer group
        properties.put("group.id", "test01");
        // Allow automatic offset commits
        properties.put("enable.auto.commit", "true");
        // Set the interval between automatic commits
        properties.put("auto.commit.interval.ms", "1000");
        // Set the session timeout for the connection
        properties.put("session.timeout.ms", "30000");
        // Create the consumer
        kafkaConsumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        kafkaConsumer.subscribe(Arrays.asList(topic));
        Consumer consumer = new Consumer();
        new Thread(consumer).start();
        // kafkaConsumer.close();
    }

    @Override
    public void run() {
        for (;;) {
            // Poll for data with a 1000 ms timeout
            msgList = kafkaConsumer.poll(1000);
            if (null != msgList && msgList.count() > 0) {
                for (ConsumerRecord<String, String> consumerRecord : msgList) {
                    System.out.println("Consumer received a message, consuming: " + consumerRecord);
                    System.out.println("topic= " + consumerRecord.topic() + " ,partition= " + consumerRecord.partition()
                            + " ,offset= " + consumerRecord.offset() + " ,value=" + consumerRecord.value() + "\n");
                }
            } else {
                // If no data was fetched, block for a while
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

5. Commits and Offsets

Unlike ActiveMQ, Kafka does not require per-message acknowledgment from consumers; instead, each consumer tracks how far it has consumed in each partition. That position is called the offset, and the operation of updating the current position in a partition is called a commit. If a consumer crashes or a new consumer joins the group, a rebalance is triggered; afterwards, each consumer may be assigned new partitions rather than the ones it handled before. To continue the previous work, the consumer reads the last committed offset of each partition and resumes processing from the position that offset specifies.

This can lead to one of two situations:

1. The committed offset is smaller than the offset the client has processed

If the committed offset is smaller than the offset of the last message the client processed, the messages between the two offsets are processed again.

2. The committed offset is larger than the offset the client has processed

If the committed offset is larger than the offset of the last message the client processed, the messages between the two offsets are lost.

Kafka's commit modes:

  • Auto-commit mode: the consumer automatically commits the offset after pulling the data, regardless of whether the subsequent processing of the message succeeds. Advantage: fast consumption, suitable for business scenarios with weak data-consistency requirements. Disadvantage: messages are easily lost or consumed twice.

    Set enable.auto.commit to true

  • Manual (synchronous) commit mode: the consumer pulls the data, performs the business processing, and commits only once the processing has succeeded. Disadvantage: the application blocks until the broker responds to the commit request, which limits the application's throughput.

    Set enable.auto.commit to false;

    Call consumer.commitSync() manually after the message has been processed

  • Asynchronous commit: just send the commit request without waiting for the broker's response

    Call consumer.commitAsync() manually after the message has been processed. This method also supports a callback that fires when the broker responds; callbacks are often used to record failed commits, error messages, and offsets. If you retry failed commits, pay attention to the commit order. (See the sketch after this list.)
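
A minimal sketch combining both manual modes, reusing the group and topic from section 4 (a common pattern: fast asynchronous commits inside the poll loop, one final synchronous commit on shutdown); the class name CommitDemo is made up:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.189.150:9092");
        props.put("group.id", "test01");
        // Turn off auto-commit so we control when offsets are committed
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("charon"));
        try {
            for (int i = 0; i < 100; i++) { // bounded loop so the sketch terminates
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("processed offset " + record.offset()); // stand-in for business logic
                }
                // Asynchronous commit: does not block; the callback logs failures
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("commit failed for " + offsets + ": " + exception);
                    }
                });
            }
        } finally {
            // Synchronous commit on the way out: blocks and retries until it succeeds or fails fatally
            consumer.commitSync();
            consumer.close();
        }
    }
}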

6. Rebalance Listener

When new partitions are assigned to a consumer or old ones are revoked, you can run some application code through the consumer API: when calling subscribe(Pattern pattern, ConsumerRebalanceListener listener), you can pass in a rebalance listener.

The two methods that must be implemented:

  • public void onPartitionsRevoked(Collection<TopicPartition> partitions);

    Called after the consumer stops reading messages and before the rebalance starts. If you commit offsets here, the next consumer to take over the partition knows where to start reading. Note that you should commit the offset of the most recently processed message, not the last offset of a batch that is still being processed.

  • public void onPartitionsAssigned(Collection<TopicPartition> partitions)

    Called after the partitions have been reassigned and before the consumer starts reading messages from them (see the sketch below)
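
A minimal sketch of such a listener, assuming the consumer setup from section 4 (the class name RebalanceDemo is made up, and currentOffsets stands for a map that the poll loop keeps updated with the latest processed offsets):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RebalanceDemo {
    // Updated by the poll loop after each processed record (sketch)
    private static final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public static void subscribeWithListener(final KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Pattern.compile("charon"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit the offsets of the messages processed so far, so the next
                // owner of these partitions knows where to resume reading
                consumer.commitSync(currentOffsets);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after reassignment and before reading starts; a typical
                // use is seeking to offsets stored outside Kafka
            }
        });
    }
}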

7. Kafka Message Duplication and Loss Analysis

Let's first look at Kafka's acknowledgment types:

  • ack=0: the producer does not wait for the broker's acknowledgment and goes on sending the next batch of messages (highest transmission efficiency, lowest reliability)
  • ack=1: the producer waits until the leader in the ISR has successfully received the data and written it to its local log file, but does not wait for the other followers in the cluster to respond
  • ack=-1: the producer waits until all followers in the ISR have finished synchronizing, ensuring the message is not lost unless every machine in the Kafka cluster goes down (highest reliability, although on its own this still cannot guarantee zero loss, e.g. when the ISR has shrunk to only the leader)

In a single-node environment, there is no difference among the three.

Kafka message duplication and loss can occur in three stages:

1. Producer stage. Cause: the producer did not receive a correct acknowledgment from the broker for a message it sent, and retries.

The producer sends a message; the broker persists it, but because of network problems the sender receives a send-failure response or hits a network interruption. The producer then gets a recoverable exception and retries the message, which causes the message to be duplicated.

The retry process:

  1. new KafkaProducer() starts a background thread KafkaThread that scans the RecordAccumulator for messages to send;
  2. calling KafkaProducer.send() does not transmit the message immediately; it only stores it in the RecordAccumulator;
  3. when the background KafkaThread finds messages in the RecordAccumulator, it sends them to the Kafka cluster;
  4. if the send succeeds, success is returned;
  5. if the send fails, the thread checks whether retries are allowed. If not, the failure is returned as the result; if retries are allowed, the message is saved back into the RecordAccumulator to wait for the background KafkaThread to scan and send it again.

Solutions:

1. Enable Kafka idempotence. To enable it, set enable.idempotence=true in the configuration file; this also requires acks=all and retries>1. To further improve data reliability, pair it with the min.insync.replicas parameter: if the number of replicas in the ISR drops below min.insync.replicas, an exception is raised, the reason being that the message was rejected because the number of in-sync replicas is smaller than required.

The principle of idempotence:

Every producer has a PID, and the server tracks each producer's state keyed by this PID. Every message from a producer carries an increasing sequence number, and the server records the current maximum sequence for each producer (PID + seq). If the sequence number on a new message is not greater than the current maximum seq, the server rejects the message; if the message is accepted, the maximum seq is updated at the same time. A retransmitted message is therefore rejected by the server, which avoids message duplication.
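
A minimal configuration sketch of the settings described above, reusing the producer from section 4 (only the properties differ; enable.idempotence requires acks=all):

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.189.150:9092");
// De-duplicate retried sends on the broker via PID + sequence numbers
props.put("enable.idempotence", "true");
// Required by idempotence: wait for the full ISR, and allow retries
props.put("acks", "all");
props.put("retries", 3);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);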

2. Set acks=0, i.e. no acknowledgment and no retries. You may lose data, so this suits scenarios where throughput matters more than data loss, for example log collection.

2. Producer-to-broker stage. Causes:

  1. acks=0, no retries: after the producer sends the message, it does not care about the result; if the send fails, the data is lost.

  2. acks=1 and the leader crashes: the producer sends the message and only waits for the leader's write to succeed. If the leader goes down before any follower has synchronized the message, the message is lost.

  3. unclean.leader.election.enable set to true: this allows replicas outside the ISR to be elected leader, which can lose data. The default is false (replicas outside the ISR cannot take part in the election).

    With asynchronous sends the producer only waits for the leader's write to succeed. If the leader goes down while the ISR contains no followers, a new leader is elected from the OSR; because the OSR replicas lag behind the old leader, data is lost.

Solutions:

1. Configure acks=-1, retries>1, and unclean.leader.election.enable=false

The producer sends the message and waits for the followers to finish synchronizing before the send returns, retrying on failure. The number of replicas may affect throughput here; do not exceed 5 replicas, and 3 are usually enough.

2. Configure min.insync.replicas > 1

When the producer sets acks to all (or -1), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for it to be considered successful. If this minimum cannot be met, the producer throws an exception. Used together, min.insync.replicas and acks provide a stronger durability guarantee. (A sketch follows below.)
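
A minimal configuration sketch, assuming a replication factor of 3 (the setting can go into server.properties as a broker-wide default, or be applied per topic):

# server.properties: with a replication factor of 3, this tolerates one replica
# being down while still rejecting writes acknowledged by fewer than 2 replicas
min.insync.replicas=2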

3. Record failed offsets separately

The producer sends the message with automatic retries; when an unrecoverable exception is thrown, catch it and record the message in a database or cache, then handle it separately.

3. Consumption stage. Cause: the offset is not committed to the broker in time after the data is consumed. If a consumer dies during consumption and fails to commit its offset to the broker in time, another consumer takes over and starts consuming from the previously recorded offset; because that offset lags behind, the newly started client may re-consume a small number of messages.

Solutions:

1. Disable auto-commit and commit manually after each message is processed or when the program exits. Even so, duplicates cannot be ruled out completely.

2. Make consumption idempotent: make the downstream idempotent where possible, or record the offset of every consumed message. In scenarios with smaller data volumes and strict requirements, you may need to update the offset (or a unique ID) together with the downstream state in one database transaction to guarantee exactly-once updates, or record the consumed offset in the downstream database table and then use that consumption position as an optimistic lock when updating data, rejecting updates from older positions. (A sketch follows below.)
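
A minimal sketch of the optimistic-lock idea, assuming a hypothetical relational table events (with payload and last_offset columns, one pre-existing row per partition) reachable over JDBC; the table name, columns, and connection URL are all made up for illustration:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class IdempotentSink {
    // Apply one record and its offset in a single transaction; the
    // "last_offset < ?" predicate rejects stale (re-consumed) messages
    public static void apply(ConsumerRecord<String, String> record) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/demo", "user", "pass")) {
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement(
                    "UPDATE events SET payload = ?, last_offset = ? " +
                    "WHERE partition_id = ? AND last_offset < ?")) {
                ps.setString(1, record.value());
                ps.setLong(2, record.offset());
                ps.setInt(3, record.partition());
                ps.setLong(4, record.offset());
                // 0 rows updated means a duplicate, which is safely skipped
                ps.executeUpdate();
            }
            conn.commit();
        }
    }
}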

