En Pika o RabbitMQ, ¿cómo verifico si algún consumidor está consumiendo actualmente?

Me gustaría comprobar si unConsumidor / Trabajador está presente para consumir unMensaje Estoy a punto de enviar.

Si no hayObrero, Comenzaría algunos trabajadores (tanto los consumidores como los editores están en una sola máquina) y luego continuaría publicandoMensajes.

Si hay una función comoconnection.check_if_has_consumers, Lo implementaría de esta manera -

import pika
import workers

# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
    # start the workers in other processes, using python's `multiprocessing`

# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",

Pero no puedo encontrar ninguna función concheck_if_has_consumers funcionalidad enpika.

¿Hay alguna manera de lograr esto, usandopika? o tal vez, porhablando aEl conejo ¿directamente?

No estoy completamente seguro, pero realmente piensoRabbitMQ sería consciente de la cantidad de consumidores suscritos a diferentes colas, ya que se despachamensajes a ellos y aceptaack

Acabo de empezar conRabbitMQ Hace 3 horas ... cualquier ayuda es bienvenida ...

aquí está elworkers.py Código que escribí, si es de alguna ayuda ....

import multiprocessing
import pika

def start_workers(num=3):
    """start workers as non-daemon processes"""
    for i in xrange(num):    
        process = WorkerProcess()

class WorkerProcess(multiprocessing.Process):
    worker process that waits infinitly for task msgs and calls
    the `callback` whenever it gets a msg
    def __init__(self):
        self.stop_working = multiprocessing.Event()

    def run(self):
        worker method, open a channel through a pika connection and
        start consuming
        connection = pika.BlockingConnection(
        channel = connection.channel()
        channel.queue_declare(queue='worker_queue', auto_delete=False,

        # don't give work to one worker guy until he's finished
        channel.basic_consume(callback, queue='worker_queue')

        # do what `channel.start_consuming()` does but with stopping signal
        while len(channel._consumers) and not self.stop_working.is_set():

        return 0

    def signal_exit(self):
        """exit when finished with current loop"""

    def exit(self):
        """exit worker, blocks until worker is finished and dead"""
        while self.is_alive(): # checking `is_alive()` on zombies kills them

    def kill(self):
        """kill now! should not use this, might create problems"""

def callback(channel, method, properties, body):
    """pika basic consume callback"""
    print 'GOT:', body
    # do some heavy lifting here
    result = save_to_database(body)
    print 'DONE:', result


Tengo que seguir adelante, así que aquí hay una solución que voy a tomar, a menos que se presente un mejor enfoque,

Asi que,RabbitMQ tiene estosGestión de HTTP apis, funcionan después de que hayas encendido elcomplemento de gestión y en medio de la página HTTP apis hay

/ api / connections - Una lista de todas las conexiones abiertas.

/ api / connections / name - Una conexión individual. BORRAR se cerrará la conexión.

Así que, si conecto miTrabajadores y miProduce tanto por diferenteConexión nombres / usuarios, podré comprobar si elConexión del trabajador está abierto ... (puede haber problemas cuando el trabajador muere ...)

Estaremos esperando una mejor solución ...


acabo de encontrar esto en la documentación de rabbitmq, pero sería un error para Python:

shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue    0

para que yo pudiera hacer algo como

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")

hacky ... aún espero que pika tenga alguna función de python para hacer esto ...


