obter tópico da mensagem kafka no spark
Em nosso trabalho de streaming de faísca, lemos mensagens em streaming de kafka.
Para isso, usamos oKafkaUtils.createDirectStream
API que retornaJavaPairInputDStreamfrom
.
As mensagens são lidas do kafka (de três tópicos - test1, test2, test3) da seguinte maneira:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
, String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
Queremos tratar as mensagens de cada tópico de uma maneira diferente e, para conseguir isso, precisamos saber o nome do tópico para cada mensagem.
então fazemos o seguinte:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
e esta é a implementação doSplitToLinesFunction
:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
O problema é que otuple2._1
é nulo e assumimos que otuple2._1
conterá alguns metadados, como o nome do tópico / partição de onde a mensagem veio.
No entanto, quando imprimimostuple2._1
, é nulo.
Nossa pergunta - existe uma maneira de enviar o nome do tópico em kafka para que, no código de transmissão em faísca, otuple2._1
irá conter (e não ser nulo)?
Observe que também tentamos obter os nomes dos tópicos no DStream, conforme mencionado notutorial de integração kafka de streaming de faísca:
Mas ele retorna TODOS os tópicos enviados aoKafkaUtils.createDirectStream
, e não o tópico específico de onde as mensagens (que pertencem ao RDD atual) chegaram.
Portanto, não nos ajudou a identificar o nome do tópico de onde as mensagens no RDD foram enviadas.
EDITAR
em resposta à resposta de David - tentei usar oMessageAndMetadata
como isso:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
O problema é que nada foi impresso noMessageAndMetadataFunction.call
método. o que devo corrigir para obter o tópico relevante para esse RDD dentro doMessageAndMetadataFunction.call
método?