Как преобразовать данные Spark Streaming в Spark DataFrame

До сих пор Spark не создавал DataFrame для потоковой передачи данных, но когда я занимаюсь обнаружением аномалий, удобнее и быстрее использовать DataFrame для анализа данных. Я выполнил эту часть, но когда я попытался обнаружить аномалии в реальном времени, используя потоковые данные, возникли проблемы. Я попробовал несколько способов, но все еще не смог преобразовать DStream в DataFrame, и также не смог преобразовать RDD внутри DStream в DataFrame.

Вот часть моей последней версии кода:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

Как вы можете видеть в функции main (), когда я читаю входные потоковые данные с помощью метода ssc.socketTextStream (), он генерирует DStream, затем я попытался преобразовать каждого отдельного человека в DStream в строку, надеясь, что смогу преобразовать данные в DataFrame позже.

Если я использую ppprint (), чтобы распечатать здесь features_rdd, это работает, что заставляет меня думать, что каждый человек в features_rdd является пакетом RDD, а весь features_rdd является DStream.

Затем я создал метод streamrdd_to_df () и надеялся преобразовать каждый пакет RDD в фрейм данных, он выдает ошибку, показывающую:

ОШИБКА StreamingContext: Ошибка запуска контекста, пометка его как остановленного java.lang.IllegalArgumentException: требование не выполнено: не зарегистрированы выходные операции, поэтому ничего не выполняется

Есть ли мысли о том, как я могу выполнять операции DataFrame с потоковой передачей данных Spark?