Exceção ao acessar o KafkaOffset a partir do RDD
Eu tenho um consumidor Spark que transmite de Kafka. Estou tentando gerenciar deslocamentos para a semântica exatamente uma vez.
No entanto, ao acessar o deslocamento, lança a seguinte exceção:
"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD não pode ser convertido em org.apache.spark.streaming.kafka.HasOffsetRanges"
A parte do código que faz isso é a seguinte:
var offsetRanges = Array[OffsetRange]()
dataStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.foreachRDD(rdd => { })
Aqui, dataStream é um fluxo direto (DStream [String]) criado usando a API KafkaUtils, algo como:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)
Se alguém puder me ajudar a entender o que estou fazendo de errado aqui. transform é o primeiro método na cadeia de métodos executados no fluxo de dados, conforme mencionado na documentação oficial,
Obrigado.