¿Cómo transformar DataFrame antes de unirse a la operación?
El siguiente código se usa para extraer rangos de la columnaproducts
. Los rangos son segundos números en cada par[...]
. Por ejemplo, en el ejemplo dado[[222,66],[333,55]]
los rangos son66
y55
para productos con PK222
y333
, en consecuencia. Pero el código en Spark 2.2 funciona muy lentamente cuandodf_products
es de alrededor de 800 Mb:
df_products.createOrReplaceTempView("df_products")
val result = df.as("df2")
.join(spark.sql("SELECT * FROM df_products")
.select($"product_PK", explode($"products").as("products"))
.withColumnRenamed("product_PK","product_PK_temp").as("df1"),$"df2.product _PK" === $"df1.product_PK_temp" and $"df2.rec_product_PK" === $"df1.products.product_PK", "left")
.drop($"df1.product_PK_temp")
.select($"product_PK", $"rec_product_PK", coalesce($"df1.products.col2", lit(0.0)).as("rank_product"))
Esta es una pequeña muestra dedf_products
ydf
:
df_products =
+----------+--------------------+
|product_PK| products|
+----------+--------------------+
| 111|[[222,66],[333,55...|
| 222|[[333,24],[444,77...|
...
+----------+--------------------+
df =
+----------+-----------------+
|product_PK| rec_product_PK|
+----------+-----------------+
| 111| 222|
| 222| 888|
+----------+-----------------+
El código anterior funciona bien cuando las matrices en cada fila deproducts
contiene una pequeña cantidad de elementos. Pero cuando hay muchos elementos en las matrices de cada fila[[..],[..],...]
, entonces el código parece atascarse y no avanza.
¿Cómo puedo optimizar el código? Cualquier ayuda es muy apreciada.
¿Es posible, por ejemplo, transformardf_products
en el siguiente DataFrame antes de unirse?
df_products =
+----------+--------------------+------+
|product_PK| rec_product_PK| rank|
+----------+--------------------+------+
| 111| 222| 66|
| 111| 333| 55|
| 222| 333| 24|
| 222| 444| 77|
...
+----------+--------------------+------+