Высокоэффективная Искра Рэйчел Уоррен, Холден Карау - Глава 4

аюсь получить последние записи из таблицы с помощью самостоятельного соединения. Работает с использованиемspark-sql но не работает с использованием искрыDataFrame API.

Кто-нибудь может помочь? Это ошибка?

Я использую Spark 2.2.0 в локальном режиме

Создание вводаDataFrame:

scala> val df3 = spark.sparkContext.parallelize(Array((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5))).toDF("id","value","time")
df3: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]    

scala> val df33 = df3
df33: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]

scala> df3.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+

scala> df33.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+

Теперь выполняем объединение с использованием SQL:работает

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|   aa|   2|
|  2|   bb|   5|
+---+-----+----+

Теперь выполняем объединение с использованием API Dataframe:не работает

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).show
+---+-----+----+
| id|value|time|
+---+-----+----+
+---+-----+----+

Обратите внимание на планы объяснения:бланк дляDataFrame API !!

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).explain
== Physical Plan ==
LocalTableScan <empty>, [id#150, value#151, time#152]

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").explain
== Physical Plan ==
*Project [id#1241, value#1242, time#1243]
+- *SortMergeJoin [id#150], [id#1241], Inner, (time#152 < time#1243)
   :- *Sort [id#150 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#150, 200)
   :     +- *Project [_1#146 AS id#150, _3#148 AS time#152]
   :        +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
   :           +- Scan ExternalRDDScan[obj#145]
   +- *Sort [id#1241 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#1241, 200)
         +- *Project [_1#146 AS id#1241, _2#147 AS value#1242, _3#148 AS time#1243]
            +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
               +- Scan ExternalRDDScan[obj#145]

Ответы на вопрос(1)

Ваш ответ на вопрос