头条面试官:说说Kafka的消费者提交方式,怎么实现的

麒麟改bug 2021-02-23 15:50:34
java 面试 kafka


头条面试官:说说Kafka的消费者提交方式,怎么实现的

 

1、Kafka的消费者提交方式

1)、自动提交,这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔五秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll方法来驱动的,在调用poll方法的时候,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。需要注意的是,这种方式可能会导致消息重复消费,假如,某个消费者poll消息后,应用正在处理消息,在3秒后kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。

2)、同步提交。

 1 package com.demo.kafka.consumer;
2 
3 import java.time.Duration;
4 import java.util.Arrays;
5 import java.util.Collections;
6 import java.util.List;
7 import java.util.Properties;
8 import java.util.regex.Pattern;
9 
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.consumer.KafkaConsumer;
14 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
15 import org.apache.kafka.clients.producer.ProducerConfig;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 
19 public class KafkaConsumerSimple {
20 
21 // 设置服务器地址
22 private static final String bootstrapServer = "192.168.110.142:9092";
23 
24 // 设置主题
25 private static final String topic = "topic-demo";
26 
27 // 设置主题
28 private static final String topic2 = "topic-demo2";
29 
30 // 设置消费者组
31 private static final String groupId = "group.demo";
32 
33 public static void main(String[] args) {
34 Properties properties = new Properties();
35 // 设置反序列化key参数信息
36 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
37 // 设置反序列化value参数信息
38 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
39 
40 // 设置服务器列表信息,必填参数,该参数和生产者相同,,制定链接kafka集群所需的broker地址清单,可以设置一个或者多个
41 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
42 
43 // 设置消费者组信息,消费者隶属的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
44 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
45 
46 // 制定kafka消费者对应的客户端id,默认为空,如果不设置kafka消费者会自动生成一个非空字符串。
47 properties.put("client.id", "consumer.client.id.demo");
48 
49 // 设置每次从最早的offset开始消费
50 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
51 
52 // 手动提交开启
53 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
54 
55 // 将参数设置到消费者参数中
56 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
57 
58 // 消息订阅
59 // consumer.subscribe(Collections.singletonList(topic));
60 // 可以订阅多个主题
61 // consumer.subscribe(Arrays.asList(topic, topic2));
62 // 可以使用正则表达式进行订阅
63 // consumer.subscribe(Pattern.compile("topic-demo*"));
64 
65 // 指定订阅的分区
66 TopicPartition topicPartition = new TopicPartition(topic, 0);
67 consumer.assign(Arrays.asList(topicPartition));
68 
69 // 初始化offset位移为-1
70 long lastConsumeOffset = -1;
71 while (true) {
72 // 每隔一秒监听一次,拉去指定主题分区的消息
73 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
74 if (records.isEmpty()) {
75 break;
76 }
77 // 获取到消息
78 List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
79 // 获取到消息的offset位移信息,最后消费的位移
80 lastConsumeOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
81 // System.out.println("the last offset is " + lastConsumeOffset);
82 // 同步提交消费位移
83 consumer.commitSync();
84 }
85 // 当前消费者最后一个消费的位置
86 System.out.println("consumed offset is " + lastConsumeOffset);
87 // 提交,下次消费从哪个位置开始
88 OffsetAndMetadata committed = consumer.committed(topicPartition);
89 System.out.println("committed offset is " + committed.offset());
90 // 下次消费从哪个位置开始
91 long position = consumer.position(topicPartition);
92 System.out.println("the offset of the next record is " + position);
93 
94 }
95 
96 }

3)、异步提交方式。手动提交有一个缺点,就是当发起提交时调用应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决方法是,使用异步提交。但是异步提交也有一个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试知道成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。比如,我们发起一个异步提交commitA,此时提交位移是2000,随后又发起了一个异步提交commitB且位移为3000,commitA提交失败但commitB提交失败,此时commitA进行重试并成功的话,会将实际上已经提交的位移从3000回滚到2000,导致消息重复消费。

 1 package com.demo.kafka.consumer;
2 
3 import java.time.Duration;
4 import java.util.Arrays;
5 import java.util.Map;
6 import java.util.Properties;
7 import java.util.concurrent.atomic.AtomicBoolean;
8 
9 import org.apache.kafka.clients.consumer.ConsumerConfig;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
14 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.serialization.StringDeserializer;
17 
18 public class KafkaConsumerAsyncSimple {
19 
20 private static AtomicBoolean running = new AtomicBoolean(true);
21 
22 // 设置服务器地址
23 private static final String bootstrapServer = "192.168.110.142:9092";
24 
25 // 设置主题
26 private static final String topic = "topic-demo";
27 
28 // 设置消费者组
29 private static final String groupId = "group.demo";
30 
31 public static void main(String[] args) {
32 Properties properties = new Properties();
33 // 设置反序列化key参数信息
34 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
35 // 设置反序列化value参数信息
36 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
37 
38 // 设置服务器列表信息,必填参数,该参数和生产者相同,,制定链接kafka集群所需的broker地址清单,可以设置一个或者多个
39 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
40 
41 // 设置消费者组信息,消费者隶属的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
42 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
43 
44 // 制定kafka消费者对应的客户端id,默认为空,如果不设置kafka消费者会自动生成一个非空字符串。
45 properties.put("client.id", "consumer.client.id.demo");
46 
47 // 设置每次从最早的offset开始消费
48 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
49 
50 // 将参数设置到消费者参数中
51 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
52 // 订阅主题
53 consumer.subscribe(Arrays.asList(topic));
54 
55 try {
56 while (running.get()) {
57 // 每隔一秒监听一次,拉去指定主题分区的消息
58 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
59 if (records.isEmpty()) {
60 break;
61 }
62 for (ConsumerRecord<String, String> record : records) {
63 System.out.println("我要开始消费了: " + record.toString());
64 }
65 
66 // 异步回调,适合消息量非常大,但是允许消息重复的
67 consumer.commitAsync(new OffsetCommitCallback() {
68 
69 @Override
70 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
71 if (exception == null) {
72 System.out.println("异步回调成功了,offset : " + offsets);
73 } else {
74 System.err.println("fail to commit offsets " + offsets + " , " + exception);
75 }
76 
77 }
78 });
79 
80 }
81 } finally {
82 // 关闭客户端
83 consumer.close();
84 }
85 
86 }
87 
88 }
头条面试官:说说Kafka的消费者提交方式,怎么实现的

 

2、指定位移消费

seek方法提供了这个功能,可以追踪之前的消费或者回溯消费。

 1 package com.demo.kafka.consumer;
2 
3 import java.time.Duration;
4 import java.util.Arrays;
5 import java.util.Map;
6 import java.util.Properties;
7 import java.util.Set;
8 import java.util.concurrent.atomic.AtomicBoolean;
9 
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.consumer.KafkaConsumer;
14 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
15 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 
19 public class KafkaConsumerSeekSimple {
20 
21 private static AtomicBoolean running = new AtomicBoolean(true);
22 
23 // 设置服务器地址
24 private static final String bootstrapServer = "192.168.110.142:9092";
25 
26 // 设置主题
27 private static final String topic = "topic-demo3";
28 
29 // 设置消费者组
30 private static final String groupId = "group.demo";
31 
32 public static void main(String[] args) {
33 Properties properties = new Properties();
34 // 设置反序列化key参数信息
35 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
36 // 设置反序列化value参数信息
37 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
38 
39 // 设置服务器列表信息,必填参数,该参数和生产者相同,,制定链接kafka集群所需的broker地址清单,可以设置一个或者多个
40 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
41 
42 // 设置消费者组信息,消费者隶属的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
43 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
44 
45 // 制定kafka消费者对应的客户端id,默认为空,如果不设置kafka消费者会自动生成一个非空字符串。
46 properties.put("client.id", "consumer.client.id.demo");
47 
48 // 设置每次从最早的offset开始消费
49 // properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
50 
51 // 将参数设置到消费者参数中
52 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
53 // 订阅主题
54 consumer.subscribe(Arrays.asList(topic));
55 
56 // 获取消费者所分配到的分区
57 Set<TopicPartition> assignment = consumer.assignment();
58 System.err.println("打印消费者获取到的分区: " + assignment.toString());
59 
60 // timeout参数设置多少合适?太短会使分区分配失败,太长有可能造成一些不必要的等待
61 // 获取到指定主题的消息
62 consumer.poll(Duration.ofMillis(2000));
63 
64 // for (TopicPartition topicPartition : assignment) {
65 // // 参数partition表示分区,offset表示指定从分区的那个位置开始消费
66 // // 方式一,可以指定位置进行消费
67 // consumer.seek(topicPartition, 3);
68 // }
69 
70 // 指定从分区末尾开始消费,方式二,可以从末端开始倒叙消费
71 Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
72 for (TopicPartition topicPartition : assignment) {
73 System.err.println("打印消费者获取到offset : " + ( endOffsets.get(topicPartition) + 1 ));
74 consumer.seek(topicPartition, endOffsets.get(topicPartition) + 1);
75 }
76 
77 try {
78 while (running.get()) {
79 // 每隔一秒监听一次,拉去指定主题分区的消息
80 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
81 if (records.isEmpty()) {
82 break;
83 }
84 for (ConsumerRecord<String, String> record : records) {
85 System.out.println("我要开始消费了: " + record.toString());
86 }
87 
88 // 异步回调,适合消息量非常大,但是允许消息重复的
89 consumer.commitAsync(new OffsetCommitCallback() {
90 
91 @Override
92 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
93 if (exception == null) {
94 System.out.println("异步回调成功了,offset : " + offsets);
95 } else {
96 System.err.println("fail to commit offsets " + offsets + " , " + exception);
97 }
98 
99 }
100 });
101 
102 }
103 } finally {
104 // 关闭客户端
105 consumer.close();
106 }
107 
108 }
109 

3、Kafka再均衡监听器

再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全的删除消费组内的消费者或者往消费组内添加消费者。不过再均衡期间,消费者是无法拉取消息的。

 1 package com.demo.kafka.consumer;
2 
3 import java.time.Duration;
4 import java.util.Collection;
5 import java.util.Collections;
6 import java.util.HashMap;
7 import java.util.Map;
8 import java.util.Properties;
9 
10 import org.apache.kafka.clients.consumer.ConsumerConfig;
11 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
12 import org.apache.kafka.clients.consumer.ConsumerRecord;
13 import org.apache.kafka.clients.consumer.ConsumerRecords;
14 import org.apache.kafka.clients.consumer.KafkaConsumer;
15 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
16 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
17 import org.apache.kafka.common.TopicPartition;
18 import org.apache.kafka.common.serialization.StringDeserializer;
19 
20 public class KafkaConsumerListenerSimple {
21 
22 // 设置服务器地址
23 private static final String bootstrapServer = "192.168.110.142:9092";
24 
25 // 设置主题
26 private static final String topic = "topic-demo";
27 
28 // 设置消费者组
29 private static final String groupId = "group.demo";
30 
31 public static void main(String[] args) {
32 Properties properties = new Properties();
33 // 设置反序列化key参数信息
34 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
35 // 设置反序列化value参数信息
36 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
37 
38 // 设置服务器列表信息,必填参数,该参数和生产者相同,,制定链接kafka集群所需的broker地址清单,可以设置一个或者多个
39 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
40 
41 // 设置消费者组信息,消费者隶属的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
42 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
43 
44 // 制定kafka消费者对应的客户端id,默认为空,如果不设置kafka消费者会自动生成一个非空字符串。
45 properties.put("client.id", "consumer.client.id.demo");
46 
47 // 设置每次从最早的offset开始消费
48 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
49 
50 // 手动提交开启
51 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
52 
53 // 将参数设置到消费者参数中
54 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
55 
56 // 消息订阅
57 // consumer.subscribe(Collections.singletonList(topic));
58 
59 // 如果发生消息重复消费或者消息丢失的情况,当一个分区的消费者发生变更的时候,kafka会出现再均衡
60 // kafka提供了再均衡监听器,可以处理自己的行为,发生再均衡期间,消费者无法拉取消息的。
61 Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
62 consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
63 
64 //
65 @Override
66 public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
67 // 尽量避免重复消费
68 consumer.commitSync(currentOffsets);// 同步位移的提交
69 }
70 
71 //
72 @Override
73 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
74 
75 }
76 
77 });
78 
79 while (true) {
80 // 每隔一秒监听一次,拉去指定主题分区的消息
81 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
82 if (records.isEmpty()) {
83 break;
84 }
85 for (ConsumerRecord<String, String> record : records) {
86 System.out.println(record.toString());
87 
88 // 异步提交消息位移,在发生再均衡动作之前通过再均衡监听器的onPartitionsRevoked回调执行commitSync方法同步提交位移
89 currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
90 new OffsetAndMetadata(record.offset() + 1));
91 }
92 // 消费者的消费异步提交很有可能出现消息丢失的情况,所以在拉取完消息之后可以将消息的offset位移进行记录
93 consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
94 
95 @Override
96 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
97 if (exception == null) {
98 System.out.println("异步回调成功了,offset : " + offsets);
99 } else {
100 System.err.println("fail to commit offsets " + offsets + " , " + exception);
101 }
102 }
103 });
104 }
105 
106 // 关闭客户端
107 consumer.close();
108 
109 }
110 
111 }
头条面试官:说说Kafka的消费者提交方式,怎么实现的

 

4、Kafka消费者拦截器

消费者拦截器主要是在消息到消息或者在提交消息位移的时候进行一些定制化的操作。使用场景,对消费消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那就视为无效,不需要再被处理。

 1 package com.demo.kafka.interceptor;
2 
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.List;
6 import java.util.Map;
7 
8 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
9 import org.apache.kafka.clients.consumer.ConsumerRecord;
10 import org.apache.kafka.clients.consumer.ConsumerRecords;
11 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
12 import org.apache.kafka.common.TopicPartition;
13 
14 /**
15 * 
16 * @author 消费者拦截器
17 *
18 */
19 public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {
20 
21 // 十秒钟
22 private static final long EXPIRE_INTERVAL = 10 * 1000; // 10000
23 
24 @Override
25 public void configure(Map<String, ?> configs) {
26 
27 }
28 
29 @Override
30 public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
31 // 打印输出消息
32 for (ConsumerRecord<String, String> record : records) {
33 System.out.println("==============================" + record.toString() + "==============================");
34 }
35 
36 // 获取到当前时间
37 long now = System.currentTimeMillis();
38 // 创建一个map集合对象
39 Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>();
40 // 循环遍历出消费者的消息分区
41 for (TopicPartition tp : records.partitions()) {
42 System.out.println(
43 "==============获取到的分区================" + tp.partition() + "==============================");
44 // 获取到分区里面的消息
45 List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
46 // 创建一个集合对象newTpRecords
47 List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
48 // 循环遍历消息
49 for (ConsumerRecord<String, String> record : tpRecords) {
50 // 如果消息的时间戳大于当前时间超过10秒,就放到集合中
51 if (now - record.timestamp() > EXPIRE_INTERVAL) {
52 // 放到集合中
53 newTpRecords.add(record);
54 }
55 }
56 // 判断是否为空
57 if (!newTpRecords.isEmpty()) {
58 // 将分区和新的消息放到map集合中
59 newRecords.put(tp, newTpRecords);
60 }
61 }
62 
63 for (Map.Entry<TopicPartition, List<ConsumerRecord<String, String>>> map : newRecords.entrySet()) {
64 for (int i = 0; i < map.getValue().size(); i++) {
65 List<ConsumerRecord<String, String>> value = map.getValue();
66 ConsumerRecord<String, String> consumerRecord = value.get(i);
67 System.out.println("==============================" + consumerRecord.toString()
68 + "==============================");
69 }
70 }
71 
72 return new ConsumerRecords<String, String>(newRecords);
73 }
74 
75 @Override
76 public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
77 offsets.forEach((tp, offset) -> System.out.println("获取到的offset位移: " + tp + " : " + offset.offset()));
78 }
79 
80 @Override
81 public void close() {
82 
83 }
84 
85 public static void main(String[] args) {
86 Map<String, String> map = new HashMap<>();
87 map.put("zhangsan", "hello world zhangsan!!!");
88 map.put("lisi", "hello world lisi!!!");
89 map.put("wangwu", "hello world wangwu!!!");
90 map.put("zhaoliu", "hello world zhaoliu!!!");
91 
92 map.forEach((key, value) -> System.out.println("key : " + key + " , value : " + value));
93 }
94 
95 }

读者福利:关注公众号:麒麟改bug可获取一份整理好的Java核心学习笔记一份

消费者配置监听,如下所示:

 1 package com.demo.kafka.consumer;
2
3 import java.time.Duration;
4 import java.util.Collections;
5 import java.util.Properties;
6
7 import org.apache.kafka.clients.consumer.ConsumerConfig;
8 import org.apache.kafka.clients.consumer.ConsumerRecord;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.common.serialization.StringDeserializer;
12
13 import com.demo.kafka.interceptor.ConsumerInterceptorTTL;
14
15 public class KafkaConsumerInterceptorSimple {
16
17 // 设置服务器地址
18 private static final String bootstrapServer = "192.168.110.142:9092";
19
20 // 设置主题
21 private static final String topic = "topic-demo3";
22
23 // 设置消费者组
24 private static final String groupId = "group.demo";
25
26 public static void main(String[] args) {
27 Properties properties = new Properties();
28 // 设置反序列化key参数信息
29 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
30 // 设置反序列化value参数信息
31 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
32
33 // 设置服务器列表信息,必填参数,该参数和生产者相同,,制定链接kafka集群所需的broker地址清单,可以设置一个或者多个
34 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
35
36 // 设置消费者组信息,消费者隶属的消费组,默认为空,如果设置为空,则会抛出异常,这个参数要设置成具有一定业务含义的名称
37 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
38
39 // 制定kafka消费者对应的客户端id,默认为空,如果不设置kafka消费者会自动生成一个非空字符串。
40 properties.put("client.id", "consumer.client.id.demo");
41
42 // 设置每次从最早的offset开始消费
43 // properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
44
45 // 手动提交开启
46 // properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
47
48 // 指定消费者拦截器
49 properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName());
50
51 // 将参数设置到消费者参数中
52 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
53
54 // 消息订阅
55 consumer.subscribe(Collections.singletonList(topic));
56
57 while (true) {
58 // 每隔一秒监听一次,拉去指定主题分区的消息
59 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
60 if (records.isEmpty()) {
61 break;
62 }
63 for (ConsumerRecord<String, String> record : records) {
64 System.out.println(record.toString());
65 }
66 }
67
68 }
69
70 }
版权声明
本文为[麒麟改bug]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4678580/blog/4960573

  1. Redis 日志篇:系统高可用的杀手锏
  2. Java中把一个对象的值复制给另外一个对象引发的思考
  3. Java serialization / call wildfly service interface exception: ejbclient000409
  4. Docker compose deploy stack
  5. Mac下查看已安装的jdk版本及其安装目录
  6. Redis log: the killer of system high availability
  7. mybatis映射xml配置文件报错:<statement> or DELIMITER expected, got ‘id‘
  8. Thinking about copying the value of one object to another in Java
  9. IntelliJ IDEA 还能画思维导图,果然最强 IDE!
  10. vue使用sdk进行七牛云上传
  11. IntelliJ IDEA 还能画思维导图,果然最强 IDE!
  12. Spring原来还可以这么玩!阿里新产Spring全线宝典成功颠覆了我对Spring的认知!
  13. View the installed JDK version and its installation directory under mac
  14. Error in mybatis mapping XML configuration file: < statement > or delay expected, got 'ID‘
  15. IntelliJ IDEA 还能画思维导图,果然最强 IDE!
  16. Javascript性能优化【内联缓存】 V8引擎特性
  17. IntelliJ idea can also draw mind maps. It's really the strongest ide!
  18. Vue uses SDK to upload Qi Niu cloud
  19. IntelliJ idea can also draw mind maps. It's really the strongest ide!
  20. 深入理解 Web 协议 (三):HTTP 2
  21. Spring can still play like this! Ali's new spring product has successfully overturned my understanding of spring!
  22. IntelliJ idea can also draw mind maps. It's really the strongest ide!
  23. JavaScript performance optimization [inline cache] V8 engine features
  24. linux 配置java环境
  25. linux find 查找文件
  26. 深入理解 Web 协议 (三):HTTP 2
  27. IntelliJ IDEA 相关问题记录
  28. Deep understanding of Web protocol (3): http 2
  29. 深入理解 Web 协议 (三):HTTP 2
  30. 腾讯IEG开源AI SDK:自动化测试吃鸡、MOBA类游戏
  31. Mysql Command
  32. Configuring Java environment with Linux
  33. Find files in Linux
  34. docker-Dockerfile 创建镜像
  35. Redis Cluster
  36. 深入理解 Web 协议 (三):HTTP 2
  37. JavaScriptBOM操作
  38. JavaScriptBOM操作
  39. Deep understanding of Web protocol (3): http 2
  40. Record of IntelliJ idea related problems
  41. Deep understanding of Web protocol (3): http 2
  42. Tencent IEG open source AI SDK: automatic testing of chicken eating and MoBa games
  43. Mysql Command
  44. Docker dockerfile create image
  45. Redis Cluster
  46. 死磕Spring之IoC篇 - 文章导读
  47. Deep understanding of Web protocol (3): http 2
  48. JavaScript BOM operation
  49. JavaScript BOM operation
  50. 死磕Spring之IoC篇 - 文章导读
  51. k8s node 操作与维护
  52. k8s 证书更新
  53. 【Java面试题第三期】JVM中哪些地方会出现内存溢出?出现的原因是什么?
  54. HashMap连环问你能答出几道?
  55. k8s-cronjob
  56. k8s-cert
  57. Spring: an introduction to IOC
  58. Spring: an introduction to IOC
  59. Operation and maintenance of k8s node
  60. K8s certificate update