https://issues.apache.org/jira/browse/SPARK-10915

у приложение на Python, в котором окно скользит по последовательности значений, каждое из которых имеет метку времени. Я хочу применить функцию к значениям в скользящем окне, чтобы вычислить оценку из N последних значений, как показано на рисунке. Мы уже реализовали эту функцию, используя библиотеку Python для использования графических процессоров.

Я обнаружил, что Apache Spark 2.0 поставляется со структурированной потоковой передачей и поддерживает оконные операции во время события. Если вы хотите прочитать конечную последовательность записей из CSV-файла и хотите сосчитать записи в таком скользящем окне, вы можете использовать следующий код в PySpark:

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.functions import window
from os import getcwd

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

schema = StructType() \
    .add('ts', 'timestamp') \
    .add('value', 'double') \

sqlContext = SQLContext(spark)
lines = sqlContext \
    .readStream \
    .format('csv') \
    .schema(schema) \
    .load(path='file:///'+getcwd()+'/csv')

windowedCount = lines.groupBy(
    window(lines.ts, '30 minutes', '10 minutes')
).agg({'value':'count'}) 

query = windowedCount \
   .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()

Однако я хочу применить UDAF, отличные от предопределенных функций агрегирования, к скользящим окнам. В соответствии сhttps://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.aggДоступными агрегатными функциями являются только avg, max, min, sum и count.

Это еще не поддерживается? Если да, то когда он будет поддерживаться в PySpark?

https://stackoverflow.com/a/32750733/1564381 показывает, что можно определить UserDefinedAggregateFunction в Java или Scala, а затем вызвать его в PySpark. Это кажется интересным, но я хочу применить свою собственную функцию Python к значениям в скользящих окнах. Я хочу чисто Pythonic путь.

постскриптум дайте мне знать какие-либо фреймворки в Python, кроме PySpark, которые могут решить такие проблемы (применение UDAF для окна, скользящего по потоку).

Ответы на вопрос(1)

Ваш ответ на вопрос