Kafka Consumer Groups: Partition Assignment Strategies for Consumers

itread01 2021-01-05 17:11:51


WeChat Official Account: 蘇言論 (Su Yanlun)
Linking theory with practice; talking freely about technology and life.

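Consumer groups and consumers are important concepts in Kafka. Understanding how they work helps when tuning Kafka performance and when dealing with consumer lag (a backlog of unconsumed messages). A Kafka topic consists of multiple partitions, and the partitions are distributed across the nodes of the cluster: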

```
Topic:topic01   PartitionCount:10   ReplicationFactor:2   Configs:
    Topic: topic01  Partition: 0  Leader: 1  Replicas: 1,4  Isr: 1,4
    Topic: topic01  Partition: 1  Leader: 2  Replicas: 2,5  Isr: 2,5
    Topic: topic01  Partition: 2  Leader: 3  Replicas: 3,1  Isr: 3,1
    Topic: topic01  Partition: 3  Leader: 4  Replicas: 4,2  Isr: 4,2
    Topic: topic01  Partition: 4  Leader: 5  Replicas: 5,3  Isr: 5,3
    Topic: topic01  Partition: 5  Leader: 1  Replicas: 1,3  Isr: 1,3
    Topic: topic01  Partition: 6  Leader: 2  Replicas: 2,4  Isr: 2,4
    Topic: topic01  Partition: 7  Leader: 3  Replicas: 3,5  Isr: 3,5
    Topic: topic01  Partition: 8  Leader: 4  Replicas: 4,1  Isr: 4,1
    Topic: topic01  Partition: 9  Leader: 5  Replicas: 5,2  Isr: 5,2
```

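When an application consumes data from a topic, its consumers are organized into a consumer group (ConsumerGroup): all consumers sharing the same group.id form one group, and each group contains one or more consumers (Consumer). Within a group, each partition is consumed by at most one consumer, so the number of consumers that actually receive partitions is capped at the total number of partitions; consumers beyond that number sit idle. Whenever a consumer in the group stops (or a new consumer joins), Kafka rebalances the group (Rebalance) and reassigns partitions according to the live consumers and the configured assignment strategy. Kafka 0.10.x provides two assignment strategies (RangeAssignor and RoundRobinAssignor), and 0.11.x adds a third (StickyAssignor). Each is described below.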

1 RangeAssignor strategy

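RangeAssignor works on a per-topic basis. For each topic, it lays out the available partitions in numeric order and the consumers in lexicographic order, then divides the number of partitions by the number of consumers to determine how many partitions each consumer gets. If the partitions do not divide evenly, the first few consumers each receive one extra partition. The implementation code: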

```java
// Excerpt from RangeAssignor.assign(): subscriptions, consumersPerTopic,
// partitionsPerTopic and assignment are defined in the enclosing method.
for (String memberId : subscriptions.keySet())
    assignment.put(memberId, new ArrayList<TopicPartition>());

for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
    String topic = topicEntry.getKey();
    List<String> consumersForTopic = topicEntry.getValue();

    Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
    if (numPartitionsForTopic == null)
        continue;

    Collections.sort(consumersForTopic); // consumers in lexicographic order

    // number of partitions in the topic divided by the number of consumers
    int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
    // number of consumers that receive one extra partition
    int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

    // the topic's partitions in numeric order
    List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
    for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
        // consumer i receives the contiguous range [start, start + length)
        int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
        int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
        assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
    }
}
```

For example, suppose there are two topics (topic1 and topic2), each with three partitions:

  • topic1, partitions: topic1p0, topic1p1, topic1p2
  • topic2, partitions: topic2p0, topic2p1, topic2p2

and one consumer group (consumer_group1) with two consumers (consumer1 and consumer2). RangeAssignor would produce the following assignment:

  • consumer1: [topic1p0,topic1p1,topic2p0,topic2p1]
  • consumer2: [topic1p2,topic2p2]

If a new consumer, consumer3, then joins consumer_group1, RangeAssignor would produce the following assignment:

  • consumer1: [topic1p0,topic2p0]
  • consumer2: [topic1p1,topic2p1]
  • consumer3: [topic1p2,topic2p2]
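
To check the two assignments above, here is a minimal, self-contained sketch of the range arithmetic. It is a simplified re-implementation, not Kafka's class: it assumes every consumer subscribes to every topic, represents partitions as strings such as "topic1p0", and the names RangeAssignSketch and assign are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignSketch {

    // Simplified range assignment (hypothetical helper): assumes every consumer
    // subscribes to every topic; partitionsPerTopic maps topic name -> partition count.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // consumers in lexicographic order
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : sorted) assignment.put(c, new ArrayList<>());

        // TreeMap iterates the topics in name order
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet()) {
            String topic = e.getKey();
            int numPartitions = e.getValue();
            int perConsumer = numPartitions / sorted.size();
            int withExtra = numPartitions % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                // the first `withExtra` consumers get one extra partition
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(sorted.get(i)).add(topic + "p" + p);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = Map.of("topic1", 3, "topic2", 3);
        // prints {consumer1=[topic1p0, topic1p1, topic2p0, topic2p1], consumer2=[topic1p2, topic2p2]}
        System.out.println(assign(topics, List.of("consumer1", "consumer2")));
        // prints {consumer1=[topic1p0, topic2p0], consumer2=[topic1p1, topic2p1], consumer3=[topic1p2, topic2p2]}
        System.out.println(assign(topics, List.of("consumer1", "consumer2", "consumer3")));
    }
}
```

In this sketch's output, consumer2 moves from topic1p2/topic2p2 to topic1p1/topic2p1 when consumer3 joins, even though consumer2 never left the group: the ranges are recomputed from scratch on every rebalance.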

2 RoundRobinAssignor strategy

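RoundRobinAssignor lays out all partitions of all subscribed topics and all consumers, then hands the partitions to the consumers one at a time in round-robin order. When all consumers subscribe to the same topics, this spreads partitions more evenly than RangeAssignor. It is not the default strategy: in the versions discussed here, the default partition.assignment.strategy is RangeAssignor. The implementation code: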

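```java
// Excerpt from RoundRobinAssignor.assign(); subscriptions and partitionsPerTopic
// are parameters of the enclosing method.
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
    assignment.put(memberId, new ArrayList<TopicPartition>());

// cycles endlessly over the consumers in sorted order
CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
// all partitions of all subscribed topics, sorted by topic and partition number
for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
    final String topic = partition.topic();
    // skip consumers that are not subscribed to this partition's topic
    while (!subscriptions.get(assigner.peek()).topics().contains(topic))
        assigner.next();
    // give the partition to the current consumer and advance to the next one
    assignment.get(assigner.next()).add(partition);
}
```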

Continuing with the same topics and the same two consumers (consumer1 and consumer2), RoundRobinAssignor would produce the following assignment:

  • consumer1: [topic1p0,topic1p2,topic2p1]
  • consumer2: [topic1p1,topic2p0,topic2p2]
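
The round-robin result can be reproduced with a similar sketch. This is again a simplified, hypothetical re-implementation (RoundRobinAssignSketch is not a Kafka class). It assumes all consumers subscribe to the same topics, so the loop in Kafka's code that skips unsubscribed consumers is not needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinAssignSketch {

    // Simplified round-robin assignment (hypothetical helper): assumes every consumer
    // subscribes to every topic; partitionsPerTopic maps topic name -> partition count.
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // consumers in sorted order
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : sorted) assignment.put(c, new ArrayList<>());

        int next = 0; // index of the consumer that receives the next partition
        // all partitions of all topics: topics by name, partitions by number
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                assignment.get(sorted.get(next)).add(e.getKey() + "p" + p);
                next = (next + 1) % sorted.size(); // wrap around to the first consumer
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = Map.of("topic1", 3, "topic2", 3);
        // prints {consumer1=[topic1p0, topic1p2, topic2p1], consumer2=[topic1p1, topic2p0, topic2p2]}
        System.out.println(assign(topics, List.of("consumer1", "consumer2")));
    }
}
```

On the same input, RangeAssignor gives the two consumers 4 and 2 partitions, while round-robin gives each consumer 3.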

3 StickyAssignor strategy

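StickyAssignor, introduced in 0.11.x, is the most complex of the three strategies. Its two main goals are:

  • to distribute topic partitions among the consumers as evenly as possible;
  • when a rebalance is triggered (for example because a consumer stops), to preserve as much of the existing assignment as possible: the stopped consumer's partitions are moved to other consumers, while the remaining consumers keep theirs, instead of all partitions being reshuffled. This saves overhead.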

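Since this strategy appeared in 0.11.x, various bugs in it have been found and fixed in later releases, so use it with caution on older versions.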
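
The "stickiness" idea can be illustrated with a greatly simplified sketch. This is not Kafka's StickyAssignor algorithm, which also handles consumers with different subscriptions and searches for a balanced assignment more thoroughly. The helper reassign and its heuristic (survivors keep their partitions, orphaned partitions go to the least-loaded consumer, then single partitions move until loads differ by at most one) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StickySketch {

    // Hypothetical, simplified "sticky" reassignment; NOT Kafka's StickyAssignor.
    // previous: consumer -> partitions from the last generation; live: consumers now in the group.
    static Map<String, List<String>> reassign(Map<String, List<String>> previous,
                                              Collection<String> live) {
        Map<String, List<String>> result = new TreeMap<>(); // consumers in name order
        for (String c : live) result.put(c, new ArrayList<>());
        if (result.isEmpty()) return result;

        List<String> orphans = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : previous.entrySet()) {
            if (result.containsKey(e.getKey())) {
                result.get(e.getKey()).addAll(e.getValue()); // a survivor keeps its partitions
            } else {
                orphans.addAll(e.getValue()); // partitions of a consumer that left
            }
        }

        // ties are broken by consumer name because the TreeMap key set is sorted
        Comparator<String> byLoad = Comparator.comparingInt(c -> result.get(c).size());
        // give each orphaned partition to the currently least-loaded consumer
        for (String p : orphans) {
            String target = result.keySet().stream().min(byLoad).get();
            result.get(target).add(p);
        }
        // move single partitions until loads differ by at most one (covers newly joined consumers)
        while (true) {
            String most = result.keySet().stream().max(byLoad).get();
            String least = result.keySet().stream().min(byLoad).get();
            if (result.get(most).size() - result.get(least).size() <= 1) break;
            List<String> from = result.get(most);
            result.get(least).add(from.remove(from.size() - 1));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> previous = new TreeMap<>();
        previous.put("consumer1", List.of("topic1p0", "topic2p0"));
        previous.put("consumer2", List.of("topic1p1", "topic2p1"));
        previous.put("consumer3", List.of("topic1p2", "topic2p2"));
        // consumer3 leaves the group
        // prints {consumer1=[topic1p0, topic2p0, topic1p2], consumer2=[topic1p1, topic2p1, topic2p2]}
        System.out.println(reassign(previous, List.of("consumer1", "consumer2")));
    }
}
```

Here consumer1 and consumer2 keep all of their previous partitions and only consumer3's two partitions move. Rerunning the range calculation for the two remaining consumers would instead move topic1p1 and topic2p1 from consumer2 to consumer1.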

4 Java multithreaded consumer example

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaTopicConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final int consumerId;        // consumer instance id
    private final long timeOut = 10000;  // poll timeout in milliseconds

    public KafkaTopicConsumer(int consumerId) {
        this.consumerId = consumerId;
        Properties props = new Properties();
        props.put("client.id", "client-" + consumerId);
        props.put("bootstrap.servers", "192.168.1.10:9092,192.168.1.11:9092");
        props.put("group.id", "test-group03");
        // zookeeper.* settings belong to the old consumer and are ignored by KafkaConsumer
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // set the partition assignment strategy
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic1", "topic2"));
    }

    public void consume() {
        while (true) {
            // poll(long) matches the 0.10/0.11 clients; on 2.0+ prefer poll(Duration.ofMillis(timeOut))
            ConsumerRecords<String, String> records = consumer.poll(timeOut);
            System.out.println("records count:" + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(
                        "client-id = %d, topic = %s, partition = %d, offset = %d, key = %s, value = %s",
                        consumerId, record.topic(), record.partition(), record.offset(),
                        record.key(), record.value()));
            }
            consumer.commitSync(); // auto-commit is disabled, so commit offsets manually
        }
    }

    public static void main(String[] args) {
        int threadSize = Integer.parseInt(args[0]); // number of consumer threads
        for (int i = 0; i < threadSize; i++) {
            int id = i;
            // KafkaConsumer is not thread-safe, so each thread creates its own instance
            new Thread(() -> new KafkaTopicConsumer(id).consume()).start();
        }
    }
}
```

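Starting the program with three consumer threads gives the following assignment of partitions to consumers (with RangeAssignor, client-N owns partition N of both topics):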

```
GROUP         TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   OWNER
test-group03  topic2  0          0               3333            3333  client-0_/192.168.1.13
test-group03  topic1  0          500             3333            2833  client-0_/192.168.1.13
test-group03  topic2  2          0               3333            3333  client-2_/192.168.1.13
test-group03  topic1  2          500             3333            2833  client-2_/192.168.1.13
test-group03  topic2  1          500             3334            2834  client-1_/192.168.1.13
test-group03  topic1  1          0               3334            3334  client-1_/192.168.1.13
```
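
In this listing, LAG is LOG-END-OFFSET minus CURRENT-OFFSET. The small sketch below (LagSketch and lagPerOwner are hypothetical names) recomputes it from the rows above and totals it per consumer, dropping the /192.168.1.13 host suffix from OWNER.

```java
import java.util.Map;
import java.util.TreeMap;

public class LagSketch {

    // Total lag per owner: LAG = LOG-END-OFFSET - CURRENT-OFFSET, summed over the owner's partitions
    static Map<String, Long> lagPerOwner(String[] owners, long[] current, long[] logEnd) {
        Map<String, Long> total = new TreeMap<>();
        for (int i = 0; i < owners.length; i++) {
            total.merge(owners[i], logEnd[i] - current[i], Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        // rows copied from the test-group03 listing, in the same order
        String[] owners = {"client-0", "client-0", "client-2", "client-2", "client-1", "client-1"};
        long[] current = {0, 500, 0, 500, 500, 0};
        long[] logEnd = {3333, 3333, 3333, 3333, 3334, 3334};
        // prints {client-0=6166, client-1=6168, client-2=6166}
        System.out.println(lagPerOwner(owners, current, logEnd));
    }
}
```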

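For a large topic, consume it on its own (in a dedicated consumer group) so that data does not pile up and topics do not slow down each other's processing. Take the 10-partition topic01 mentioned at the start of this article: choose a sensible number of consumers based on hardware resources and the assignment strategy. Under heavy data volume, the optimal number of consumers equals the total number of partitions: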

```
GROUP         TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     OWNER
test-group02  topic01  6          373460          1026328         652868  client-6_/192.168.1.13
test-group02  topic01  2          375660          1048756         673096  client-2_/192.168.1.13
test-group02  topic01  5          374625          1013157         638532  client-5_/192.168.1.13
test-group02  topic01  3          347001          1066967         719966  client-3_/192.168.1.13
test-group02  topic01  0          375570          1013261         637691  client-0_/192.168.1.13
test-group02  topic01  9          376545          1094088         717543  client-9_/192.168.1.13
test-group02  topic01  8          347082          1066948         719866  client-8_/192.168.1.13
test-group02  topic01  7          375100          1048827         673727  client-7_/192.168.1.13
test-group02  topic01  1          372447          1026467         654020  client-1_/192.168.1.13
test-group02  topic01  4          377052          1093926         716874  client-4_/192.168.1.13
```
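
The partition count is the useful upper bound because, within one group, each partition goes to exactly one consumer, so any consumers beyond the partition count receive nothing. The sketch below (the helper partitionsPerConsumer is hypothetical) applies the same division as RangeAssignor to the 10-partition topic01 for a few consumer counts.

```java
import java.util.Arrays;

public class ConsumerCountSketch {

    // Partitions each consumer receives for one topic under range-style division:
    // a base share, plus one extra partition for the first (numPartitions % numConsumers) consumers
    static int[] partitionsPerConsumer(int numPartitions, int numConsumers) {
        int[] counts = new int[numConsumers];
        int base = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        for (int i = 0; i < numConsumers; i++) {
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        for (int consumers : new int[] {3, 10, 12}) {
            int[] counts = partitionsPerConsumer(10, consumers);
            long idle = Arrays.stream(counts).filter(c -> c == 0).count();
            System.out.println(consumers + " consumers -> " + Arrays.toString(counts) + ", idle: " + idle);
        }
    }
}
```

With 3 consumers the shares are [4, 3, 3]; with 10, every consumer gets one partition; with 12, two consumers get none and sit idle.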

5 Summary

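Kafka provides three assignment strategies: RangeAssignor, RoundRobinAssignor and StickyAssignor, the last of which was added in 0.11.x. They work differently. RangeAssignor works per topic, laying out partitions in numeric order and consumers in lexicographic order and giving each consumer a contiguous range. RoundRobinAssignor assigns all partitions to all consumers in round-robin order, which spreads them evenly. With either of these, a consumer leaving or joining triggers a group rebalance whose new assignment can move partitions even between consumers that stayed in the group. StickyAssignor also rebalances, but when a consumer stops it preserves as much of the existing assignment as possible and only moves the stopped consumer's partitions to other consumers, avoiding a full reshuffle and saving overhead. For topics with many partitions and large data volumes, StickyAssignor therefore has a clear advantage.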
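
Switching a consumer to StickyAssignor only changes the partition.assignment.strategy property used in the Java example. A minimal sketch with plain java.util.Properties follows; the helper name stickyConsumerProps is hypothetical, and in a real program the result would be passed to new KafkaConsumer<>(props). Because the members of a group must share a supported strategy, every consumer in the group should be switched.

```java
import java.util.Properties;

public class StickyConfigSketch {

    // Consumer properties selecting StickyAssignor (available since Kafka 0.11).
    // bootstrapServers and groupId are supplied by the caller.
    static Properties stickyConsumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the property accepts a comma-separated list of assignor class names, in order of preference
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        Properties props = stickyConsumerProps("192.168.1.10:9092,192.168.1.11:9092", "test-group02");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```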

Copyright notice
This article was created by [itread01]; please include a link to the original when reposting. Thank you.
https://www.itread01.com/content/1609478582.html