в
вероятно, проще всего объяснить на примере. Предположим, у меня есть DataFrame для входа пользователей на сайт, например:
scala> df.show(5) +----------------+----------+ | user_name|login_date| +----------------+----------+ |SirChillingtonIV|2012-01-04| |Booooooo99900098|2012-01-04| |Booooooo99900098|2012-01-06| | OprahWinfreyJr|2012-01-10| |SirChillingtonIV|2012-01-11| +----------------+----------+ only showing top 5 rows
Я хотел бы добавить к этому столбец, указывающий, когда они стали активным пользователем на сайте. Но есть одно предостережение: есть период времени, в течение которого пользователь считается активным, и после этого периода, если он снова входит в систему, егоbecame_active
дата сбрасывается. Предположим, этот период5 дней, Тогда желаемая таблица, полученная из приведенной выше таблицы, будет выглядеть примерно так:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
Так, в частности, SirChillingtonIV'sbecame_active
дата была сброшена, потому что их второй логин пришел после истечения активного периода, но Booooooo99900098became_active
дата не была сброшена во второй раз, когда он / она вошел в систему, потому что она попала в активный период.
Моей первоначальной мыслью было использовать оконные функции сlag
, а затем с помощьюlag
GED значения для заполненияbecame_active
колонка; например, что-то, начинающееся примерно так:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
Затем правило для заполненияbecame_active
дата будет, еслиtmp
являетсяnull
(то есть, если это первый в истории логин) или еслиlogin_date - tmp >= 5
затемbecame_active = login_date
; в противном случае перейдите к следующему последнему значению вtmp
и применить то же правило. Это предполагает рекурсивный подход, который я с трудом представляю, как реализовать.
Мои вопросы: это жизнеспособный подход, и если да, то как я могу «вернуться» и посмотреть на более ранние значенияtmp
пока я не найду один, где я остановлюсь? Насколько мне известно, я не могу перебирать значения Spark SQLColumn
, Есть ли другой способ достичь этого результата?