Exceção ao acessar o KafkaOffset a partir do RDD

Eu tenho um consumidor Spark que transmite de Kafka. Estou tentando gerenciar deslocamentos para a semântica exatamente uma vez.

No entanto, ao acessar o deslocamento, lança a seguinte exceção:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD não pode ser convertido em org.apache.spark.streaming.kafka.HasOffsetRanges"

A parte do código que faz isso é a seguinte:

var offsetRanges = Array[OffsetRange]()
dataStream
  .transform { 
    rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
   }
   .foreachRDD(rdd => { })

Aqui, dataStream é um fluxo direto (DStream [String]) criado usando a API KafkaUtils, algo como:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

Se alguém puder me ajudar a entender o que estou fazendo de errado aqui. transform é o primeiro método na cadeia de métodos executados no fluxo de dados, conforme mencionado na documentação oficial,

Obrigado.

questionAnswers(1)

yourAnswerToTheQuestion