Toutiao interviewer: explain how Kafka consumers commit offsets and how it is implemented

Kirin to fix bugs 2021-02-23 15:58:48

1、How Kafka consumers commit offsets

1)、Automatic commit. This is the mode in which the consumer manages offsets for you; the application does not need to do anything explicit. When enable.auto.commit is set to true, the consumer commits offsets every five seconds (the interval is set by auto.commit.interval.ms) after poll() is called. Like many other operations, automatic commit is driven by poll(): each time poll() is invoked, the consumer checks whether the commit interval has elapsed and, if so, commits the largest offsets returned by the previous poll. Note that this approach can lead to messages being consumed more than once. For example, if a consumer polls a batch of messages and, while the application is still processing them, Kafka rebalances the group 3 seconds later, the offsets have not yet been committed, so part of that batch is consumed again after the rebalance.
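For reference, here is a minimal sketch of the auto-commit mode described above. This class is not part of the original article; the class name is made up, and the broker address and topic reuse the placeholders from the examples below.

package com.demo.kafka.consumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerAutoCommitSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.142:9092"); // placeholder broker address
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Auto-commit is on by default; shown explicitly here for clarity
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Commit interval; 5000 ms is the default
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("topic-demo"));
        while (true) {
            // Each poll() checks whether the commit interval has elapsed and, if so,
            // commits the largest offsets returned by the previous poll
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value()); // no explicit commit call is needed
            }
        }
    }

}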

2)、Synchronous commit. With auto-commit disabled, the application calls commitSync() itself after processing a batch of records; the call blocks until the broker responds, and retriable failures are retried until the commit succeeds or an exception is thrown. The example below commits synchronously after each poll:

package com.demo.kafka.consumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerSimple {

    // Broker address
    private static final String bootstrapServer = "192.168.110.142:9092";

    // Topic
    private static final String topic = "topic-demo";

    // Second topic
    private static final String topic2 = "topic-demo2";

    // Consumer group
    private static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Broker address list, required; same meaning as for the producer. One or more brokers of the Kafka cluster can be listed.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        // Consumer group the consumer belongs to. The default is empty and an empty value causes an exception,
        // so set it to a name with business meaning.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Client id of this consumer. If not set, the Kafka client generates a non-empty string automatically.
        properties.put("client.id", "consumer.client.id.demo");

        // Start from the earliest offset when there is no committed offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Disable auto-commit so offsets are committed manually
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Create the consumer with these parameters
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // Subscribe to a single topic
        // consumer.subscribe(Collections.singletonList(topic));
        // Subscribe to multiple topics
        // consumer.subscribe(Arrays.asList(topic, topic2));
        // Subscribe with a regular expression
        // consumer.subscribe(Pattern.compile("topic-demo*"));

        // Assign a specific partition of the topic
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        consumer.assign(Arrays.asList(topicPartition));

        // Initialize the last consumed offset to -1
        long lastConsumeOffset = -1;
        while (true) {
            // Poll for up to one second for messages of the assigned partition
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (records.isEmpty()) {
                break;
            }
            // Records of the assigned partition
            List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
            // Offset of the last record consumed in this batch
            lastConsumeOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // System.out.println("the last offset is " + lastConsumeOffset);
            // Commit the consumed offsets synchronously
            consumer.commitSync();
        }
        // Last position consumed by this consumer
        System.out.println("consumed offset is " + lastConsumeOffset);
        // Committed offset, i.e. where consumption resumes next time
        OffsetAndMetadata committed = consumer.committed(topicPartition);
        System.out.println("committed offset is " + committed.offset());
        // Position of the next record to be fetched
        long position = consumer.position(topicPartition);
        System.out.println("the offset of the next record is " + position);

    }

}

3)、Asynchronous commit. Manual synchronous commit has a drawback: the calling thread blocks while the commit is in flight. We could commit less frequently, but that increases the chance of duplicate consumption (just as with auto-commit). The alternative is to commit asynchronously. Asynchronous commit has its own drawback: if the broker reports a commit failure, the commit is not retried, whereas a synchronous commit retries until it succeeds or finally throws an exception to the application. Asynchronous commits do not retry because, with several commits in flight, a retry could overwrite a newer offset. For example, suppose we issue an asynchronous commit commitA for offset 2000 and then another asynchronous commit commitB for offset 3000. If commitA fails but commitB succeeds, retrying commitA and having it succeed would roll the committed offset back from 3000 to 2000, leading to duplicate consumption.

package com.demo.kafka.consumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerAsyncSimple {

    private static AtomicBoolean running = new AtomicBoolean(true);

    // Broker address
    private static final String bootstrapServer = "192.168.110.142:9092";

    // Topic
    private static final String topic = "topic-demo";

    // Consumer group
    private static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Broker address list, required; same meaning as for the producer
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        // Consumer group the consumer belongs to; must not be empty
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Client id of this consumer; generated automatically if not set
        properties.put("client.id", "consumer.client.id.demo");

        // Start from the earliest offset when there is no committed offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the consumer with these parameters
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (running.get()) {
                // Poll for up to one second
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    break;
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("consuming record: " + record.toString());
                }

                // Asynchronous commit with a callback; suited to high message volume, but duplicates are possible
                consumer.commitAsync(new OffsetCommitCallback() {

                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception == null) {
                            System.out.println("async commit succeeded, offsets: " + offsets);
                        } else {
                            System.err.println("fail to commit offsets " + offsets + " , " + exception);
                        }
                    }
                });

            }
        } finally {
            // Close the consumer
            consumer.close();
        }

    }

}
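Note that the example above never retries a failed asynchronous commit. A common mitigation, not part of the original article and offered here only as a sketch (the class name and broker address are placeholders), is to keep commitAsync() inside the poll loop for throughput and add one blocking commitSync() before the consumer closes, so the final offsets are persisted even if the last asynchronous commit failed:

package com.demo.kafka.consumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerAsyncThenSyncSketch {

    private static final AtomicBoolean running = new AtomicBoolean(true);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.142:9092"); // placeholder broker address
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group.demo");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Commit manually so the application controls when offsets are stored
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList("topic-demo"));
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.toString()); // process the record
                }
                // Non-blocking commit during normal operation; failures are not retried here
                consumer.commitAsync();
            }
        } finally {
            try {
                // One blocking commit with retries before shutdown, so the final offsets are persisted
                consumer.commitSync();
            } finally {
                // Close the consumer
                consumer.close();
            }
        }
    }

}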

2、Consuming from a specified offset

The seek() method provides this capability: instead of always starting from the last committed offset, the consumer can jump back to re-consume earlier messages or skip ahead.

package com.demo.kafka.consumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerSeekSimple {

    private static AtomicBoolean running = new AtomicBoolean(true);

    // Broker address
    private static final String bootstrapServer = "192.168.110.142:9092";

    // Topic
    private static final String topic = "topic-demo3";

    // Consumer group
    private static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Broker address list, required; same meaning as for the producer
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        // Consumer group the consumer belongs to; must not be empty
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Client id of this consumer; generated automatically if not set
        properties.put("client.id", "consumer.client.id.demo");

        // Start from the earliest offset when there is no committed offset
        // properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create the consumer with these parameters
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList(topic));

        // Keep polling until partitions have been assigned to this consumer.
        // The poll timeout is a trade-off: too short and the assignment may not have completed yet,
        // too long and the consumer waits unnecessarily.
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.isEmpty()) {
            consumer.poll(Duration.ofMillis(100));
            assignment = consumer.assignment();
        }
        System.err.println("partitions assigned to this consumer: " + assignment.toString());

        // for (TopicPartition topicPartition : assignment) {
        //     // Option 1: seek to an absolute offset within the partition
        //     consumer.seek(topicPartition, 3);
        // }

        // Option 2: seek to the end of each partition so only newly arriving messages are consumed
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
        for (TopicPartition topicPartition : assignment) {
            System.err.println("end offset of " + topicPartition + " : " + endOffsets.get(topicPartition));
            consumer.seek(topicPartition, endOffsets.get(topicPartition));
        }

        try {
            while (running.get()) {
                // Poll for up to one second
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    break;
                }
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("consuming record: " + record.toString());
                }

                // Asynchronous commit with a callback; suited to high message volume, but duplicates are possible
                consumer.commitAsync(new OffsetCommitCallback() {

                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception == null) {
                            System.out.println("async commit succeeded, offsets: " + offsets);
                        } else {
                            System.err.println("fail to commit offsets " + offsets + " , " + exception);
                        }
                    }
                });

            }
        } finally {
            // Close the consumer
            consumer.close();
        }

    }

}

3、Kafka rebalance listener

A rebalance transfers ownership of partitions from one consumer to another. It is what gives a consumer group high availability and scalability, allowing consumers to be added to or removed from the group conveniently and safely. During a rebalance, however, consumers cannot pull messages.

package com.demo.kafka.consumer;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerListenerSimple {

    // Broker address
    private static final String bootstrapServer = "192.168.110.142:9092";

    // Topic
    private static final String topic = "topic-demo";

    // Consumer group
    private static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Broker address list, required; same meaning as for the producer
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        // Consumer group the consumer belongs to; must not be empty
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Client id of this consumer; generated automatically if not set
        properties.put("client.id", "consumer.client.id.demo");

        // Start from the earliest offset when there is no committed offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Disable auto-commit so offsets are committed manually
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Create the consumer with these parameters
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // Messages can be duplicated or lost when partition ownership changes; Kafka rebalances the group
        // whenever the consumers of a partition change. Kafka provides a rebalance listener so the application
        // can react; consumers cannot pull messages while a rebalance is in progress.
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {

            // Called before the rebalance starts, after the consumer has stopped fetching
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit the offsets processed so far synchronously to avoid duplicate consumption
                consumer.commitSync(currentOffsets);
            }

            // Called after partitions have been reassigned, before the consumer starts fetching again
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

            }

        });

        while (true) {
            // Poll for up to one second
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (records.isEmpty()) {
                break;
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());

                // Track the offset of the next record to consume so that onPartitionsRevoked
                // can commit it synchronously before a rebalance takes effect
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            // Asynchronous commits alone could lose offsets, so the tracked offsets are passed explicitly
            consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {

                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception == null) {
                        System.out.println("async commit succeeded, offsets: " + offsets);
                    } else {
                        System.err.println("fail to commit offsets " + offsets + " , " + exception);
                    }
                }
            });
        }

        // Close the consumer
        consumer.close();

    }

}

4、Kafka consumer interceptor

Consumer interceptors are mainly used to perform custom actions when messages are consumed or when offsets are committed. A typical use case: attach a time-to-live (TTL) to messages; if a message does not arrive within the given time window, it is considered expired and no longer needs to be processed.

package com.demo.kafka.interceptor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Consumer interceptor that drops records older than a fixed TTL.
 */
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {

    // TTL of ten seconds
    private static final long EXPIRE_INTERVAL = 10 * 1000;

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Print the incoming records
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("==============================" + record.toString() + "==============================");
        }

        // Current time used to evaluate the TTL
        long now = System.currentTimeMillis();
        // Map that will hold the filtered records per partition
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>();
        // Iterate over the partitions contained in this batch
        for (TopicPartition tp : records.partitions()) {
            System.out.println("============== partition ================" + tp.partition() + "==============================");
            // Records of this partition
            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
            // Records that are still within the TTL
            List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
            for (ConsumerRecord<String, String> record : tpRecords) {
                // Keep the record only if it is not older than the TTL; expired records are dropped
                if (now - record.timestamp() < EXPIRE_INTERVAL) {
                    newTpRecords.add(record);
                }
            }
            // Only keep partitions that still have records after filtering
            if (!newTpRecords.isEmpty()) {
                newRecords.put(tp, newTpRecords);
            }
        }

        // Print the records that survive the filter
        for (Map.Entry<TopicPartition, List<ConsumerRecord<String, String>>> map : newRecords.entrySet()) {
            for (int i = 0; i < map.getValue().size(); i++) {
                List<ConsumerRecord<String, String>> value = map.getValue();
                ConsumerRecord<String, String> consumerRecord = value.get(i);
                System.out.println("==============================" + consumerRecord.toString()
                        + "==============================");
            }
        }

        return new ConsumerRecords<String, String>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Called after offsets have been committed; print the committed offset per partition
        offsets.forEach((tp, offset) -> System.out.println("committed offset: " + tp + " : " + offset.offset()));
    }

    @Override
    public void close() {

    }

    // Standalone demo of Map.forEach, unrelated to the interceptor logic
    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("zhangsan", "hello world zhangsan!!!");
        map.put("lisi", "hello world lisi!!!");
        map.put("wangwu", "hello world wangwu!!!");
        map.put("zhaoliu", "hello world zhaoliu!!!");

        map.forEach((key, value) -> System.out.println("key : " + key + " , value : " + value));
    }

}


The consumer registers the interceptor through the interceptor.classes configuration, as shown below:

package com.demo.kafka.consumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.demo.kafka.interceptor.ConsumerInterceptorTTL;

public class KafkaConsumerInterceptorSimple {

    // Broker address
    private static final String bootstrapServer = "192.168.110.142:9092";

    // Topic
    private static final String topic = "topic-demo3";

    // Consumer group
    private static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Key deserializer
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Value deserializer
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Broker address list, required; same meaning as for the producer
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        // Consumer group the consumer belongs to; must not be empty
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Client id of this consumer; generated automatically if not set
        properties.put("client.id", "consumer.client.id.demo");

        // Start from the earliest offset when there is no committed offset
        // properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Disable auto-commit so offsets are committed manually
        // properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Register the consumer interceptor
        properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());

        // Create the consumer with these parameters
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            // Poll for up to one second
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            if (records.isEmpty()) {
                break;
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }

    }

}
Copyright notice
This article was written by [Kirin to fix bugs]. Please include a link to the original when reposting. Thank you.
https://javamana.com/2021/02/20210223154804696x.html
