Suchergebnisse für Anfrage "spark-streaming"
Ist Spark-Streaming funktioniert sowohl mit "cp" als auch mit "mv"
Ich verwende Spark-Streaming Mein Programm liest ständig Streams aus einem Hadoop-Ordner. Das Problem ist, dass der Spark-Job gestartet wird, wenn ich ihn in meinen Hadoop-Ordner kopiere (Hadoop fs -copyFromLocal), aber wenn ich ihn verschiebe ...
get Thema von kafka Nachricht in Funken
In unserem Spark-Streaming-Job lesen wir Nachrichten im Streaming von kafka. azu verwenden wir dasKafkaUtils.createDirectStream API, die @ zurückgiJavaPairInputDStreamfrom. Die Nachrichten werden von kafka (aus drei Themen - test1, test2, ...
Prepare Batch-Anweisung zum Speichern aller Rdd in MySQL, die durch Spark-Streaming generiert wurden
Ich versuche, die aus Dstream generierten Batch-RDDs mithilfe von Spark-Streaming in MySQL einzufügen. Der folgende Code funktioniert gut, aber das Problem dabei ist, dass ich eine Verbindung zum Speichern jedes Tupels erstelle. Also, um zu ...
Spark Streaming: So starten Sie den Empfänger nach einem Empfängerausfall nicht neu
Wir verwenden einen benutzerdefinierten Funkenempfänger, der gestreamte Daten von einem bereitgestellten http-Link liest. Wenn der angegebene http-Link falsch ist, fällt der Empfänger aus. Das Problem ist, dass der Funke den Empfänger ständig ...
Kafka-Themenpartitionen für Spark-Streaming
Ich habe einige Anwendungsfälle, die ich genauer erläutern möchte, zum Thema Kafka-Partitionierung -> Verwendung von Spark-Streaming-Ressourcen. Ich verwende den Spark-Standalone-Modus, daher sind nur die Einstellungen "Gesamtanzahl der ...
Kafka-Thema in einem Spark-Batch-Job lesen
Ich schreibe einen Spark-Stapeljob (v1.6.0), der aus einem Kafka-Thema liest. Dafür kann ichorg.apache.spark.streaming.kafka.KafkaUtils#createRDD Ich muss jedoch die Offsets für alle Partitionen festlegen und sie auch irgendwo speichern (ZK? ...
Spark Dataframe validiert Spaltennamen für Parkettschreibvorgänge (Scala)
Ich verarbeite Ereignisse mit Dataframes, die aus einem Stream von JSON-Ereignissen konvertiert wurden, der schließlich als Parkettformat ausgegeben wird. Einige der JSON-Ereignisse enthalten jedoch Leerzeichen in den Schlüsseln, die ich ...
Connection-Pooling in einer Pyspark-Streaming-Anwendung
Wie werden Verbindungspools in einer Pyspark-Streaming-Anwendung ordnungsgemäß verwendet? Ich lese ...
Spark Stateful Streaming-Job bleibt nach langer Betriebszeit beim Checkpointing auf S3 hängen
Ich habe kürzlich unsere Spark-Streaming-App einem Stresstest unterzogen. Der Stresstest erfasst ungefähr 20.000 Nachrichten pro Sekunde mit Nachrichtengrößen zwischen 200 Byte und 1 KB in Kafka, wobei Spark Streaming alle 4 Sekunden Batches ...
java.io.NotSerializableException im Spark-Streaming mit aktiviertem Checkpointing
code unten: def main(args: Array[String]) { val sc = new SparkContext val sec = Seconds(3) val ssc = new StreamingContext(sc, sec) ssc.checkpoint("./checkpoint") val rdd = ssc.sparkContext.parallelize(Seq("a","b","c")) val inputDStream = ...