Kafka入门到精通

枫树湾河桥 2020-11-11 20:10:23
kafka 入门 博客园 技术开发 精通


原文链接:https://blog.csdn.net/qq_43323585/article/details/105824989

Kafka概述介绍

kafka是什么

Kafka是Apache开源的流处理平台,该平台提供了消息的订阅与发布。具有高吞吐、简单、易部署等特点。

Kafka干什么

  1. 消息队列:用于系统解耦、异步通信、流量填谷等。
  2. Kaka Streaming:实时在线流处理。

消息队列工作模式

消息队列的两种工作模式 :1.最多一次 2.没有限制。如图:
在这里插入图片描述
那么Kafka是怎样的工作模式?继续

Kafka架构和概念

一些概念还是要知道的,编码会用到并且可以帮助理解!最后我会附上我的git地址,我想关于kafka的业务用我的代码库基本都可以搞定了

Kafka消息的生产和消费

Kafka以Topic形式管理分类集群中的消息(Record)。每个Record属于一个Topic并且每个Topic存在多个分区(partition)存放Record,每个分区对应一个服务器(Broker),这个Broker被称为leader,分区副本对应的Broker成为follower。**需要注意的是只有leader可以读写。**没错,我们很容易就想到zookeeper,那个号称目前为止唯一的分布式一致性算法Paxos,这也为Kafka的数据可靠性提供了保证。可能有点抽象,看图:
在这里插入图片描述在这里插入图片描述

分区和日志

日志就是log,其实就是数据,有点像redis里的log。并不是我们说的打印日志,呵呵
3. 每个Topic可以被多个消费者订阅,Kafka通过Topic管理partiton。
4. 消费者可以通过轮询,负载(对Record的key取模)的方式将Record存入partition中。
5. partition是一个有序不可变的日志序列,每个Record有唯一的offset,用于记录消费情况、为持久化策略提供支撑。
6. kafka默认配置log.retention.hours=168,不管消息有没有被消费Record都可以保存168小时–硬盘保存,具体持久化策略可以通过配置文件定制。
7. partiton内部有序,在多个分区情况下,不要指望先生产先消费。写业务和编码要注意
为什么Kafka要放弃有序,采用分区局部有序性?
就像hadoop的一样,分布式集群,打破物理局限性,不管是性能容量并发量都得到了质的提升,一台搞不定就一百台。毕竟Kafka是大数据框架。

消费组

概念:是一个逻辑的消费者,有多个消费者实例组成。如果4个消费者同时订阅topic1(4个分区),那么一个分区要被消费4次。引入消费组就可以避免重复消费。
编码:消费者可以采用subscribe和assign,采用subscribe订阅topic必须指定消费组!
在这里插入图片描述

Kafka高性能之道

为什么Kafka吞吐量如此之高

Kafka可以轻松支持百万级的写入请求,而且数据会持久化到硬盘,恐怖吗。其实现在想想,一个高性能的技术都是对内核的一个封装,比如Redis底层调用的epoll(),最厉害的还是老大哥OS。

顺序写、mmap

顺序写:硬盘是一个机械结构,寻址是极其耗时的机械动作,所以Kafka用了顺序IO,避免了大量的内存开销和IO寻址的时间。
mmap:Memory Mapped Files内存映射文件,它的工作原理是利用操作系统的PageCache实现文件到物理内存的直接映射,直接写入到了pagecache中,用户对内存的所有操作会被操作系统自动刷新到磁盘上,极大的降低了IO使用率。mmap相当于一个用户态可以直接访问内核态的共享空间,避免了用户态向内核态的切换和copy。
在这里插入图片描述

ZeroCopy

Kafka服务器在响应客户端读取的时候,底层使用ZeroCopy技术,直接将磁盘无需拷贝到用户空间,而是直接将数据通过内核空间传输出去,数据并没有抵达用户空间。
IO模型在这里不赘述了,以后再写吧。
在这里插入图片描述

搭建Kafka集群

请看我写的zookeeper和kafka搭建两篇文章,参考:Kafka集群搭建

Topic管理

  • bootstrap-server:消费者,拉取数据,老版本用-zookeeper这个参数。并且好多数据放到zk中了,这个很不合理。
  • broker-list:生产者,推送数据
  • partitions:分区个数
  • replication-factor : 分区副本因子

创建topic

bin/kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --create --partitions 3 --replication-factor 2 --topic debug
  • 1

查看主题列表

bin/kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
  • 1

查看主题详情

bin/kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --describe --topic debug
  • 1

删除topic

bin/kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --delete --topic debug
  • 1

生产某个主题的消息

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic debug
  • 1

消费某个主题的消息

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic debug --from-beginning
  • 1
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic debug --group group1
  • 1

这几个命令还是很有用的!可以辅助我们的测试。

查看消费组

bin/kafka-consumer-groups.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
  • 1

Kafka API

资料来源于工具书的实例,非常全面,已经上传到GIt,现在还没来得及整理,不过以后会整理的,如果觉得不错给个星星哦!包括:生产消费,自定义分区,序列化,拦截器,事务,kafka流处理等。
Git地址:Kafka API

Kafka特性

acks、retries机制

acks应答机制:

  1. acks=1,leader写成功,不等待follower确认返回。无法保证单点故障
  2. acks=0,发送给套接字缓存,确认返回。消息安全系数最低
  3. acks=-1,leader至少一个follower应答后再确认返回。高可用集群
    retries重试机制:
    request.timeout.ms=3000默认超时重试。
    retries=2147483647xxx重试次数。
    Kafka消息语义:消息能至少一次保存

幂等写

幂等性:多次请求的结果和一次请求的结果一致。
Kafka幂等写解决方案:
在这里插入图片描述
对于一些业务上的幂等问题是可以借鉴一下的。要真正解决幂等:{黑名单、签名、token}
需要注意的是
enable.idempotence=false默认关闭幂等
前提条件:retries=true;acks=all

事务

因为 Kafka幂等写不提供跨多个 Partition 和跨会话场景下的保证,因此,我们是需要一种更强的事务保证,能够原子处理多个 Partition 的写入操作,数据要么全部写入成功,要么全部失败,这就是 Kafka Transactions,即 Kafka 事务。
producer提供了initTransactions,beginTransaction,sendOffsetsToTransaction,commitTransaction,abortTransaction 五个事务方法。
consumer提供了read_committed和read_uncommitted。

KafkaEagle监控软件

开源地址:https://github.com/smartloli/kafka-eagle
包括我一开始学习Kafka的书也是他写的。

安装过程

  1. 下载kafka-eagle-bin-1.4.0.tar.gz并解压
  2. mv kafka-eagle-bin-1.4.0 /opt/
  3. 解压之后里面还有个压缩包,再次解压
  4. 修改环境变量
vim /etc/profile
------------------------
# kafka-eagle
export KE_HOME=/opt/kafka-eagle
# PATH
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$KE_HOME/bin
------------------------
source /etc/profile
echo $PATH
OK~
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 修改kafka-eagle配置
[[email protected] conf]# vim system-config.properties
-----------------------------------------------------
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node02:2181,node03:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
cluster1.kafka.eagle.offset.storage=kafka
#cluster2.kafka.eagle.offset.storage=zk
kafka.eagle.metrics.charts=true
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
#cluster2.kafka.eagle.sasl.enable=false
#cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
#cluster2.kafka.eagle.sasl.mechanism=PLAIN
#cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
#cluster2.kafka.eagle.sasl.client.id=
######################################
# kafka sqlite jdbc driver address
######################################
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org
######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  1. 修改kafka启动配置,开启JMS
vim kafka-server-start.sh
-------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export JMX_PORT="7379"
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 最后启动,发现没有启动权限
chmod u+x ke.sh
./ke.sh
  • 1
  • 2
Version 1.4.0
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://192.168.83.11:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

总结

到此为止,3台zk+3台kafka+kafka-eagle监控其实可以做很多事了。比如:日志收集、消息系统、活动追踪、运营指标、流式处理等。希望能给看到的人些许帮助!
辅助文档:
kafka集群:链接
zk集群:链接

版权声明
本文为[枫树湾河桥]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/fswhq/p/13853519.html

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云