Исключение при доступе к KafkaOffset из RDD

У меня есть потребитель Spark, который течет из Кафки. Я пытаюсь управлять смещениями для семантики, выполняемой ровно один раз.

Однако при доступе к смещению выдается следующее исключение:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD не может быть приведен к org.apache.spark.streaming.kafka.HasOffsetRanges"

Часть кода, которая делает это, как показано ниже:

var offsetRanges = Array[OffsetRange]()
dataStream
  .transform { 
    rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
   }
   .foreachRDD(rdd => { })

Здесь dataStream - это прямой поток (DStream [String]), созданный с использованием KafkaUtils API, что-то вроде:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

Если кто-то может помочь мне понять, что я делаю здесь неправильно. transform - это первый метод в цепочке методов, выполняемых на потоке данных, как также указано в официальной документации

Благодарю.

Ответы на вопрос(1)

Ваш ответ на вопрос