get Thema von kafka Nachricht in Funken
In unserem Spark-Streaming-Job lesen wir Nachrichten im Streaming von kafka.
azu verwenden wir dasKafkaUtils.createDirectStream
API, die @ zurückgiJavaPairInputDStreamfrom
.
Die Nachrichten werden von kafka (aus drei Themen - test1, test2, test3) folgendermaßen gelesen:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
, String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
Wir möchten Nachrichten von jedem Thema auf unterschiedliche Weise behandeln, und um dies zu erreichen, müssen wir den Themennamen für jede Nachricht kennen.
so machen wir das Folgende:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
und dies ist die Implementierung desSplitToLinesFunction
:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
Das Problem ist, dass dastuple2._1
ist null und wir gingen davon aus, dass dastuple2._1
enthält einige Metadaten, z. B. den Namen des Themas / der Partition, von dem / der die Nachricht stammt.
Allerdings, wenn wir druckentuple2._1
, es ist null.
Unsere Frage - gibt es eine Möglichkeit, den Themennamen in kafka zu senden, sodass im Funken-Streaming-Code dastuple2._1
wird es enthalten (und nicht null sein)?
Beachten Sie, dass wir auch versucht haben, die Themennamen aus dem DStream abzurufen, wie im @ angegebe Spark-Streaming Kafka-Integration Tutorial:
Aber es werden ALLE Themen zurückgegeben, die an @ gesendet wurdeKafkaUtils.createDirectStream
und nicht das spezifische Thema, von dem die Nachrichten (die zum aktuellen RDD gehören) stammen.
So hat es uns nicht geholfen, den Namen des Themas zu identifizieren, von dem aus die Nachrichten im RDD gesendet wurden.
BEARBEITE
als Antwort auf Davids Antwort - Ich habe versucht, das @ zu verwendeMessageAndMetadata
so was
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
Das Problem ist, dass im @ nichts gedruckt wurMessageAndMetadataFunction.call
Methode. Was muss ich korrigieren, um das relevante Thema für dieses RDD im @ zu erhalteMessageAndMetadataFunction.call
Methode