Вперед заполнить пропущенные значения в Spark / Python

Я пытаюсь заполнить пропущенные значения в моем фрейме данных Spark предыдущим ненулевым значением (если оно существует). Я делал подобные вещи в Python / Pandas, но мои данные слишком велики для Pandas (в маленьком кластере), и я Spark Noob. Спарк может это сделать? Это может сделать это для нескольких столбцов? Если да, то как? Если нет, то есть ли предложения по альтернативным подходам в наборе инструментов who Hadoop?

Спасибо!

 chrisaycock30 июн. 2016 г., 21:52
Выглядит такбыл задан ранеебез особого успеха.
 user162457730 июн. 2016 г., 21:55
@ chrisaycock - да, я заметил: / Я думаю, что это будет возможно, хотя.
 Jeff30 июн. 2016 г., 21:59
Я считаю, что это возможно с помощьюWindow, но на самом деле я сейчас работаю над этим концептуально. Хотя, если ваши данные достаточно велики, чтобы нуждаться в кластере, зачем их вменять, а не отбрасывать наблюдения? Имейте в виду, когда вы вменяете, что вы создаете данные, которые не существуют - они используются, но вы все равно должны избегать их, если можете.
 user162457730 июн. 2016 г., 22:09
@JeffL. - В этом проекте мне придется пересылать заполнение, потому что, хотя данные для этой даты / времени в наборе данных не существуют, в этой задаче предполагается, что значения повторяются до тех пор, пока значение не изменится. Ссылка, которую вы послали, интересна .... возможно, сначала нужно изучить Scala: S
 Jeff30 июн. 2016 г., 22:18
Да, я тоже не знаю Скала. Парень, который опубликовал ответ, @ zero323, очень активен в вопросах Spark, так что в конечном итоге он мог бы внести свой вклад.
 Jeff30 июн. 2016 г., 22:00
Похоже, что вы можете сделать это, если сначала конвертировать в RDD, а затем обратно в фрейм данных:stackoverflow.com/questions/33621319/...

Ответы на вопрос(1)

Я нашел решение, которое работает без дополнительного кодирования с помощью окнаВот, ТакДжефф был прав, есть решение. Полный код Boelow, я кратко объясню, что он делает, для более подробной информации просто посмотрите на блог.

from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

# define the window
window = Window.orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column_temperature = last(df6['temperature'], ignorenulls=True).over(window)

# do the fill 
spark_df_filled = df6.withColumn('temperature_filled',  filled_column_temperature)

Так что идея состоит в том, чтобы определить скользящее окно (подробнее о скользящих окнахВот) через данные, которые всегда содержат актуальную строку и ВСЕ предыдущие:

    window = Window.orderBy('time')\
           .rowsBetween(-sys.maxsize, 0)

Обратите внимание, что мы сортируем по времени, поэтому данные располагаются в правильном порядке. Также обратите внимание, что использование «-sys.maxsize» гарантирует, что окно всегда включает все предыдущие данные и постоянно увеличивается по мере прохождения данных сверху вниз, но могут быть более эффективные решения.

Используя «последнюю» функцию, мы всегда обращаемся к последней строке в этом окне. Передав «ignorenulls = True», мы определяем, что если текущая строка равна нулю, то функция вернет самое последнее (последнее) ненулевое значение в окне. В противном случае используется фактическое значение строки.

Готово.

 thePurplePython30 мая 2019 г., 01:08
@ Ромео Кинцлерstackoverflow.com/questions/56368747/... если вы хотите посмотреть
 thePurplePython29 мая 2019 г., 20:27
Это решение работает хорошо, однако при попытке сохранить данные я получаю следующую ошибкуat scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.sql.execution.SparkPlan.prepare(SparkPlan.scala:200) at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200) at org.apache.spark.sql.execution.SparkPlan$$anonfun$prepare$1.apply(SparkPlan.scala:200) у кого есть работа вокруг?
 Mithril16 апр. 2019 г., 08:06
Лучше использоватьWindow.unboundedPreceding вместо-sys.maxsize spark.apache.org/docs/latest/api/python/...

Ваш ответ на вопрос