Wie werden Daten von Kafka an Spark Streaming übergeben?

Ich versuche, Daten von Kafka an Spark-Streaming zu übergeben.

Das habe ich bis jetzt gemacht:

Installed bothkafka undsparkGestartetzookeeper mit Standardeigenschaften configGestartetkafka server mit Standardeigenschaften configGestartetkafka producerGestartetkafka consumerSendung einer Nachricht vom Produzenten an den Verbraucher. Funktioniert gutSchrieb kafka-spark.py, um Nachrichten von kafka to spark zu erhalten. Ich versuche es mit./bin/spark-submit examples/src/main/python/kafka-spark.pyIch erhalte eine Fehlermeldung.

kafka-spark.py -

from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    #sc = SparkContext(appName="KafkaSpark")
    sc = SparkContext(conf=conf)
    stream=StreamingContext(sc,1)
    map1={'spark-kafka':1}
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1) #tried with localhost:2181 too

    print("kafkastream=",kafkaStream)
    sc.stop()

Full Log einschließlich des Fehlers beim Ausführen von spark-kafka.py

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/18 13:05:33 INFO SparkContext: Running Spark version 1.6.0
16/01/18 13:05:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/18 13:05:33 INFO SecurityManager: Changing view acls to: username
16/01/18 13:05:33 INFO SecurityManager: Changing modify acls to: username
16/01/18 13:05:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(username); users with modify permissions: Set(username)
16/01/18 13:05:33 INFO Utils: Successfully started service 'sparkDriver' on port 54446.
16/01/18 13:05:34 INFO Slf4jLogger: Slf4jLogger started
16/01/18 13:05:34 INFO Remoting: Starting remoting
16/01/18 13:05:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:50386]
16/01/18 13:05:34 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 50386.
16/01/18 13:05:34 INFO SparkEnv: Registering MapOutputTracker
16/01/18 13:05:34 INFO SparkEnv: Registering BlockManagerMaster
16/01/18 13:05:34 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-f5490271-cdb7-467d-a915-4f5ccab57f0e
16/01/18 13:05:34 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/01/18 13:05:34 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/18 13:05:34 INFO Server: jetty-8.y.z-SNAPSHOT
16/01/18 13:05:34 INFO AbstractConnector: Started [email protected]:4040
16/01/18 13:05:34 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/18 13:05:34 INFO SparkUI: Started SparkUI at http://127.0.0.1:4040
Java HotSpot(TM) Server VM warning: You have loaded library /tmp/libnetty-transport-native-epoll561240765619860252.so which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
16/01/18 13:05:34 INFO Utils: Copying ~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py to /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/userFiles-e93fc252-0ba1-42b7-b4fa-2e46f3a0601e/kafka-spark.py
16/01/18 13:05:34 INFO SparkContext: Added file file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py at file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py with timestamp 1453118734892
16/01/18 13:05:35 INFO Executor: Starting executor ID driver on host localhost
16/01/18 13:05:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58970.
16/01/18 13:05:35 INFO NettyBlockTransferService: Server created on 58970
16/01/18 13:05:35 INFO BlockManagerMaster: Trying to register BlockManager
16/01/18 13:05:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:58970 with 511.1 MB RAM, BlockManagerId(driver, localhost, 58970)
16/01/18 13:05:35 INFO BlockManagerMaster: Registered BlockManager

________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.6.0 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.6.0.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...

________________________________________________________________________________________________


Traceback (most recent call last):
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py", line 33, in <module>
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 80, in createStream
py4j.protocol.Py4JJavaError: An error occurred while calling o22.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

16/01/18 13:05:35 INFO SparkContext: Invoking stop() from shutdown hook
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/api,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs,null}
16/01/18 13:05:35 INFO SparkUI: Stopped Spark web UI at http://127.0.0.1:4040
16/01/18 13:05:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/01/18 13:05:35 INFO MemoryStore: MemoryStore cleared
16/01/18 13:05:35 INFO BlockManager: BlockManager stopped
16/01/18 13:05:35 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/18 13:05:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/01/18 13:05:35 INFO SparkContext: Successfully stopped SparkContext
16/01/18 13:05:35 INFO ShutdownHookManager: Shutdown hook called
16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f
16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/pyspark-fcd47a97-57ef-46c3-bb16-357632580334

BEARBEITE

On running./bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar examples/src/main/python/kafka-spark.py Ich erhalte die HEXADECIMAL-Position anstelle der tatsächlichen Zeichenfolge:

kafkastream= <pyspark.streaming.dstream.TransformedDStream object at 0x7fd6c4dad150>

Eine Idee, was mache ich falsch? Ich bin wirklich neu in Kakfa und Funken, also brauche ich hier etwas Hilfe. Vielen Dank

Antworten auf die Frage(6)

Ihre Antwort auf die Frage