Como converter dados do Spark Streaming em Spark DataFrame
Até agora, o Spark não criou o DataFrame para transmitir dados, mas quando estou detectando anomalias, é mais conveniente e rápido usar o DataFrame para análise de dados. Eu fiz essa parte, mas quando tento fazer a detecção de anomalias em tempo real usando dados de streaming, os problemas aparecem. Tentei de várias maneiras e ainda não consegui converter o DStream em DataFrame e também não consigo converter o RDD dentro do DStream em DataFrame.
Aqui está parte da minha versão mais recente do código:
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator
sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)
model_inputs = sys.argv[1]
def streamrdd_to_df(srdd):
sdf = sqlContext.createDataFrame(srdd)
sdf.show(n=2, truncate=False)
return sdf
def main():
indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
inrdd = indata.map(lambda r: get_tuple(r))
Features = Row('rawFeatures')
features_rdd = inrdd.map(lambda r: Features(r))
features_rdd.pprint(num=3)
streaming_df = features_rdd.flatMap(streamrdd_to_df)
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
Como você pode ver na função main (), quando estou lendo os dados de streaming de entrada usando o método ssc.socketTextStream (), ele gera o DStream, então tentei converter cada indivíduo no DStream em Row, esperando poder converter os dados em DataFrame mais tarde.
Se eu usar ppprint () para imprimir features_rdd aqui, ele funciona, o que me faz pensar que cada indivíduo em features_rdd é um lote de RDD, enquanto todo o features_rdd é um DStream.
Então eu criei o método streamrdd_to_df () e esperava converter cada lote de RDD em dataframe, isso me dá o erro, mostrando:
ERRO StreamingContext: Erro ao iniciar o contexto, marcando-o como parado java.lang.IllegalArgumentException: requisito falhou: Nenhuma operação de saída registrada, portanto, nada para executar
Existe algum pensamento sobre como posso executar operações do DataFrame nos dados de streaming do Spark?