obtener el tema del mensaje kafka en chispa

En nuestro trabajo de transmisión de chispas, leemos mensajes en transmisión desde kafka.

Para esto, usamos elKafkaUtils.createDirectStream API que devuelveJavaPairInputDStreamfrom.

Los mensajes se leen de kafka (de tres temas: prueba1, prueba2, prueba3) de la siguiente manera:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
                streamingContext,
     ,           String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
                );

Queremos manejar los mensajes de cada tema de una manera diferente, y para lograr esto necesitamos saber el nombre del tema para cada mensaje.

entonces hacemos lo siguiente:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

y esta es la implementación de laSplitToLinesFunction:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) 
    {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}

El problema es que eltuple2._1 es nulo y asumimos que eltuple2._1 contendrá algunos metadatos, como el nombre del tema / partición de donde proviene el mensaje.

Sin embargo, cuando imprimimostuple2._1Es nulo.

Nuestra pregunta: ¿hay alguna manera de enviar el nombre del tema en kafka para que en el código de transmisión de chispa, eltuple2._1 lo contendrá (y no será nulo)?

Tenga en cuenta que también intentamos obtener los nombres de los temas de DStream como se menciona en eltutorial de integración de kafka con transmisión por chispa:

Pero devuelve TODOS los temas que se enviaron alKafkaUtils.createDirectStream, y no el tema específico de donde provienen los mensajes (que pertenecen al RDD actual).

Por lo tanto, no nos ayudó a identificar el nombre del tema desde donde se enviaron los mensajes en el RDD.

EDITAR

en respuesta a la respuesta de David: intenté usar elMessageAndMetadata Me gusta esto:

        Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
        topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

        class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
        {

            @Override
            public String call(MessageAndMetadata<String, String> v1)
                    throws Exception {
                // nothing is printed here
                System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
                return v1.topic();
            }

        }

        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
        messages.foreachRDD(new VoidFunction() {

            @Override
            public void call(Object t) throws Exception {
                JavaRDD<String> rdd = (JavaRDD<String>)t;
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // here all the topics kafka listens to are printed, but that doesn't help
                for (OffsetRange offset : offsets) {
                    System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
                }
            }
        });

El problema es que no se imprimió nada en elMessageAndMetadataFunction.call método. ¿Qué debo arreglar para obtener el tema relevante para ese RDD dentro delMessageAndMetadataFunction.call ¿método?

Respuestas a la pregunta(2)

Su respuesta a la pregunta