Исключение при доступе к KafkaOffset из RDD
У меня есть потребитель Spark, который течет из Кафки. Я пытаюсь управлять смещениями для семантики, выполняемой ровно один раз.
Однако при доступе к смещению выдается следующее исключение:
"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD не может быть приведен к org.apache.spark.streaming.kafka.HasOffsetRanges"
Часть кода, которая делает это, как показано ниже:
var offsetRanges = Array[OffsetRange]()
dataStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.foreachRDD(rdd => { })
Здесь dataStream - это прямой поток (DStream [String]), созданный с использованием KafkaUtils API, что-то вроде:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)
Если кто-то может помочь мне понять, что я делаю здесь неправильно. transform - это первый метод в цепочке методов, выполняемых на потоке данных, как также указано в официальной документации
Благодарю.