obtener el tema del mensaje kafka en chispa
En nuestro trabajo de transmisión de chispas, leemos mensajes en transmisión desde kafka.
Para esto, usamos elKafkaUtils.createDirectStream
API que devuelveJavaPairInputDStreamfrom
.
Los mensajes se leen de kafka (de tres temas: prueba1, prueba2, prueba3) de la siguiente manera:
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
, String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
Queremos manejar los mensajes de cada tema de una manera diferente, y para lograr esto necesitamos saber el nombre del tema para cada mensaje.
entonces hacemos lo siguiente:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
y esta es la implementación de laSplitToLinesFunction
:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
El problema es que eltuple2._1
es nulo y asumimos que eltuple2._1
contendrá algunos metadatos, como el nombre del tema / partición de donde proviene el mensaje.
Sin embargo, cuando imprimimostuple2._1
Es nulo.
Nuestra pregunta: ¿hay alguna manera de enviar el nombre del tema en kafka para que en el código de transmisión de chispa, eltuple2._1
lo contendrá (y no será nulo)?
Tenga en cuenta que también intentamos obtener los nombres de los temas de DStream como se menciona en eltutorial de integración de kafka con transmisión por chispa:
Pero devuelve TODOS los temas que se enviaron alKafkaUtils.createDirectStream
, y no el tema específico de donde provienen los mensajes (que pertenecen al RDD actual).
Por lo tanto, no nos ayudó a identificar el nombre del tema desde donde se enviaron los mensajes en el RDD.
EDITAR
en respuesta a la respuesta de David: intenté usar elMessageAndMetadata
Me gusta esto:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
El problema es que no se imprimió nada en elMessageAndMetadataFunction.call
método. ¿Qué debo arreglar para obtener el tema relevante para ese RDD dentro delMessageAndMetadataFunction.call
¿método?