RabbitMQ, Pika y estrategia de reconexión

Estoy usando Pika para procesar datos de RabbitMQ. Como parecía tener diferentes tipos de problemas, decidí escribir una pequeña aplicación de prueba para ver cómo puedo manejar las desconexiones.

Escribí esta aplicación de prueba que hace lo siguiente:

Conecte a Broker, vuelva a intentar hasta que tenga éxitoCuando esté conectado, cree una cola.Consumir esta cola y poner el resultado en una cola de Python.Queue (0) Obtenga el elemento de Queue.Queue (0) y vuelva a generarlo en la cola del agente.

Lo que noté fueron 2 problemas:

Cuando ejecuto mi script desde un host que se conecta a rabbitmq en otro host (dentro de un vm), este script se cierra en momentos aleatorios sin producir un error.Cuando ejecuto mi script en el mismo host en el que está instalado RabbitMQ, funciona bien y sigue ejecutándose.

Esto podría explicarse debido a problemas de red, los paquetes se cayeron, aunque considero que la conexión no es realmente sólida.

Cuando el script se ejecuta localmente en el servidor RabbitMQ y yo elimino el RabbitMQ, el script sale con un error: "ERROR pika SelectConnection: Socket Error on 3: 104"

Así que parece que no puedo lograr que la estrategia de reconexión funcione como debería ser. ¿Podría alguien echar un vistazo al código para ver qué estoy haciendo mal?

Gracias

Arrendaj

#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock

class Broker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.logging = logging.getLogger(__name__)
        self.to_broker = Queue.Queue(0)
        self.from_broker = Queue.Queue(0)
        self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
        self.srs = SimpleReconnectionStrategy()
        self.properties = pika.BasicProperties(delivery_mode=2)

        self.connection = None
        while True:
            try:
                self.connection = SelectConnection(self.parameters, self.on_connected,  reconnection_strategy=self.srs)
                break
            except Exception as err:
                self.logging.warning('Cant connect. Reason: %s' % err)
                time.sleep(1)

        self.daemon=True
    def run(self):
        while True:
            self.submitData(self.from_broker.get(block=True))
        pass
    def on_connected(self,connection):
        connection.channel(self.on_channel_open)
    def on_channel_open(self,new_channel):
        self.channel = new_channel
        self.channel.queue_declare(queue='sandbox', durable=True)
        self.channel.basic_consume(self.processData, queue='sandbox')    
    def processData(self, ch, method, properties, body):
        self.logging.info('Received data from broker')
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        self.from_broker.put(body)
    def submitData(self,data):
        self.logging.info('Submitting data to broker.')
        self.channel.basic_publish(exchange='',
                    routing_key='sandbox',
                    body=data,
                    properties=self.properties)
if __name__ == '__main__':
    format=('%(asctime)s %(levelname)s %(name)s %(message)s')
    logging.basicConfig(level=logging.DEBUG, format=format)
    broker=Broker()
    broker.start()
    try:
        broker.connection.ioloop.start()
    except Exception as err:
        print err

Respuestas a la pregunta(2)

Su respuesta a la pregunta