ElasticSearch para Spark RDD

Eu estava testando a integração do ElasticSearch e Spark na minha máquina local, usando alguns dados de teste carregados na elasticsearch.

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()

O código funciona bem e retorna o resultado correto com sucesso com esRDD.first ()

No entanto, esRDD.collect () gerará uma exceção:

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Acredito que isso esteja relacionado ao problema mencionado aquihttp://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html então eu adicionei esta linha de acordo

conf.set("spark.serializer", classOf[KryoSerializer].getName)

Devo fazer outra coisa para fazê-lo funcionar? Obrigado

Atualizações: o problema de configuração da serialização foi resolvido. usando

sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)

ao invés de

conf.set("spark.serializer", classOf[KryoSerializer].getName)

Agora existe outro. Existem 1000 registros distintos neste conjunto de dados.

esRDD.count()

retorna 1000 sem problemas, no entanto

esRDD.distinct().count()

retorna 5! Se eu imprimir os registros

esRDD.foreach(println)

Imprime os 1000 registros corretamente. Mas se eu usar coletar ou levar

esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)

ele imprimirá registros DUPLICATED e, de fato, apenas 5 registros UNIQUE são exibidos, o que parece ser um subconjunto aleatório de todo o conjunto de dados - não são os cinco primeiros registros. Se eu salvar o RDD e lê-lo novamente

esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)

O esRDD2 se comporta conforme o esperado. Gostaria de saber se existe um bug ou algo que não entendo sobre o comportamento de coletar / receber. Ou é porque eu estou executando tudo localmente. Por padrão, o Spark RDD parece usar 5 partições, conforme mostrado no número de arquivos part-xxxx do arquivo "spark-output". Provavelmente é por isso que esRDD.collect () e esRDD.distinct () retornaram 5 registros exclusivos, em vez de outro número aleatório. Mas isso ainda não está certo.

questionAnswers(2)

yourAnswerToTheQuestion