Интересно. Спасибо за указание на это, я думаю, что это была точка, которую я упустил!
аюсь сделать то, что кажется простым, но на удивление трудным.
У меня есть функция подписки на очередь RabbitMQ. Конкретно, это функция Channel.consume здесь:http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume
Он возвращает обещание, которое разрешается с помощью идентификатора подписки, который необходим для последующей отмены подписки, а также имеет аргумент обратного вызова для вызова при удалении сообщений из очереди.
Когда я хочу отписаться от очереди, мне нужно отменить потребителя с помощью функции Channel.cancel здесь:http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel, Это берет ранее возвращенный идентификатор подписки.
Я хочу обернуть все эти вещи в Observable, который подписывается на очередь, когда подписка на observable подписана, и отменяет подписку, когда от наблюдаемой отменяется подписка. Однако это оказывается довольно сложно из-за «двойной асинхронной» природы вызовов (я имею в виду, что они имеют как обратный вызов, так и возвращают обещание).
В идеале код, который я хотел бы написать:
return new Rx.Observable(async (subscriber) => {
var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
return async () => {
await channel.cancel(consumeResult.consumerTag);
};
});
Однако это невозможно, так как этот конструктор не поддерживает асинхронные функции подписчика или логику удаления.
Я не смог понять это. Я что-то здесь упускаю? Почему это так сложно?
Ура, Алекс