Função da janela Spark SQL com condição complexa
Provavelmente é mais fácil explicar através do exemplo. Suponha que eu tenha um DataFrame de logins de usuário em um site, por exemplo:
scala> df.show(5) +----------------+----------+ | user_name|login_date| +----------------+----------+ |SirChillingtonIV|2012-01-04| |Booooooo99900098|2012-01-04| |Booooooo99900098|2012-01-06| | OprahWinfreyJr|2012-01-10| |SirChillingtonIV|2012-01-11| +----------------+----------+ only showing top 5 rows
Gostaria de adicionar uma coluna indicando quando eles se tornaram um usuário ativo no site. Mas há uma ressalva: há um período durante o qual um usuário é considerado ativo e, após esse período, se ele efetuar login novamente, suabecame_active
redefinições de data. Suponha que esse período seja5 dias. Então a tabela desejada derivada da tabela acima seria algo como isto:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
Então, em particular, os artigos de SirChillingtonIVbecame_active
data foi redefinida porque o segundo login ocorreu após o período ativo expirar, mas as informações de Booooooo99900098became_active
A data não foi redefinida na segunda vez em que ele fez login, porque caiu dentro do período ativo.
Meu pensamento inicial era usar funções de janela comlag
e, em seguida, usando olag
valores definidos para preencher obecame_active
coluna; por exemplo, algo começando aproximadamente como:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
Em seguida, a regra para preencher obecame_active
data seria, setmp
énull
(ou seja, se é o primeiro login de sempre) ou selogin_date - tmp >= 5
entãobecame_active = login_date
; caso contrário, vá para o próximo valor mais recente emtmp
e aplique a mesma regra. Isso sugere uma abordagem recursiva, que estou tendo problemas para imaginar uma maneira de implementar.
Minhas perguntas: essa é uma abordagem viável? Em caso afirmativo, como posso "voltar" e analisar os valores anteriores detmp
até encontrar um onde paro? Pelo que sei, não posso iterar através dos valores de um Spark SQLColumn
. Existe outra maneira de alcançar esse resultado?