получить тему из сообщения кафки в искре

В нашей работе с потоковым воспроизведением мы читаем сообщения в потоковом режиме с кафки.

Для этого мы используемKafkaUtils.createDirectStream API, который возвращаетJavaPairInputDStreamfrom.

Сообщения читаются из кафки (из трех тем - test1, test2, test3) следующим образом:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
                streamingContext,
     ,           String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
                );

Мы хотим обрабатывать сообщения из каждой темы по-разному, и для этого нам нужно знать название темы для каждого сообщения.

поэтому мы делаем следующее:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

и это реализацияSplitToLinesFunction:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) 
    {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}

Проблема в том, чтоtuple2._1 является нулевым, и мы предположили, чтоtuple2._1 будет содержать некоторые метаданные, такие как название темы / раздела, откуда пришло сообщение.

Тем не менее, когда мы печатаемtuple2._1это ноль.

Наш вопрос - есть ли способ отправить название темы в kafka, чтобы в коде потоковой передачиtuple2._1 будет содержать его (и не будет нулевым)?

Обратите внимание, что мы также попытались получить названия тем из DStream, как указано вучебник по интеграции с Kafka:

Но он возвращает ВСЕ темы, которые были отправлены наKafkaUtils.createDirectStreamа не конкретная тема, откуда поступили сообщения (принадлежащие текущему СДР).

Таким образом, это не помогло нам определить название темы, откуда были отправлены сообщения в RDD.

РЕДАКТИРОВАТЬ

в ответ на ответ Дэвида - я пытался использоватьMessageAndMetadata как это:

        Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
        topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

        class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
        {

            @Override
            public String call(MessageAndMetadata<String, String> v1)
                    throws Exception {
                // nothing is printed here
                System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
                return v1.topic();
            }

        }

        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
        messages.foreachRDD(new VoidFunction() {

            @Override
            public void call(Object t) throws Exception {
                JavaRDD<String> rdd = (JavaRDD<String>)t;
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // here all the topics kafka listens to are printed, but that doesn't help
                for (OffsetRange offset : offsets) {
                    System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
                }
            }
        });

Проблема в том, что вMessageAndMetadataFunction.call метод. что я должен исправить, чтобы получить соответствующую тему для этого RDD внутриMessageAndMetadataFunction.call метод?

Ответы на вопрос(1)

Ваш ответ на вопрос