Опрос очереди служебной шины Azure из веб-задания Azure с использованием Node.js
Попытка опроса Azure Service Bus Queue с помощью WebJob, написанного в Node.js. Я создал 2 WebJobs. Первый - по запросу и отправляет 10 уникальных сообщений в очередь. Второе задание является непрерывным и опрашивает очередь на наличие сообщений.
Обнаружение следующих проблем:
Опрос медленный. Получение 10 сообщений занимает в среднем около 10 минут. Смотрите пример журнала подробно ниже. В основном непригодный на этой скорости. Вся задержка от получения ответа отreceiveQueueMessage
, Время отклика варьируется от 0 секунд до ~ 120 секунд, в среднем 60 секунд.
Сообщения принимаются в случайном порядке. Не FIFO.
Иногда сообщения принимаются дважды, даже если они читаются в режиме ReceiveAndDelete (я пробовал без параметра режима чтения, который по умолчанию должен принимать ReceiveAndDelete, с{isReceiveAndDelete:true}
и с{isPeekLock:false}
с такими же результатами).
Когда очередь пуста, она должна держать запрос на прием открытым в течение дня, но она всегда возвращается без сообщения об ошибке через 230 секунд. Согласно документации, максимум 24 дня, поэтому я не знаю, откуда придет 230 секунд:
Максимальное время ожидания для операции блокирующего приема в очередях служебной шины составляет 24 дня. Однако максимальное время ожидания на основе REST составляет 55 секунд.
В принципе ничего не работает, как рекламируется. Что я делаю неправильно?
Отправить сообщение Тестовое задание:
var uuid = require('node-uuid');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
var messagesToSend = 10;
sendMessage(0);
function sendMessage(count)
{
var message = {
body: 'test message',
customProperties: {
message_number: count,
sent_date: new Date
},
brokerProperties: {
MessageId: uuid.v4() //ensure that service bus doesn't think this is a duplicate message
}
};
serviceBus.sendQueueMessage(process.env.busSearchQueueName, message, function(err) {
if (!err) {
console.log('sent test message number ' + count.toString());
} else {
console.error('error sending message: ' + err);
}
});
//wait 5 seconds to ensure messages are received by service bus in correct order
if (count < messagesToSend) {
setTimeout(function(newCount) {
//send next message
sendMessage(newCount);
}, 5000, count+1);
}
}
Прием сообщения непрерывной работы:
console.log('listener job started');
var azure = require('azure');
var serviceBus = azure.createServiceBusService(process.env.busSearchConnectionString);
listenForMessages(serviceBus);
function listenForMessages(serviceBus)
{
var start = process.hrtime();
var timeOut = 60*60*24; //long poll for 1 day
serviceBus.receiveQueueMessage(process.env.busSearchQueueName, {timeoutIntervalInS: timeOut, isReceiveAndDelete: true}, function(err, message) {
var end = process.hrtime(start);
console.log('received a response in %ds seconds', end[0]);
if (err) {
console.log('error requesting message: ' + err);
listenForMessages(serviceBus);
} else {
if (message !== null && typeof message === 'object' && 'customProperties' in message && 'message_number' in message.customProperties) {
console.log('received test message number ' + message.customProperties.message_number.toString());
listenForMessages(serviceBus);
} else {
console.log('invalid message received');
listenForMessages(serviceBus);
}
}
});
}
Пример журнала вывода:
[05/06/2015 21:50:14 > 8c2504: SYS INFO] Status changed to Running
[05/06/2015 21:50:14 > 8c2504: INFO] listener job started
[05/06/2015 21:51:23 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:23 > 8c2504: INFO] received test message number 0
[05/06/2015 21:51:25 > 8c2504: INFO] received a response in 2s seconds
[05/06/2015 21:51:26 > 8c2504: INFO] received test message number 4
[05/06/2015 21:51:27 > 8c2504: INFO] received a response in 1s seconds
[05/06/2015 21:51:27 > 8c2504: INFO] received test message number 7
[05/06/2015 21:51:28 > 8c2504: INFO] received a response in 0s seconds
[05/06/2015 21:51:29 > 8c2504: INFO] received test message number 9
[05/06/2015 21:51:49 > 8c2504: INFO] received a response in 20s seconds
[05/06/2015 21:51:49 > 8c2504: INFO] received test message number 1
[05/06/2015 21:53:35 > 8c2504: INFO] received a response in 106s seconds
[05/06/2015 21:53:35 > 8c2504: INFO] received test message number 1
[05/06/2015 21:54:26 > 8c2504: INFO] received a response in 50s seconds
[05/06/2015 21:54:26 > 8c2504: INFO] received test message number 5
[05/06/2015 21:54:35 > 8c2504: INFO] received a response in 9s seconds
[05/06/2015 21:54:35 > 8c2504: INFO] received test message number 9
[05/06/2015 21:55:28 > 8c2504: INFO] received a response in 53s seconds
[05/06/2015 21:55:28 > 8c2504: INFO] received test message number 2
[05/06/2015 21:57:26 > 8c2504: INFO] received a response in 118s seconds
[05/06/2015 21:57:26 > 8c2504: INFO] received test message number 6
[05/06/2015 21:58:28 > 8c2504: INFO] received a response in 61s seconds
[05/06/2015 21:58:28 > 8c2504: INFO] received test message number 8
[05/06/2015 22:00:35 > 8c2504: INFO] received a response in 126s seconds
[05/06/2015 22:00:35 > 8c2504: INFO] received test message number 3
[05/06/2015 22:04:25 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive
[05/06/2015 22:08:16 > 8c2504: INFO] received a response in 230s seconds
[05/06/2015 22:04:25 > 8c2504: INFO] error requesting message: No messages to receive