в

вероятно, проще всего объяснить на примере. Предположим, у меня есть DataFrame для входа пользователей на сайт, например:

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows

Я хотел бы добавить к этому столбец, указывающий, когда они стали активным пользователем на сайте. Но есть одно предостережение: есть период времени, в течение которого пользователь считается активным, и после этого периода, если он снова входит в систему, егоbecame_active дата сбрасывается. Предположим, этот период5 дней, Тогда желаемая таблица, полученная из приведенной выше таблицы, будет выглядеть примерно так:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+

Так, в частности, SirChillingtonIV'sbecame_active дата была сброшена, потому что их второй логин пришел после истечения активного периода, но Booooooo99900098became_active дата не была сброшена во второй раз, когда он / она вошел в систему, потому что она попала в активный период.

Моей первоначальной мыслью было использовать оконные функции сlag, а затем с помощьюlagGED значения для заполненияbecame_active колонка; например, что-то, начинающееся примерно так:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

Затем правило для заполненияbecame_active дата будет, еслиtmp являетсяnull (то есть, если это первый в истории логин) или еслиlogin_date - tmp >= 5 затемbecame_active = login_date; в противном случае перейдите к следующему последнему значению вtmp и применить то же правило. Это предполагает рекурсивный подход, который я с трудом представляю, как реализовать.

Мои вопросы: это жизнеспособный подход, и если да, то как я могу «вернуться» и посмотреть на более ранние значенияtmp пока я не найду один, где я остановлюсь? Насколько мне известно, я не могу перебирать значения Spark SQLColumn, Есть ли другой способ достичь этого результата?

Ответы на вопрос(2)

Ваш ответ на вопрос