Excepción al acceder a KafkaOffset desde RDD

Tengo un consumidor de Spark que se transmite desde Kafka. Estoy tratando de gestionar las compensaciones para la semántica de una sola vez.

Sin embargo, al acceder al desplazamiento, arroja la siguiente excepción:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD no se puede enviar a org.apache.spark.streaming.kafka.HasOffsetRanges"

La parte del código que hace esto es la siguiente:

var offsetRanges = Array[OffsetRange]()
dataStream
  .transform { 
    rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
   }
   .foreachRDD(rdd => { })

Aquí dataStream es un flujo directo (DStream [String]) creado usando la API de KafkaUtils algo así como:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

Si alguien puede ayudarme a entender lo que estoy haciendo mal aquí. transform es el primer método en la cadena de métodos realizados en el flujo de datos como se menciona en la documentación oficial también

Gracias.

Respuestas a la pregunta(1)

Su respuesta a la pregunta