get Thema von kafka Nachricht in Funken

In unserem Spark-Streaming-Job lesen wir Nachrichten im Streaming von kafka.

azu verwenden wir dasKafkaUtils.createDirectStream API, die @ zurückgiJavaPairInputDStreamfrom.

Die Nachrichten werden von kafka (aus drei Themen - test1, test2, test3) folgendermaßen gelesen:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
                streamingContext,
     ,           String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
                );

Wir möchten Nachrichten von jedem Thema auf unterschiedliche Weise behandeln, und um dies zu erreichen, müssen wir den Themennamen für jede Nachricht kennen.

so machen wir das Folgende:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

und dies ist die Implementierung desSplitToLinesFunction:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) 
    {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}

Das Problem ist, dass dastuple2._1 ist null und wir gingen davon aus, dass dastuple2._1 enthält einige Metadaten, z. B. den Namen des Themas / der Partition, von dem / der die Nachricht stammt.

Allerdings, wenn wir druckentuple2._1, es ist null.

Unsere Frage - gibt es eine Möglichkeit, den Themennamen in kafka zu senden, sodass im Funken-Streaming-Code dastuple2._1 wird es enthalten (und nicht null sein)?

Beachten Sie, dass wir auch versucht haben, die Themennamen aus dem DStream abzurufen, wie im @ angegebe Spark-Streaming Kafka-Integration Tutorial:

Aber es werden ALLE Themen zurückgegeben, die an @ gesendet wurdeKafkaUtils.createDirectStream und nicht das spezifische Thema, von dem die Nachrichten (die zum aktuellen RDD gehören) stammen.

So hat es uns nicht geholfen, den Namen des Themas zu identifizieren, von dem aus die Nachrichten im RDD gesendet wurden.

BEARBEITE

als Antwort auf Davids Antwort - Ich habe versucht, das @ zu verwendeMessageAndMetadata so was

        Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
        topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

        class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
        {

            @Override
            public String call(MessageAndMetadata<String, String> v1)
                    throws Exception {
                // nothing is printed here
                System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
                return v1.topic();
            }

        }

        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
        messages.foreachRDD(new VoidFunction() {

            @Override
            public void call(Object t) throws Exception {
                JavaRDD<String> rdd = (JavaRDD<String>)t;
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // here all the topics kafka listens to are printed, but that doesn't help
                for (OffsetRange offset : offsets) {
                    System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
                }
            }
        });

Das Problem ist, dass im @ nichts gedruckt wurMessageAndMetadataFunction.call Methode. Was muss ich korrigieren, um das relevante Thema für dieses RDD im @ zu erhalteMessageAndMetadataFunction.call Methode

Antworten auf die Frage(4)

Ihre Antwort auf die Frage