https://issues.apache.org/jira/browse/SPARK-10915
у приложение на Python, в котором окно скользит по последовательности значений, каждое из которых имеет метку времени. Я хочу применить функцию к значениям в скользящем окне, чтобы вычислить оценку из N последних значений, как показано на рисунке. Мы уже реализовали эту функцию, используя библиотеку Python для использования графических процессоров.
Я обнаружил, что Apache Spark 2.0 поставляется со структурированной потоковой передачей и поддерживает оконные операции во время события. Если вы хотите прочитать конечную последовательность записей из CSV-файла и хотите сосчитать записи в таком скользящем окне, вы можете использовать следующий код в PySpark:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.functions import window
from os import getcwd
spark = SparkSession \
.builder \
.master('local[*]') \
.getOrCreate()
schema = StructType() \
.add('ts', 'timestamp') \
.add('value', 'double') \
sqlContext = SQLContext(spark)
lines = sqlContext \
.readStream \
.format('csv') \
.schema(schema) \
.load(path='file:///'+getcwd()+'/csv')
windowedCount = lines.groupBy(
window(lines.ts, '30 minutes', '10 minutes')
).agg({'value':'count'})
query = windowedCount \
.writeStream \
.outputMode('complete') \
.format('console') \
.start()
query.awaitTermination()
Однако я хочу применить UDAF, отличные от предопределенных функций агрегирования, к скользящим окнам. В соответствии сhttps://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.aggДоступными агрегатными функциями являются только avg, max, min, sum и count.
Это еще не поддерживается? Если да, то когда он будет поддерживаться в PySpark?
https://stackoverflow.com/a/32750733/1564381 показывает, что можно определить UserDefinedAggregateFunction в Java или Scala, а затем вызвать его в PySpark. Это кажется интересным, но я хочу применить свою собственную функцию Python к значениям в скользящих окнах. Я хочу чисто Pythonic путь.
постскриптум дайте мне знать какие-либо фреймворки в Python, кроме PySpark, которые могут решить такие проблемы (применение UDAF для окна, скользящего по потоку).