Ausnahme beim Zugriff auf KafkaOffset über RDD

Ich habe einen Spark-Konsumenten, der von Kafka streamt. Ich versuche, Offsets für genau eine Semantik zu verwalten.

Beim Zugriff auf den Offset wird jedoch die folgende Ausnahme ausgelöst:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD kann nicht in org.apache.spark.streaming.kafka.HasOffsetRanges umgewandelt werden"

Der Teil des Codes, der dies tut, ist wie folgt:

var offsetRanges = Array[OffsetRange]()
dataStream
  .transform { 
    rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
   }
   .foreachRDD(rdd => { })

Hier dataStream ist ein direkter Stream (DStream [String]), der mit der KafkaUtils-API erstellt wurde. Beispiel:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

Wenn mir jemand helfen kann zu verstehen, was ich hier falsch mache. transform ist die erste Methode in der Methodenkette, die für den Datenstrom ausgeführt wird, wie dies auch in der offiziellen Dokumentation angegeben ist.

Vielen Dank

Antworten auf die Frage(2)

Ihre Antwort auf die Frage