Cómo convertir datos de Spark Streaming en Spark DataFrame

Hasta ahora, Spark no ha creado el DataFrame para la transmisión de datos, pero cuando estoy haciendo la detección de anomalías, es más conveniente y rápido usar DataFrame para el análisis de datos. He hecho esta parte, pero cuando trato de hacer una detección de anomalías en tiempo real usando la transmisión de datos, aparecen los problemas. Lo intenté de varias maneras y todavía no podía convertir DStream a DataFrame, y tampoco puedo convertir el RDD dentro de DStream a DataFrame.

Aquí está parte de mi última versión del código:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

Como puede ver en la función main (), cuando leo los datos de transmisión de entrada utilizando el método ssc.socketTextStream (), genera DStream, luego intenté convertir a cada individuo en DStream en Row, con la esperanza de poder convertir los datos en DataFrame más tarde.

Si uso ppprint () para imprimir features_rdd aquí, funciona, lo que me hace pensar que cada individuo en features_rdd es un lote de RDD, mientras que todo features_rdd es un DStream.

Luego creé el método streamrdd_to_df () y esperaba convertir cada lote de RDD en un marco de datos, me da el error, que muestra:

ERROR StreamingContext: error al iniciar el contexto, marcándolo como detenido java.lang.IllegalArgumentException: error fallado: no se registraron operaciones de salida, por lo que no hay nada que ejecutar

¿Se ha pensado cómo puedo realizar operaciones de DataFrame en los datos de transmisión de Spark?