Искровой структурированный поток и регрессия Spark-Ml
ся, это должно быть очевидно, но при просмотре документов и примеров я не уверен, что смогу найти способ взять структурированный поток и преобразовать его с помощью PySpark.
Например:
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName('StreamingWordCount')
.getOrCreate()
)
raw_records = (
spark
.readStream
.format('socket')
.option('host', 'localhost')
.option('port', 9999)
.load()
)
# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()
counts = (
records
.groupBy(records.value)
.count()
)
query = (
counts
.writeStream
.outputMode('complete')
.format('console')
.start()
)
query.awaitTermination()
Это вызовет следующее исключение:
Queries with streaming sources must be executed with writeStream.start
Тем не менее, если я удалю вызовrdd.map(...).toDF()
Кажется, все работает нормально.
Кажется, как будто призыв кrdd.map
разветвленное выполнение из потокового контекста и заставляет Spark предупреждать, что оно никогда не запускалось?
Есть ли «правильный» способ подачи заявки?map
или жеmapPartition
преобразования стиля с использованием структурированного потокового вещания и PySpark?