Acceso a la colección de DStreams

Estoy tratando de acceder a una colección de DStreams filtrados obtenidos como en la solución a esta pregunta:Spark Streaming: la mejor manera de dividir el flujo de entrada según el parámetro de filtro

Creo la colección de la siguiente manera:

val statuCodes = Set("200","500", "404")
    spanTagStream.cache()
    val statusCodeStreams = statuCodes.map(key => key -> spanTagStream.filter(x => x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))

Trato de accederstatusCodeStreams de la siguiente manera:

for(streamTuple <- statusCodeStreams){
      streamTuple._2.foreachRDD(rdd =>
  rdd.foreachPartition(
      partitionOfRecords =>
        {
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String,String](props)

            partitionOfRecords.foreach
            {
                 x=>{ 
                 /* Code Writing to Kafka using streamTuple._1 as the topic-String */
                 }
            }
      })
   )
}

Al ejecutar esto, recibo el siguiente error: java.io.NotSerializableException:Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects

¿Cómo accedo a los Streams para escribir en Kafka de forma serializable?

Respuestas a la pregunta(1)

Su respuesta a la pregunta