So konvertieren Sie Spark Streaming-Daten in Spark DataFrame

Bisher hat Spark noch keinen DataFrame für das Streaming von Daten erstellt. Wenn ich jedoch Anomalien erkenne, ist es bequemer und schneller, DataFrame für die Datenanalyse zu verwenden. Ich habe diesen Teil erledigt, aber als ich versuche, Anomalien mithilfe von Streaming-Daten in Echtzeit zu erkennen, sind die Probleme aufgetreten. Ich habe verschiedene Möglichkeiten ausprobiert und konnte DStream immer noch nicht in DataFrame konvertieren. Die RDD in DStream konnte auch nicht in DataFrame konvertiert werden.

Hier ist ein Teil meiner neuesten Version des Codes:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

Wie Sie in der main () - Funktion sehen können, generiert die ssc.socketTextStream () - Methode beim Lesen der Streaming-Eingabedaten DStream. Dann habe ich versucht, jedes einzelne Element in DStream in Row zu konvertieren, in der Hoffnung, die Daten konvertieren zu können später in DataFrame.

Wenn ich ppprint () verwende, um hier features_rdd auszudrucken, funktioniert dies, was mich zu der Annahme veranlasst, dass jedes Individuum in features_rdd ein Stapel von RDD ist, während das gesamte features_rdd ein DStream ist.

Dann habe ich die streamrdd_to_df () -Methode erstellt und gehofft, dass jeder RDD-Stapel in einen Datenrahmen konvertiert werden kann. Es wird der folgende Fehler angezeigt:

ERROR StreamingContext: Fehler beim Starten des Kontexts. Markierung als gestoppt. Java.lang.IllegalArgumentException: Anforderung fehlgeschlagen: Keine Ausgabeoperationen registriert, daher nichts auszuführen.

Ist darüber nachgedacht, wie ich DataFrame-Vorgänge für Spark-Streaming-Daten ausführen kann?

Antworten auf die Frage(12)

Ihre Antwort auf die Frage