ElasticSearch, um RDD zu aktivieren
Ich habe die ElasticSearch- und Spark-Integration auf meinem lokalen Computer mit einigen in elasticsearch geladenen Testdaten getestet.
val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
Der Code läuft einwandfrei und gibt mit esRDD.first () das richtige Ergebnis zurück.
EsRDD.collect () generiert jedoch eine Ausnahme:
java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Ich glaube, das hängt mit dem hier erwähnten Problem zusammenhttp://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html Also habe ich diese Zeile entsprechend hinzugefügt
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Soll ich noch etwas tun, damit es funktioniert? Danke
Updates: Das Serialziation-Setup-Problem wurde behoben. durch die Nutzung
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
Anstatt von
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Jetzt gibt es noch einen weiteren Datensatz. Dieser Datensatz enthält 1000 unterschiedliche Datensätze
esRDD.count()
gibt 1000 aber kein problem zurück
esRDD.distinct().count()
gibt 5 zurück! Wenn ich die Aufzeichnungen drucke
esRDD.foreach(println)
Es druckt die 1000 Datensätze korrekt aus. Aber wenn ich sammle oder nehme
esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)
Es werden DUPLICATED-Datensätze gedruckt, und es werden tatsächlich nur 5 UNIQUE-Datensätze angezeigt. Dies scheint eine zufällige Untermenge des gesamten Datensatzes zu sein - es sind nicht die ersten 5 Datensätze. Wenn ich das RDD speichere und es zurücklese
esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)
esRDD2 verhält sich wie erwartet. Ich frage mich, ob es einen Fehler gibt oder etwas, was ich nicht über das Verhalten beim Sammeln / Nehmen verstehe. Oder weil ich alles lokal betreibe. Standardmäßig scheint Spark RDD 5 Partitionen zu verwenden, wie in der Anzahl der part-xxxx-Dateien der "spark-output" -Datei angegeben. Das ist wahrscheinlich der Grund, warum esRDD.collect () und esRDD.distinct () anstelle einer anderen Zufallszahl 5 eindeutige Datensätze zurückgaben. Aber das stimmt immer noch nicht.