Spark - ¿Ventana con recursividad? - Propagación condicional de valores entre filas
Tengo el siguiente marco de datos que muestra los ingresos de las compras.
+-------+--------+-------+
|user_id|visit_id|revenue|
+-------+--------+-------+
| 1| 1| 0|
| 1| 2| 0|
| 1| 3| 0|
| 1| 4| 100|
| 1| 5| 0|
| 1| 6| 0|
| 1| 7| 200|
| 1| 8| 0|
| 1| 9| 10|
+-------+--------+-------+
Finalmente quiero la nueva columnapurch_revenue
para mostrar los ingresos generados por la compra en cada fila. Como solución alternativa, también he intentado introducir un identificador de comprapurch_id
que se incrementa cada vez que se realiza una compra. Entonces esto se enumera solo como referencia.
+-------+--------+-------+-------------+--------+
|user_id|visit_id|revenue|purch_revenue|purch_id|
+-------+--------+-------+-------------+--------+
| 1| 1| 0| 100| 1|
| 1| 2| 0| 100| 1|
| 1| 3| 0| 100| 1|
| 1| 4| 100| 100| 1|
| 1| 5| 0| 100| 2|
| 1| 6| 0| 100| 2|
| 1| 7| 200| 100| 2|
| 1| 8| 0| 100| 3|
| 1| 9| 10| 100| 3|
+-------+--------+-------+-------------+--------+
He tratado de usar ellag/lead
funciona así:
user_timeline = Window.partitionBy("user_id").orderBy("visit_id")
find_rev = fn.when(fn.col("revenue") > 0,fn.col("revenue"))\
.otherwise(fn.lead(fn.col("revenue"), 1).over(user_timeline))
df.withColumn("purch_revenue", find_rev)
Esto duplica la columna de ingresos sirevenue > 0
y también lo levanta una fila. Claramente, puedo encadenar esto por un N finito, pero eso no es una solución.
revenue > 0
?Alternativamente, ¿hay alguna manera de incrementar un valor basado en una condición? Traté de encontrar una manera de hacerlo, pero tuve problemas para encontrar uno.