Função da janela Spark SQL com condição complexa

Provavelmente é mais fácil explicar através do exemplo. Suponha que eu tenha um DataFrame de logins de usuário em um site, por exemplo:

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows

Gostaria de adicionar uma coluna indicando quando eles se tornaram um usuário ativo no site. Mas há uma ressalva: há um período durante o qual um usuário é considerado ativo e, após esse período, se ele efetuar login novamente, suabecame_active redefinições de data. Suponha que esse período seja5 dias. Então a tabela desejada derivada da tabela acima seria algo como isto:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+

Então, em particular, os artigos de SirChillingtonIVbecame_active data foi redefinida porque o segundo login ocorreu após o período ativo expirar, mas as informações de Booooooo99900098became_active A data não foi redefinida na segunda vez em que ele fez login, porque caiu dentro do período ativo.

Meu pensamento inicial era usar funções de janela comlage, em seguida, usando olagvalores definidos para preencher obecame_active coluna; por exemplo, algo começando aproximadamente como:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

Em seguida, a regra para preencher obecame_active data seria, setmp énull (ou seja, se é o primeiro login de sempre) ou selogin_date - tmp >= 5 entãobecame_active = login_date; caso contrário, vá para o próximo valor mais recente emtmp e aplique a mesma regra. Isso sugere uma abordagem recursiva, que estou tendo problemas para imaginar uma maneira de implementar.

Minhas perguntas: essa é uma abordagem viável? Em caso afirmativo, como posso "voltar" e analisar os valores anteriores detmp até encontrar um onde paro? Pelo que sei, não posso iterar através dos valores de um Spark SQLColumn. Existe outra maneira de alcançar esse resultado?

questionAnswers(2)

yourAnswerToTheQuestion