получить тему из сообщения кафки в искре
В нашей работе с потоковым воспроизведением мы читаем сообщения в потоковом режиме с кафки.
Для этого мы используемKafkaUtils.createDirectStream
API, который возвращаетJavaPairInputDStreamfrom
.
Сообщения читаются из кафки (из трех тем - test1, test2, test3) следующим образом:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
, String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
Мы хотим обрабатывать сообщения из каждой темы по-разному, и для этого нам нужно знать название темы для каждого сообщения.
поэтому мы делаем следующее:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
и это реализацияSplitToLinesFunction
:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
Проблема в том, чтоtuple2._1
является нулевым, и мы предположили, чтоtuple2._1
будет содержать некоторые метаданные, такие как название темы / раздела, откуда пришло сообщение.
Тем не менее, когда мы печатаемtuple2._1
это ноль.
Наш вопрос - есть ли способ отправить название темы в kafka, чтобы в коде потоковой передачиtuple2._1
будет содержать его (и не будет нулевым)?
Обратите внимание, что мы также попытались получить названия тем из DStream, как указано вучебник по интеграции с Kafka:
Но он возвращает ВСЕ темы, которые были отправлены наKafkaUtils.createDirectStream
а не конкретная тема, откуда поступили сообщения (принадлежащие текущему СДР).
Таким образом, это не помогло нам определить название темы, откуда были отправлены сообщения в RDD.
РЕДАКТИРОВАТЬ
в ответ на ответ Дэвида - я пытался использоватьMessageAndMetadata
как это:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
Проблема в том, что вMessageAndMetadataFunction.call
метод. что я должен исправить, чтобы получить соответствующую тему для этого RDD внутриMessageAndMetadataFunction.call
метод?