Como converter dados do Spark Streaming em Spark DataFrame

Até agora, o Spark não criou o DataFrame para transmitir dados, mas quando estou detectando anomalias, é mais conveniente e rápido usar o DataFrame para análise de dados. Eu fiz essa parte, mas quando tento fazer a detecção de anomalias em tempo real usando dados de streaming, os problemas aparecem. Tentei de várias maneiras e ainda não consegui converter o DStream em DataFrame e também não consigo converter o RDD dentro do DStream em DataFrame.

Aqui está parte da minha versão mais recente do código:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

Como você pode ver na função main (), quando estou lendo os dados de streaming de entrada usando o método ssc.socketTextStream (), ele gera o DStream, então tentei converter cada indivíduo no DStream em Row, esperando poder converter os dados em DataFrame mais tarde.

Se eu usar ppprint () para imprimir features_rdd aqui, ele funciona, o que me faz pensar que cada indivíduo em features_rdd é um lote de RDD, enquanto todo o features_rdd é um DStream.

Então eu criei o método streamrdd_to_df () e esperava converter cada lote de RDD em dataframe, isso me dá o erro, mostrando:

ERRO StreamingContext: Erro ao iniciar o contexto, marcando-o como parado java.lang.IllegalArgumentException: requisito falhou: Nenhuma operação de saída registrada, portanto, nada para executar

Existe algum pensamento sobre como posso executar operações do DataFrame nos dados de streaming do Spark?