В Pika или RabbitMQ, как я могу проверить, потребляют ли какие-либо потребители в настоящее время?

Я хотел бы проверить, еслиПотребитель / работник присутствует, чтобы потреблятьСообщение Я собираюсь отправить.

Если нетработникЯ бы начал несколько работников (и потребители и издатели находятся на одной машине), а затем приступить к публикацииСообщения.

Если есть такая функция, какconnection.check_if_has_consumersЯ бы реализовал это примерно так -

import pika
import workers

# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
    # start the workers in other processes, using python's `multiprocessing`
    workers.start_workers()

# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
                            properties=pika.BasicProperties(delivery_mode=2))
connection.close()

Но я не могу найти какую-либо функцию сcheck_if_has_consumers функциональность впищуха.

Есть ли способ сделать это, используяпищуха? или, может быть,говорящий вКролик напрямую?

Я не совсем уверен, но я действительно думаю,RabbitMQ будет знать о количестве потребителей, подписавшихся на разные очереди, так как он отправляетСообщения им и принимаетACKs

Я только началRabbitMQ 3 часа назад ... любая помощь приветствуется ...

здесьworkers.py код, который я написал, если это поможет ....

import multiprocessing
import pika


def start_workers(num=3):
    """start workers as non-daemon processes"""
    for i in xrange(num):    
        process = WorkerProcess()
        process.start()


class WorkerProcess(multiprocessing.Process):
    """
    worker process that waits infinitly for task msgs and calls
    the `callback` whenever it gets a msg
    """
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_working = multiprocessing.Event()

    def run(self):
        """
        worker method, open a channel through a pika connection and
        start consuming
        """
        connection = pika.BlockingConnection(
                              pika.ConnectionParameters(host='localhost')
                     )
        channel = connection.channel()
        channel.queue_declare(queue='worker_queue', auto_delete=False,
                                                    durable=True)

        # don't give work to one worker guy until he's finished
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue='worker_queue')

        # do what `channel.start_consuming()` does but with stopping signal
        while len(channel._consumers) and not self.stop_working.is_set():
            channel.transport.connection.process_data_events()

        channel.stop_consuming()
        connection.close()
        return 0

    def signal_exit(self):
        """exit when finished with current loop"""
        self.stop_working.set()

    def exit(self):
        """exit worker, blocks until worker is finished and dead"""
        self.signal_exit()
        while self.is_alive(): # checking `is_alive()` on zombies kills them
            time.sleep(1)

    def kill(self):
        """kill now! should not use this, might create problems"""
        self.terminate()
        self.join()


def callback(channel, method, properties, body):
    """pika basic consume callback"""
    print 'GOT:', body
    # do some heavy lifting here
    result = save_to_database(body)
    print 'DONE:', result
    channel.basic_ack(delivery_tag=method.delivery_tag)

РЕДАКТИРОВАТЬ:

Я должен двигаться вперед, так что вот обходной путь, который я собираюсь предпринять, если не найдется лучший подход,

Так,RabbitMQ имеет этиHTTP управление API-интерфейсом, они работают после того, как вы включилиплагин управления и в середине страницы HTTP API есть

/ api / connections - список всех открытых соединений.

/ api / connections / name - индивидуальное соединение. УДАЛЕНИЕ Это закроет соединение.

Итак, если я подключу свойРабочие и мойПроизводит оба по разномусоединение имена / пользователи, я смогу проверить, еслиРабочая связь открыт ... (могут быть проблемы, когда работник умирает ...)

будет ждать лучшего решения ...

РЕДАКТИРОВАТЬ:

только что нашел это в документах rabbitmq, но это было бы хакерски делать в python:

shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue    0
...done.

чтобы я мог сделать что-то вроде

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")

хаки ... все еще надеюсь, что у pika есть какая-то функция python, чтобы сделать это

Спасибо,

Ответы на вопрос(2)

Ваш ответ на вопрос