A previous article, 《Analyzing the Two Ways to Integrate Spark Streaming with Kafka》, described in detail the two main ways Spark Streaming integrates with Kafka: the Receiver-based Approach and the Direct Approach. It compared the advantages and disadvantages of each and covered which Spark and Kafka versions support each kind of integration.

This article focuses on the Direct Approach: when Spark Streaming consumes Kafka this way, how should offsets be managed?

How Spark Streaming receives data with the Direct Approach:

The entry point is KafkaUtils.createDirectStream. When this method is called, it first creates a KafkaCluster instance:

val kc = new KafkaCluster(kafkaParams)

KafkaCluster is responsible for communicating with Kafka. It fetches the partition information of the Kafka topics and is used to build the DirectKafkaInputDStream. Each DirectKafkaInputDStream corresponds to one topic, and each DirectKafkaInputDStream also holds a KafkaCluster instance.
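
For reference, a minimal sketch of the underlying call is shown below; the broker list and topic name are placeholders, and the KafkaManager class introduced later in this article wraps exactly this call and adds offset handling around it.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal Direct Approach sketch (Spark 1.x / Kafka 0.8 API); values are placeholders
val sparkConf = new SparkConf().setAppName("DirectDemo").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(6))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topicsSet = Set("demo-topic")

// Internally builds a KafkaCluster, fetches the partition metadata and
// returns a DirectKafkaInputDStream
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)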

At each batch interval, Spark Streaming calls the compute method of DirectKafkaInputDStream, which does the following (a simplified sketch of steps 1 and 2 follows this list):

  1. Gets the untilOffset of each corresponding Kafka partition, to determine the offset interval of the data to fetch
  2. Builds a KafkaRDD instance. Within each batch interval, DirectKafkaInputDStream and KafkaRDD are one-to-one
  3. Reports the relevant offset information to the InputInfoTracker
  4. Returns the RDD
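
To make steps 1 and 2 more concrete, here is a simplified, illustrative sketch of how a batch's per-partition interval can be derived and clamped by a rate limit. It paraphrases the logic described above rather than reproducing the actual Spark source; the helper name batchOffsetRanges is made up for illustration.

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.OffsetRange

// Given the offsets already consumed and the latest leader offsets, compute the
// [fromOffset, untilOffset) interval per partition for the current batch.
// maxPerPartition plays the role of spark.streaming.kafka.maxRatePerPartition * batch duration.
def batchOffsetRanges(currentOffsets: Map[TopicAndPartition, Long],
                      latestLeaderOffsets: Map[TopicAndPartition, Long],
                      maxPerPartition: Long): Array[OffsetRange] = {
  currentOffsets.map { case (tp, from) =>
    val latest = latestLeaderOffsets.getOrElse(tp, from)
    val until = math.min(latest, from + maxPerPartition) // clamp by the rate limit
    OffsetRange.create(tp.topic, tp.partition, from, until)
  }.toArray
}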

For the correspondence between KafkaRDD partitions and Kafka partitions, see the article 《Important | How Spark Determines Partition Parallelism》.

Code practice: integrate Spark Streaming and Kafka with the Direct Approach and manage the offsets yourself.

1. Business logic

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaManager
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @Author: WeChat official account - Big data learning and sharing
 */
object SparkStreamingKafkaDirect {

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: SparkStreamingKafkaDirect <brokers> <topics> <groupid>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <groupid> is a consumer group
           |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics, groupId) = args

    val sparkConf = new SparkConf().setAppName("DirectKafka")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(6))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )

    val km = new KafkaManager(kafkaParams)

    val streams = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    streams.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Process the messages first
        // do something...

        // Then update the offsets in ZooKeeper
        km.updateZKOffsets(rdd)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

2. Core logic of offset management

2.1 Using ZooKeeper

Note: the custom KafkaManager must be placed in the package org.apache.spark.streaming.kafka, otherwise it cannot access KafkaCluster, which is package-private in older Spark versions.

package org.apache.spark.streaming.kafka

import scala.collection.mutable
import scala.reflect.ClassTag

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.{Err, LeaderOffset}

/**
 * @Author: WeChat official account - Big data learning and sharing
 * Direct integration of Spark Streaming and Kafka: managing offsets yourself
 */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag](ssc: StreamingContext,
                                kafkaParams: Map[String, String],
                                topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams.get("group.id").get

    // Read the offsets from ZooKeeper and, if necessary, correct them first
    setOrUpdateOffsets(topics, groupId)

    // Then start consuming from the offsets recorded in ZooKeeper
    val messages = {
      // Get the partitions. Either is used for error handling: Left carries the failure, Right the normal result
      val partitionsE: Either[Err, Set[TopicAndPartition]] = kc.getPartitions(topics)
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get

      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get

      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets,
        (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /** Before creating the stream, update the consumer offsets according to the actual situation */
  def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach { topic =>
      var hasConsumed = true

      // Get the partitions of the topic
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get

      // Get the consumer offsets; if none exist, this group has never consumed the topic
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false

      if (hasConsumed) {
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = earliestLeaderOffsetsE.right.get
        val consumerOffsets: Map[TopicAndPartition, Long] = consumerOffsetsE.right.get

        val offsets: mutable.HashMap[TopicAndPartition, Long] = mutable.HashMap[TopicAndPartition, Long]()
        consumerOffsets.foreach { case (tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          // offsets += (tp -> n)
          // If the offset recorded in ZooKeeper is smaller than the earliest offset still held
          // by Kafka (the data has expired), reset it to the earliest available offset
          if (n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ", topic:" + tp.topic + ", partition:" + tp.partition +
              " offsets are out of range, updated to: " + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
          // Debug output
          println(n, earliestLeaderOffset, kc.getLatestLeaderOffsets(partitions).right)
        }
        println("map...." + offsets)
        if (offsets.nonEmpty) kc.setConsumerOffsets(groupId, offsets.toMap)

        // An equivalent variant kept for reference:
        // val cs = consumerOffsetsE.right.get
        // val lastest = kc.getLatestLeaderOffsets(partitions).right.get
        // val earliest = kc.getEarliestLeaderOffsets(partitions).right.get
        // var newCS: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()
        // cs.foreach { f =>
        //   val max = lastest.get(f._1).get.offset
        //   val min = earliest.get(f._1).get.offset
        //   newCS += (f._1 -> f._2)
        //   // If the offset recorded in ZooKeeper no longer exists in Kafka (it has expired),
        //   // start consuming from the smallest offset that still exists in Kafka
        //   if (f._2 < min) {
        //     newCS += (f._1 -> min)
        //   }
        //   println(max + "-----" + f._2 + "--------" + min)
        // }
        // if (newCS.nonEmpty) kc.setConsumerOffsets(groupId, newCS)
      } else {
        println("this group has not consumed yet...")
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val leaderOffsets: Map[TopicAndPartition, LeaderOffset] = if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsetsE.right.get
        } else {
          // largest
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map { case (tp, lo) => (tp, lo.offset) }
        kc.setConsumerOffsets(groupId, offsets)

        /*
        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        val result = for {
          topicPartitions <- kc.getPartitions(topics).right
          leaderOffsets <- (if (reset == Some("smallest")) {
            kc.getEarliestLeaderOffsets(topicPartitions)
          } else {
            kc.getLatestLeaderOffsets(topicPartitions)
          }).right
        } yield {
          leaderOffsets.map { case (tp, lo) =>
            (tp, lo.offset)
          }
        }
        */
      }
    }
  }

  /** Update the consumer offsets stored in ZooKeeper */
  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams("group.id")
    val offsetList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetList.foreach { offset =>
      val topicAndPartition = TopicAndPartition(offset.topic, offset.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offset.untilOffset)))
      if (o.isLeft) println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
}
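
As a quick sanity check, the offsets that updateZKOffsets has just committed can be read back through the same KafkaCluster API. The snippet below is only a sketch: it must also live under the org.apache.spark.streaming.kafka package, and the broker, group and topic names are placeholders.

// Sketch: read back the offsets a consumer group has committed to ZooKeeper
val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092", "group.id" -> "your-group"))
val partitions = kc.getPartitions(Set("your-topic")).right.get
kc.getConsumerOffsets("your-group", partitions) match {
  case Right(offsets) => offsets.foreach { case (tp, off) =>
    println(s"${tp.topic}-${tp.partition} committed offset: $off")
  }
  case Left(err) => println(s"failed to read consumer offsets: $err")
}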

2.2 Without ZooKeeper

import java.sql.{Connection, Timestamp}

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import net.sf.json.{JSONArray, JSONObject}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.immutable.HashMap

/**
 * @author Big data learning and sharing
 * Spark Streaming with Kafka 0.8.2: maintaining offsets in MySQL.
 * SystemProperties and MysqlUtil are the author's own helper classes (not shown here).
 */
object SaveOffset2Mysql {

  /** Read the last saved offsets from MySQL */
  def getLastOffsets(database: String, sql: String, jdbcOptions: Map[String, String]): HashMap[TopicAndPartition, Long] = {
    val getConnection: () => Connection = JdbcUtils.createConnectionFactory(new JDBCOptions(jdbcOptions))
    val conn = getConnection()
    val pst = conn.prepareStatement(sql)
    val res = pst.executeQuery()
    var map: HashMap[TopicAndPartition, Long] = HashMap()
    while (res.next()) {
      val o = res.getString(1)
      val jSONArray = JSONArray.fromObject(o)
      jSONArray.toArray.foreach { offset =>
        val json = JSONObject.fromObject(offset)
        val topicAndPartition = TopicAndPartition(json.getString("topic"), json.getInt("partition"))
        map += topicAndPartition -> json.getLong("untilOffset")
      }
    }
    pst.close()
    conn.close()
    map
  }

  /** Serialize the offset ranges of a batch to JSON */
  def offsetRanges2Json(offsetRanges: Array[OffsetRange]): JSONArray = {
    val jSONArray = new JSONArray
    offsetRanges.foreach { offsetRange =>
      val jSONObject = new JSONObject()
      jSONObject.accumulate("topic", offsetRange.topic)
      jSONObject.accumulate("partition", offsetRange.partition)
      jSONObject.accumulate("fromOffset", offsetRange.fromOffset)
      jSONObject.accumulate("untilOffset", offsetRange.untilOffset)
      jSONArray.add(jSONObject)
    }
    jSONArray
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> SystemProperties.BROKERS,
      "zookeeper.connect" -> SystemProperties.ZK_SERVERS,
      "zookeeper.connection.timeout.ms" -> "10000")
    val topics = Set("pv")

    // JDBC options for the MySQL database that stores the offsets (values are illustrative)
    val jdbcOptions = Map(
      "url" -> "jdbc:mysql://localhost:3306/test",
      "dbtable" -> "res",
      "user" -> "root",
      "password" -> "root")
    val tpMap = getLastOffsets("test", "select offset from res where id = (select max(id) from res)", jdbcOptions)

    var messages: InputDStream[(String, String)] = null
    if (tpMap.nonEmpty) {
      // Offsets were found in MySQL: resume from them
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, tpMap, (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))
    } else {
      // No saved offsets: start from the largest available offset
      messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams + ("auto.offset.reset" -> "largest"), topics)
    }

    // var oRanges = Array[OffsetRange]()
    // messages.transform { rdd =>
    //   oRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //   rdd
    // }.foreachRDD { rdd =>
    //   val offset = offsetRanges2Json(oRanges).toString
    // }

    messages.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Serialize this batch's offset ranges; they are stored together with the results below
      val offset = offsetRanges2Json(offsetRanges).toString

      rdd.map(_._2).flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).repartition(1)
        .foreachPartition { par =>
          if (par.nonEmpty) {
            val conn = MysqlUtil.getConnection("test")
            conn.setAutoCommit(false)
            val pst = conn.prepareStatement("INSERT INTO res (word,count,offset,time) VALUES (?,?,?,?)")
            par.foreach { case (word, count) =>
              pst.setString(1, word)
              pst.setLong(2, count)
              pst.setString(3, offset)
              pst.setTimestamp(4, new Timestamp(System.currentTimeMillis()))
              pst.addBatch()
            }
            // Results and offsets are written in a single transaction
            pst.executeBatch()
            conn.commit()
            pst.close()
            conn.close()
          }
        }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
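
The example above assumes a MySQL table named res whose columns match the INSERT statement and the offset-recovery query (select max(id)). A possible DDL, created here through plain JDBC, is sketched below; the column types, URL, user and password are assumptions rather than details from the original code.

import java.sql.DriverManager

// Sketch of the assumed `res` table: an auto-increment id (used by `select max(id)`),
// the word-count result, the batch's offset ranges as a JSON string, and a timestamp.
val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root")
val stmt = conn.createStatement()
stmt.execute(
  """CREATE TABLE IF NOT EXISTS res (
    |  id       BIGINT PRIMARY KEY AUTO_INCREMENT,
    |  word     VARCHAR(128),
    |  `count`  BIGINT,
    |  `offset` TEXT,
    |  `time`   TIMESTAMP
    |)""".stripMargin)
stmt.close()
conn.close()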
// Spark Streaming integrated with Kafka 0.10: maintaining offsets with commitAsync
// (ssc, topicSet and SystemProperties are assumed to be defined as in the previous examples)
import java.time.Duration
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010._

import scala.collection.JavaConverters._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> SystemProperties.BROKERS,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "g1",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val messages = KafkaUtils.createDirectStream[String, String](ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, getLastOffsets(kafkaParams, topicSet)))

messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    iter.foreach { each =>
      // process each record
      s"Do Something with $each"
    }
  }
  // Commit the offsets back to Kafka only after the batch has been processed
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

/** Build the starting offsets from the positions currently known to the consumer group */
def getLastOffsets(kafkaParams: Map[String, Object], topicSet: Set[String]): Map[TopicPartition, Long] = {
  val props = new Properties()
  props.putAll(kafkaParams.asJava)
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(topicSet.asJavaCollection)
  paranoidPoll(consumer)
  val consumerAssign = consumer.assignment().asScala.map(tp => tp -> consumer.position(tp)).toMap
  consumer.close()
  consumerAssign
}

/** Think about it: if the messages were consumed but committing the offsets failed, are the offsets still correct? */
def paranoidPoll(consumer: KafkaConsumer[String, String]): Unit = {
  val msg = consumer.poll(Duration.ZERO)
  if (!msg.isEmpty) {
    // The position should be the minimum offset seen per topic partition
    msg.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
      val tp = new TopicPartition(m.topic(), m.partition())
      val off = acc.get(tp).map(o => Math.min(o, m.offset())).getOrElse(m.offset())
      acc + (tp -> off)
    }.foreach { case (tp, off) =>
      consumer.seek(tp, off)
    }
  }
}
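
On the "think about it" question above: commitAsync gives at-least-once rather than exactly-once semantics, because a batch can be fully processed and the commit can still fail, in which case the next run re-reads the same offset range and the downstream writes must be idempotent. As a small, optional refinement, the overload of commitAsync that takes an OffsetCommitCallback can be used so that commit failures at least show up in the driver log; the sketch below would replace the commitAsync line inside foreachRDD above.

import java.util
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.CanCommitOffsets

// Commit with a callback so that failed commits are visible in the driver log
messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
  override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata],
                          exception: Exception): Unit = {
    if (exception != null) println(s"offset commit failed: ${exception.getMessage}")
  }
})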

The code above only illustrates the basic ideas with demos. In production you also need to combine them with your concrete business scenario and handle the different situations accordingly.

Recommended articles:

Spark Streaming state management

Kafka as a complement to the message system

Spark broadcast variables and how to update them dynamically

Java concurrent queues and containers

Hadoop tuning

Consistent hashing in Redis

An analysis of Kafka's high performance

Hardware configuration suggestions for Spark

Spark clusters and task execution

On Spark parallelism, starting from spark.default.parallelism

Follow the WeChat official account "Big data learning and sharing" for more technical articles.
