, Мне даже удалось отфильтровать RDD, присвоить их набору Status и затем выполнить итерацию по этому набору для окончательной обработки.
аюсь получить доступ к коллекции отфильтрованных DStreams, полученных как в решении этого вопроса:Spark Streaming - лучший способ разделения входного потока на основе фильтра Param
Я создаю Коллекцию следующим образом:
val statuCodes = Set("200","500", "404")
spanTagStream.cache()
val statusCodeStreams = statuCodes.map(key => key -> spanTagStream.filter(x => x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))
Я пытаюсь получить доступstatusCodeStreams
следующим образом:
for(streamTuple <- statusCodeStreams){
streamTuple._2.foreachRDD(rdd =>
rdd.foreachPartition(
partitionOfRecords =>
{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
partitionOfRecords.foreach
{
x=>{
/* Code Writing to Kafka using streamTuple._1 as the topic-String */
}
}
})
)
}
При выполнении этого я получаю следующую ошибку: java.io.NotSerializableException:Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects
Как получить доступ к потокам для записи в Kafka в сериализуемом виде?