ElasticSearch to Spark RDD
Я тестировал интеграцию ElasticSearch и Spark на своем локальном компьютере, используя некоторые тестовые данные, загруженные вasticsearch.
val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
Код работает нормально и возвращает правильный результат успешно с esRDD.first ()
Однако esRDD.collect () сгенерирует исключение:
java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Я считаю, что это связано с проблемой, упомянутой здесьhttp://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html поэтому я добавил эту строку соответственно
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Я должен сделать что-то еще, чтобы заставить это работать? Спасибо
Обновления: проблема с настройкой сериализации была решена. используя
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
вместо
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Теперь есть еще одна. В этом наборе данных 1000 различных записей.
esRDD.count()
возвращает 1000 без проблем, однако
esRDD.distinct().count()
возвращает 5! Если я распечатаю записи
esRDD.foreach(println)
Он распечатывает 1000 записей правильно. Но если я использую собирать или брать
esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)
он будет печатать DUPLICATED записи, и там действительно будет отображаться только 5 УНИКАЛЬНЫХ записей, что, кажется, является случайным подмножеством всего набора данных - это не первые 5 записей. Если я сохраню RDD и прочту его обратно
esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)
esRDD2 ведет себя как ожидалось. Интересно, есть ли ошибка или что-то, чего я не понимаю в поведении сбора / получения. Или потому что я все запускаю локально. По умолчанию Spark RDD использует 5 разделов, как показано в количестве файлов part-xxxx файла «spark-output». Вероятно, поэтому esRDD.collect () и esRDD.distinct () вернули 5 уникальных записей вместо некоторого другого случайного числа. Но это все еще не правильно.