SparkStreaming, RabbitMQ und MQTT in Python mit pika

Nur um es knifflig zu machen, möchte ich Nachrichten aus der rabbitMQ-Warteschlange konsumieren. Jetzt weiß ich, dass es ein Plugin für MQTT auf Rabbit gibt https: //www.rabbitmq.com/mqtt.htm).

Ich kann jedoch kein Beispiel erstellen, in dem Spark eine Nachricht verarbeitet, die aus pika erstellt wurde.

Zum Beispiel verwende ich hier das einfache Programm wordcount.py https: //spark.apache.org/docs/1.2.0/streaming-programming-guide.htm) um zu sehen, ob ich eine Nachricht sehen kannProduzen auf die folgende Weise

import sys
import pika
import json
import future
import pprofile

def sendJson(json):

  connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  channel = connection.channel()

  channel.queue_declare(queue='analytics', durable=True)
  channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')

  channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
  connection.close()

if __name__ == "__main__":
  with open(sys.argv[1],'r') as json_file:
    sendJson(json_file.read())

Das SparkstreamingVerbrauche ist das Folgende

import sys
import operator

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")


#RabbitMQ

"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

Abweichend vom einfachen wordcount-Beispiel kann ich dies jedoch nicht zum Laufen bringen und erhalte den folgenden Fehler:

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)

Also meine Fragen sind, was sollen die Einstellungen in Bezug auf @ seMQTTUtils.createStream(ssc, brokerUrl, topic), um in die Warteschlange zu hören und ob es noch umfassendere Beispiele gibt und wie diese auf die von rabbitMQ abgebildet werden.

Ich verwende meinen Consumer-Code mit:./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

Ich habe den Herstellercode wie folgt mit TCP-Parametern aktualisiert, wie in einem Kommentar angegeben:

url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)

und der Funke strömt wie folgt:

brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()

Antworten auf die Frage(4)

Ihre Antwort auf die Frage