Искровой структурированный поток и регрессия Spark-Ml

ся, это должно быть очевидно, но при просмотре документов и примеров я не уверен, что смогу найти способ взять структурированный поток и преобразовать его с помощью PySpark.

Например:

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName('StreamingWordCount')
    .getOrCreate()
)

raw_records = (
    spark
    .readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load()
)

# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()

counts = (
    records
    .groupBy(records.value)
    .count()
)

query = (
    counts
    .writeStream
    .outputMode('complete')
    .format('console')
    .start()
)
query.awaitTermination()

Это вызовет следующее исключение:

Queries with streaming sources must be executed with writeStream.start

Тем не менее, если я удалю вызовrdd.map(...).toDF() Кажется, все работает нормально.

Кажется, как будто призыв кrdd.map разветвленное выполнение из потокового контекста и заставляет Spark предупреждать, что оно никогда не запускалось?

Есть ли «правильный» способ подачи заявки?map или жеmapPartition преобразования стиля с использованием структурированного потокового вещания и PySpark?

Ответы на вопрос(1)

Ваш ответ на вопрос