obter tópico da mensagem kafka no spark

Em nosso trabalho de streaming de faísca, lemos mensagens em streaming de kafka.

Para isso, usamos oKafkaUtils.createDirectStream API que retornaJavaPairInputDStreamfrom.

As mensagens são lidas do kafka (de três tópicos - test1, test2, test3) da seguinte maneira:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
                streamingContext,
     ,           String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
                );

Queremos tratar as mensagens de cada tópico de uma maneira diferente e, para conseguir isso, precisamos saber o nome do tópico para cada mensagem.

então fazemos o seguinte:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

e esta é a implementação doSplitToLinesFunction:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) 
    {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}

O problema é que otuple2._1 é nulo e assumimos que otuple2._1 conterá alguns metadados, como o nome do tópico / partição de onde a mensagem veio.

No entanto, quando imprimimostuple2._1, é nulo.

Nossa pergunta - existe uma maneira de enviar o nome do tópico em kafka para que, no código de transmissão em faísca, otuple2._1 irá conter (e não ser nulo)?

Observe que também tentamos obter os nomes dos tópicos no DStream, conforme mencionado notutorial de integração kafka de streaming de faísca:

Mas ele retorna TODOS os tópicos enviados aoKafkaUtils.createDirectStream, e não o tópico específico de onde as mensagens (que pertencem ao RDD atual) chegaram.

Portanto, não nos ajudou a identificar o nome do tópico de onde as mensagens no RDD foram enviadas.

EDITAR

em resposta à resposta de David - tentei usar oMessageAndMetadata como isso:

        Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
        topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

        class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
        {

            @Override
            public String call(MessageAndMetadata<String, String> v1)
                    throws Exception {
                // nothing is printed here
                System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
                return v1.topic();
            }

        }

        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
        messages.foreachRDD(new VoidFunction() {

            @Override
            public void call(Object t) throws Exception {
                JavaRDD<String> rdd = (JavaRDD<String>)t;
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // here all the topics kafka listens to are printed, but that doesn't help
                for (OffsetRange offset : offsets) {
                    System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
                }
            }
        });

O problema é que nada foi impresso noMessageAndMetadataFunction.call método. o que devo corrigir para obter o tópico relevante para esse RDD dentro doMessageAndMetadataFunction.call método?

questionAnswers(2)

yourAnswerToTheQuestion