Интересно. Спасибо за указание на это, я думаю, что это была точка, которую я упустил!

аюсь сделать то, что кажется простым, но на удивление трудным.

У меня есть функция подписки на очередь RabbitMQ. Конкретно, это функция Channel.consume здесь:http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

Он возвращает обещание, которое разрешается с помощью идентификатора подписки, который необходим для последующей отмены подписки, а также имеет аргумент обратного вызова для вызова при удалении сообщений из очереди.

Когда я хочу отписаться от очереди, мне нужно отменить потребителя с помощью функции Channel.cancel здесь:http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel, Это берет ранее возвращенный идентификатор подписки.

Я хочу обернуть все эти вещи в Observable, который подписывается на очередь, когда подписка на observable подписана, и отменяет подписку, когда от наблюдаемой отменяется подписка. Однако это оказывается довольно сложно из-за «двойной асинхронной» природы вызовов (я имею в виду, что они имеют как обратный вызов, так и возвращают обещание).

В идеале код, который я хотел бы написать:

return new Rx.Observable(async (subscriber) => {
  var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
  return async () => {
    await channel.cancel(consumeResult.consumerTag);
  };
});

Однако это невозможно, так как этот конструктор не поддерживает асинхронные функции подписчика или логику удаления.

Я не смог понять это. Я что-то здесь упускаю? Почему это так сложно?

Ура, Алекс

Ответы на вопрос(1)

Ваш ответ на вопрос