ElasticSearch, um RDD zu aktivieren

Ich habe die ElasticSearch- und Spark-Integration auf meinem lokalen Computer mit einigen in elasticsearch geladenen Testdaten getestet.

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()

Der Code läuft einwandfrei und gibt mit esRDD.first () das richtige Ergebnis zurück.

EsRDD.collect () generiert jedoch eine Ausnahme:

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Ich glaube, das hängt mit dem hier erwähnten Problem zusammenhttp://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html Also habe ich diese Zeile entsprechend hinzugefügt

conf.set("spark.serializer", classOf[KryoSerializer].getName)

Soll ich noch etwas tun, damit es funktioniert? Danke

Updates: Das Serialziation-Setup-Problem wurde behoben. durch die Nutzung

sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)

Anstatt von

conf.set("spark.serializer", classOf[KryoSerializer].getName)

Jetzt gibt es noch einen weiteren Datensatz. Dieser Datensatz enthält 1000 unterschiedliche Datensätze

esRDD.count()

gibt 1000 aber kein problem zurück

esRDD.distinct().count()

gibt 5 zurück! Wenn ich die Aufzeichnungen drucke

esRDD.foreach(println)

Es druckt die 1000 Datensätze korrekt aus. Aber wenn ich sammle oder nehme

esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)

Es werden DUPLICATED-Datensätze gedruckt, und es werden tatsächlich nur 5 UNIQUE-Datensätze angezeigt. Dies scheint eine zufällige Untermenge des gesamten Datensatzes zu sein - es sind nicht die ersten 5 Datensätze. Wenn ich das RDD speichere und es zurücklese

esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)

esRDD2 verhält sich wie erwartet. Ich frage mich, ob es einen Fehler gibt oder etwas, was ich nicht über das Verhalten beim Sammeln / Nehmen verstehe. Oder weil ich alles lokal betreibe. Standardmäßig scheint Spark RDD 5 Partitionen zu verwenden, wie in der Anzahl der part-xxxx-Dateien der "spark-output" -Datei angegeben. Das ist wahrscheinlich der Grund, warum esRDD.collect () und esRDD.distinct () anstelle einer anderen Zufallszahl 5 eindeutige Datensätze zurückgaben. Aber das stimmt immer noch nicht.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage