How Spark Streaming and Kafka Manage Offsets for Exactly-Once with the Direct Approach

大数据学习与分享 2021-01-22 10:20:26


The earlier article 《解析SparkStreaming和Kafka集成的两种方式》 (Two Ways to Integrate Spark Streaming and Kafka) covered the two main integration approaches, the Receiver-based Approach and the Direct Approach, compared their strengths and weaknesses, and listed which approach is supported by each Spark/Kafka version combination.

This article focuses on how to manage offsets yourself when Spark Streaming consumes from Kafka with the Direct Approach.

The entry point for Spark Streaming to receive data via the Direct Approach is KafkaUtils.createDirectStream. When this method is called, a KafkaCluster is created first: val kc = new KafkaCluster(kafkaParams).

KafkaCluster is responsible for communicating with Kafka: it fetches the partition metadata used to build the DirectKafkaInputDStream. Each call to createDirectStream produces one DirectKafkaInputDStream for the given topics, and each DirectKafkaInputDStream holds its own KafkaCluster instance.
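The overload of createDirectStream that accepts explicit fromOffsets is what makes self-managed offsets possible. A minimal sketch of that entry point (Kafka 0.8 integration; ssc and kafkaParams are assumed to be defined as in the full examples below, and the hard-coded offsets are purely illustrative):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// fromOffsets is hard-coded here only for illustration; in this article it is
// loaded from ZooKeeper or MySQL before the stream is created.
val fromOffsets: Map[TopicAndPartition, Long] = Map(TopicAndPartition("pv", 0) -> 0L)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))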

When a batch interval fires, the compute method of DirectKafkaInputDStream is invoked. It:

  1. Determines the untilOffset for each Kafka partition, i.e. the offset range to fetch in this batch (bounded by the configured rate limit; see the sketch after this list)
  2. Builds a KafkaRDD instance; within one batch, DirectKafkaInputDStream and KafkaRDD are one-to-one
  3. Reports the offset information of the batch to the InputInfoTracker
  4. Returns that RDD
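Step 1 is where spark.streaming.kafka.maxRatePerPartition (set in the driver code below) takes effect. A simplified sketch of how the per-batch untilOffset is bounded; names and structure are illustrative, not the actual Spark source:

// Illustrative only: how the per-partition untilOffset is clamped each batch.
// latestLeaderOffset comes from the partition leader; fromOffset is where the previous batch ended.
def clampedUntilOffset(fromOffset: Long,
                       latestLeaderOffset: Long,
                       maxRatePerPartition: Long,
                       batchDurationSec: Long): Long = {
  if (maxRatePerPartition <= 0) latestLeaderOffset // no rate limit configured
  else math.min(latestLeaderOffset, fromOffset + maxRatePerPartition * batchDurationSec)
}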

For the mapping between KafkaRDD partitions and Kafka partitions, see the article 《重要 | Spark分区并行度决定机制》.

Code practice: integrating Spark Streaming and Kafka via the Direct approach while managing offsets yourself.

1. Business logic processing

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaManager
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @Author: WeChat official account - 大数据学习与分享
 */
object SparkStreamingKafkaDirect {

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: SparkStreamingKafkaDirect <brokers> <topics> <groupid>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <groupid> is a consumer group
           |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics, groupId) = args

    val sparkConf = new SparkConf().setAppName("DirectKafka")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(6))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )

    val km = new KafkaManager(kafkaParams)
    val streams = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    streams.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Process the messages first
        // do something...
        // ...then update the offsets in ZooKeeper
        km.updateZKOffsets(rdd)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
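Note that updateZKOffsets runs only after the batch has been processed. If the job dies between processing and the offset update, the batch is reprocessed on restart, so this pattern alone is at-least-once. Effectively exactly-once additionally requires the output to be idempotent, or to be committed atomically together with the offsets (the MySQL example in section 2.2 takes the atomic route). A minimal sketch of an idempotent write, assuming a hypothetical MySQL table batch_result with a unique key on (topic, kafka_partition, until_offset) so a replayed batch overwrites rather than duplicates its output:

// Hypothetical sketch: table name and columns are assumptions, not part of the original code.
def writeIdempotent(conn: java.sql.Connection,
                    topic: String, partition: Int, untilOffset: Long,
                    payload: String): Unit = {
  val sql =
    """INSERT INTO batch_result (topic, kafka_partition, until_offset, payload)
      |VALUES (?, ?, ?, ?)
      |ON DUPLICATE KEY UPDATE payload = VALUES(payload)""".stripMargin // MySQL upsert
  val pst = conn.prepareStatement(sql)
  pst.setString(1, topic)
  pst.setInt(2, partition)
  pst.setLong(3, untilOffset)
  pst.setString(4, payload)
  pst.executeUpdate()
  pst.close()
}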

 

2. Core offset management logic

2.1 Using ZooKeeper

Note: the custom KafkaManager must be placed in the package org.apache.spark.streaming.kafka, because it needs access to KafkaCluster, which is not visible outside that package in this version of the integration.

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.{Err, LeaderOffset}

import scala.collection.mutable
import scala.reflect.ClassTag

/**
 * @Author: WeChat official account - 大数据学习与分享
 * Spark Streaming + Kafka direct approach: manage offsets yourself
 */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K] : ClassTag,
    VD <: Decoder[V] : ClassTag](ssc: StreamingContext,
                                 kafkaParams: Map[String, String],
                                 topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams.get("group.id").get

    // Before reading offsets from ZooKeeper, correct them if necessary
    setOrUpdateOffsets(topics, groupId)

    // Read the offsets from ZooKeeper and start consuming from there
    val messages = {
      // Get the partitions. Either is used for error handling: Left carries the error, Right the result.
      val partitionsE: Either[Err, Set[TopicAndPartition]] = kc.getPartitions(topics)
      if (partitionsE.isLeft) throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get

      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get

      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets,
        (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /** Before creating the stream, correct the consumer offsets according to the actual state of the cluster. */
  def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach { topic =>
      var hasConsumed = true

      // Get the partitions of this topic
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft) throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get

      // Get the consumer offsets; if none exist, this group has not consumed the topic yet
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false

      if (hasConsumed) {
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = earliestLeaderOffsetsE.right.get
        val consumerOffsets: Map[TopicAndPartition, Long] = consumerOffsetsE.right.get

        // If a stored consumer offset has fallen behind the earliest available offset
        // (the data has already been removed by Kafka's retention), reset it to the earliest offset.
        val offsets: mutable.HashMap[TopicAndPartition, Long] = mutable.HashMap[TopicAndPartition, Long]()
        consumerOffsets.foreach { case (tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ", topic:" + tp.topic + ", partition:" + tp.partition +
              " offsets are stale, resetting to: " + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
          println(n, earliestLeaderOffset, kc.getLatestLeaderOffsets(partitions).right) // debug output
        }
        println("corrected offsets: " + offsets)
        if (offsets.nonEmpty) kc.setConsumerOffsets(groupId, offsets.toMap)

        // Equivalent alternative kept for reference:
        // val cs = consumerOffsetsE.right.get
        // val lastest = kc.getLatestLeaderOffsets(partitions).right.get
        // val earliest = kc.getEarliestLeaderOffsets(partitions).right.get
        // var newCS: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()
        // cs.foreach { f =>
        //   val max = lastest.get(f._1).get.offset
        //   val min = earliest.get(f._1).get.offset
        //   newCS += (f._1 -> f._2)
        //   // If the offset recorded in ZooKeeper no longer exists in Kafka (expired),
        //   // start consuming from the smallest offset currently available in Kafka.
        //   if (f._2 < min) {
        //     newCS += (f._1 -> min)
        //   }
        //   println(max + "-----" + f._2 + "--------" + min)
        // }
        // if (newCS.nonEmpty) kc.setConsumerOffsets(groupId, newCS)
      } else {
        println("This group has not consumed the topic yet...")
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val leaderOffsets: Map[TopicAndPartition, LeaderOffset] = if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft) throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsetsE.right.get
        } else {
          // largest
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft) throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map { case (tp, lo) => (tp, lo.offset) }
        kc.setConsumerOffsets(groupId, offsets)

        /*
        // Equivalent for-comprehension over the Either results, kept for reference:
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val result = for {
          topicPartitions <- kc.getPartitions(topics).right
          leaderOffsets <- (if (reset == Some("smallest")) {
            kc.getEarliestLeaderOffsets(topicPartitions)
          } else {
            kc.getLatestLeaderOffsets(topicPartitions)
          }).right
        } yield {
          leaderOffsets.map { case (tp, lo) =>
            (tp, lo.offset)
          }
        }
        */
      }
    }
  }

  /** Update the consumer offsets stored in ZooKeeper. */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams("group.id")
    val offsetList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetList.foreach { offset =>
      val topicAndPartition = TopicAndPartition(offset.topic, offset.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offset.untilOffset)))
      if (o.isLeft) println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
}
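setOrUpdateOffsets only corrects offsets that have fallen behind the earliest available offset. If a topic is deleted and recreated with the same name, the stored offsets can also end up ahead of the latest leader offsets; a symmetric check can be added for that case. A hedged sketch of an additional method inside KafkaManager (method name and the reset-to-latest policy are assumptions, not part of the original code):

// Hypothetical extension: also clamp offsets that are ahead of the latest leader offset
// (e.g. the topic was deleted and recreated). Reuses the kc field and imports above.
private def clampToLatest(groupId: String,
                          consumerOffsets: Map[TopicAndPartition, Long],
                          latestLeaderOffsets: Map[TopicAndPartition, LeaderOffset]): Unit = {
  val corrected = consumerOffsets.collect {
    case (tp, n) if n > latestLeaderOffsets(tp).offset => tp -> latestLeaderOffsets(tp).offset
  }
  if (corrected.nonEmpty) kc.setConsumerOffsets(groupId, corrected)
}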

 

2.2 Without ZooKeeper (storing offsets in MySQL)

import java.sql.{Connection, Timestamp}

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import net.sf.json.{JSONArray, JSONObject} // json-lib, inferred from the API used below
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.immutable.HashMap

/**
 * @author 大数据学习与分享
 * Spark Streaming + Kafka 0.8.2: maintaining offsets in MySQL
 */
object SaveOffset2Mysql {

  /** Read the most recently stored offsets from MySQL. `database` is kept from the original signature but unused. */
  def getLastOffsets(database: String, sql: String, jdbcOptions: Map[String, String]): HashMap[TopicAndPartition, Long] = {
    val getConnection: () => Connection = JdbcUtils.createConnectionFactory(new JDBCOptions(jdbcOptions))
    val conn = getConnection()
    val pst = conn.prepareStatement(sql)
    val res = pst.executeQuery()
    var map: HashMap[TopicAndPartition, Long] = HashMap()
    while (res.next()) {
      val o = res.getString(1)
      val jSONArray = JSONArray.fromObject(o)
      jSONArray.toArray.foreach { offset =>
        val json = JSONObject.fromObject(offset)
        val topicAndPartition = TopicAndPartition(json.getString("topic"), json.getInt("partition"))
        map += topicAndPartition -> json.getLong("untilOffset")
      }
    }
    pst.close()
    conn.close()
    map
  }

  /** Serialize the offset ranges of a batch to a JSON array so they can be stored with the results. */
  def offsetRanges2Json(offsetRanges: Array[OffsetRange]): JSONArray = {
    val jSONArray = new JSONArray
    offsetRanges.foreach { offsetRange =>
      val jSONObject = new JSONObject()
      jSONObject.accumulate("topic", offsetRange.topic)
      jSONObject.accumulate("partition", offsetRange.partition)
      jSONObject.accumulate("fromOffset", offsetRange.fromOffset)
      jSONObject.accumulate("untilOffset", offsetRange.untilOffset)
      jSONArray.add(jSONObject)
    }
    jSONArray
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // SystemProperties.* are the author's own constants
    val kafkaParams = Map("metadata.broker.list" -> SystemProperties.BROKERS,
      "zookeeper.connect" -> SystemProperties.ZK_SERVERS,
      "zookeeper.connection.timeout.ms" -> "10000")
    val topics = Set("pv")

    // JDBC connection info for the offset table; the values here are placeholders
    val jdbcOptions = Map(
      "url" -> "jdbc:mysql://localhost:3306/test",
      "dbtable" -> "res",
      "user" -> "root",
      "password" -> "***")
    val tpMap = getLastOffsets("test", "select offset from res where id = (select max(id) from res)", jdbcOptions)

    var messages: InputDStream[(String, String)] = null
    if (tpMap.nonEmpty) {
      // Resume from the offsets stored in MySQL
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, tpMap, (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))
    } else {
      // No stored offsets yet: fall back to auto.offset.reset = largest
      val params = kafkaParams + ("auto.offset.reset" -> "largest")
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topics)
    }

    // Alternative: capture the offset ranges via transform before other transformations
    // var oRanges = Array[OffsetRange]()
    // messages.transform { rdd =>
    //   oRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //   rdd
    // }.foreachRDD { rdd =>
    //   val offset = offsetRanges2Json(oRanges).toString
    // }

    messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val offsetJson = offsetRanges2Json(offsetRanges).toString

      rdd.map(_._2).flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).repartition(1)
        .foreachPartition { par =>
          if (par.nonEmpty) {
            val conn = MysqlUtil.getConnection("test")
            conn.setAutoCommit(false)
            val pst = conn.prepareStatement("INSERT INTO res (word,count,offset,time) VALUES (?,?,?,?)")
            par.foreach { case (word, count) =>
              pst.setString(1, word)
              pst.setLong(2, count)
              pst.setString(3, offsetJson)
              pst.setTimestamp(4, new Timestamp(System.currentTimeMillis()))
              pst.addBatch()
            }
            pst.executeBatch()
            // Results and offsets are committed in a single transaction,
            // which is what gives exactly-once semantics for this sink.
            conn.commit()
            pst.close()
            conn.close()
          }
        }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
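For reference, the code above assumes a results table roughly like the following. This is a guess at the schema implied by the INSERT statement and the offset query (id is needed because the query selects max(id)); adjust it to your actual table:

// Hypothetical DDL for the `res` table implied by the INSERT/SELECT above; run it once, e.g. via JDBC.
// Backticks guard against keyword collisions with `count` and `offset`.
val createResTable =
  """CREATE TABLE IF NOT EXISTS res (
    |  id       BIGINT PRIMARY KEY AUTO_INCREMENT,
    |  word     VARCHAR(255),
    |  `count`  BIGINT,
    |  `offset` TEXT,      -- JSON array written by offsetRanges2Json
    |  time     TIMESTAMP
    |)""".stripMargin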

 

// Spark Streaming + Kafka 0.10 integration: maintaining offsets yourself.
// Fragment: assumes `ssc: StreamingContext` and `topicSet: Set[String]` are defined as in the earlier examples.
import java.time.Duration
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010._

import scala.collection.JavaConverters._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> SystemProperties.BROKERS,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "g1",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, getLastOffsets(kafkaParams, topicSet)))

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    // Each RDD partition corresponds to exactly one Kafka partition
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    iter.foreach { each =>
      // placeholder for per-record processing
      s"Do Something with $each"
    }
  }

  // Commit the offsets back to Kafka only after the batch has been processed
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

/** Read the committed positions for this group so the stream resumes from them. */
def getLastOffsets(kafkaParams: Map[String, Object], topicSet: Set[String]): Map[TopicPartition, Long] = {
  val props = new Properties()
  props.putAll(kafkaParams.asJava)
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(topicSet.asJavaCollection)
  paranoidPoll(consumer)
  val consumerAssign = consumer.assignment().asScala.map(tp => tp -> consumer.position(tp)).toMap
  consumer.close()
  consumerAssign
}

/** To think about: how to correct offsets when messages were processed but the offset commit failed? */
def paranoidPoll(consumer: KafkaConsumer[String, String]): Unit = {
  val msg = consumer.poll(Duration.ZERO)
  if (!msg.isEmpty) {
    // Seek back so the position is the minimum offset seen per topic partition,
    // i.e. the poll itself does not advance the effective starting position.
    msg.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
      val tp = new TopicPartition(m.topic(), m.partition())
      val off = acc.get(tp).map(o => Math.min(o, m.offset())).getOrElse(m.offset())
      acc + (tp -> off)
    }.foreach { case (tp, off) =>
      consumer.seek(tp, off)
    }
  }
}
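One partial answer to the question raised above paranoidPoll: commitAsync also has an overload that takes an OffsetCommitCallback, so a failed commit can at least be detected and then retried or reconciled against offsets stored alongside the output. A hedged sketch replacing the plain commitAsync call inside foreachRDD above (the handling policy is an assumption; what you do on failure is application-specific):

import java.util
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
  override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata],
                          exception: Exception): Unit = {
    if (exception != null) {
      // The batch was already processed but the commit failed: on restart these partitions
      // will be re-read, so the sink must tolerate replays (e.g. idempotent writes).
      println(s"offset commit failed for $offsets: $exception")
    }
  }
})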

 

The above is a demo to illustrate the idea. In production, you still need to adapt it to the concrete business scenario and handle the special cases accordingly.


Copyright notice
This article was written by [大数据学习与分享]. Please include a link to the original when reposting. Thank you.
https://www.cnblogs.com/bigdatalearnshare/p/14267488.html
