ElasticSearch to Spark RDD

Я тестировал интеграцию ElasticSearch и Spark на своем локальном компьютере, используя некоторые тестовые данные, загруженные вasticsearch.

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()

Код работает нормально и возвращает правильный результат успешно с esRDD.first ()

Однако esRDD.collect () сгенерирует исключение:

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Я считаю, что это связано с проблемой, упомянутой здесьhttp://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html поэтому я добавил эту строку соответственно

conf.set("spark.serializer", classOf[KryoSerializer].getName)

Я должен сделать что-то еще, чтобы заставить это работать? Спасибо

Обновления: проблема с настройкой сериализации была решена. используя

sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)

вместо

conf.set("spark.serializer", classOf[KryoSerializer].getName)

Теперь есть еще одна. В этом наборе данных 1000 различных записей.

esRDD.count()

возвращает 1000 без проблем, однако

esRDD.distinct().count()

возвращает 5! Если я распечатаю записи

esRDD.foreach(println)

Он распечатывает 1000 записей правильно. Но если я использую собирать или брать

esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)

он будет печатать DUPLICATED записи, и там действительно будет отображаться только 5 УНИКАЛЬНЫХ записей, что, кажется, является случайным подмножеством всего набора данных - это не первые 5 записей. Если я сохраню RDD и прочту его обратно

esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)

esRDD2 ведет себя как ожидалось. Интересно, есть ли ошибка или что-то, чего я не понимаю в поведении сбора / получения. Или потому что я все запускаю локально. По умолчанию Spark RDD использует 5 разделов, как показано в количестве файлов part-xxxx файла «spark-output». Вероятно, поэтому esRDD.collect () и esRDD.distinct () вернули 5 уникальных записей вместо некоторого другого случайного числа. Но это все еще не правильно.

Ответы на вопрос(2)

Ваш ответ на вопрос