Accessing a collection of DStreams

I am trying to access a collection of filtered DStreams, obtained as in the solution to this question: Spark Streaming - Best way to Split Input Stream based on filter Param

I create the collection as follows:

    val statusCodes = Set("200", "500", "404")
    spanTagStream.cache()
    // One (status code, filtered DStream) pair per code
    val statusCodeStreams = statusCodes.map(key =>
      key -> spanTagStream.filter(x =>
        x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))
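For reference, since statusCodes is a Set, the map above yields a set of (status code, filtered stream) pairs, one per code, rather than a Map. Roughly, assuming a hypothetical element type for spanTagStream (the snippet only shows that its third field is a map of tags):

    // Hypothetical types, for illustration only:
    //   spanTagStream:     DStream[(String, String, Map[String, Any])]
    //   statusCodeStreams: Set[(String, DStream[(String, String, Map[String, Any])])]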

I try to access statusCodeStreams as follows:

    for (streamTuple <- statusCodeStreams) {
      streamTuple._2.foreachRDD { rdd =>
        rdd.foreachPartition { partitionOfRecords =>
          val props = new HashMap[String, Object]()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](props)

          partitionOfRecords.foreach { x =>
            /* Code writing to Kafka, using streamTuple._1 as the topic String */
          }
        }
      }
    }
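Note that I create the KafkaProducer inside foreachPartition precisely so the producer itself (which is not serializable) never has to be shipped inside a closure, so the producer should not be the cause of the error below.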

When I run this, I get the following error:

    java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects

How do I access the streams so that writing to Kafka is serializable?
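A commonly suggested fix for this error (a sketch under the assumptions above, not verified against the full application): the closure passed to foreachPartition references streamTuple._1, which likely forces Spark to serialize the whole tuple, including the DStream held in streamTuple._2. Destructuring the tuple before entering the closure means the closure captures only a plain String:

    import java.util.HashMap
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    // Sketch: destructure so the closure captures only `key` (a String),
    // never the tuple that also holds the non-serializable DStream.
    for ((key, stream) <- statusCodeStreams) {
      stream.foreachRDD { rdd =>
        rdd.foreachPartition { partitionOfRecords =>
          val props = new HashMap[String, Object]()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](props)
          partitionOfRecords.foreach { x =>
            // Write each record using `key` as the topic, e.g.:
            // producer.send(new ProducerRecord[String, String](key, x.toString))
          }
          producer.close() // added: release the per-partition producer
        }
      }
    }

Opening a producer per partition per batch works but is costly; a lazily initialized per-executor producer (or a pool) is a common refinement once this runs.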
