Oracle Advance Queue - Dequeue не работает
Кажется, я не могу найти решение своей проблемы, я застрял в этом в течение нескольких часов.
Я использую Oracle AQ:
Dbms_Aqadm.Create_Queue_Table(Queue_Table => 'ITEM_EVENT_QT',
Queue_Payload_Type => 'ITEM_EVENT',
Multiple_Consumers => TRUE);
Dbms_Aqadm.Create_Queue(Queue_Name => 'ITEM_EVENT_QUEUE',
Queue_Table => 'ITEM_EVENT_QT',
Max_Retries => 5,
Retry_Delay => 0,
Retention_Time => 432000, -- 5 DAYS
Dependency_Tracking => FALSE,
COMMENT => 'Item Event Queue');
-- START THE QUEUE
Dbms_Aqadm.Start_Queue('ITEM_EVENT_QUEUE');
-- GRANT QUEUE PRIVILEGES
Dbms_Aqadm.Grant_Queue_Privilege(Privilege => 'ALL',
Queue_Name => 'ITEM_EVENT_QUEUE',
Grantee => 'PUBLIC',
Grant_Option => FALSE);
END;
Вот один из моих подписчиков:
Dbms_Aqadm.Add_Subscriber(Queue_Name => 'ITEM_EVENT_QUEUE',
Subscriber => Sys.Aq$_Agent('ITEM_SUBSCRIBER_1',
NULL,
NULL),
rule => 'tab.user_data.header.thread_no = 1');
Dbms_Aq.Register(Sys.Aq$_Reg_Info_List(Sys.Aq$_Reg_Info('ITEM_EVENT_QUEUE:ITEM_SUBSCRIBER_1',
Dbms_Aq.Namespace_Aq,
'plsql://ITEM_API.GET_QUEUE_FROM_QUEUE',
HEXTORAW('FF'))),1);
Регистрация подписчика:
Всякий раз, когда в моей БД происходит определенное событие, я использую триггер, чтобы добавить «событие» в мой AQ, вызывая следующую процедуру из моегоITEM_API
пакет:
PROCEDURE ADD_EVENT_TO_QUEUE(I_EVENT IN ITEM_EVENT,
O_STATUS_CODE OUT VARCHAR2,
O_ERROR_MSG OUT VARCHAR2) IS
ENQUEUE_OPTIONS DBMS_AQ.ENQUEUE_OPTIONS_T;
MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T;
MESSAGE_HANDLE RAW(16);
EVENT ITEM_EVENT;
HEADER_PROP HEADER_PROPERTIES;
BEGIN
EVENT := I_EVENT;
EVENT.SEQ_NO := ITEM_EVENT_SEQ.NEXTVAL;
ENQUEUE_OPTIONS.VISIBILITY := DBMS_AQ.ON_COMMIT;
ENQUEUE_OPTIONS.SEQUENCE_DEVIATION := NULL;
MESSAGE_PROPERTIES.PRIORITY := 1;
MESSAGE_PROPERTIES.DELAY := DBMS_AQ.NO_DELAY;
MESSAGE_PROPERTIES.EXPIRATION := DBMS_AQ.NEVER;
HEADER_PROP := HEADER_PROPERTIES(1);
EVENT.HEADER := HEADER_PROP;
DBMS_AQ.ENQUEUE(QUEUE_NAME => 'ITEM_EVENT_QUEUE',
ENQUEUE_OPTIONS => ENQUEUE_OPTIONS,
MESSAGE_PROPERTIES => MESSAGE_PROPERTIES,
PAYLOAD => EVENT,
MSGID => MESSAGE_HANDLE);
EXCEPTION
WHEN OTHERS THEN
ERROR_HANDLER.LOG_ERROR(NULL,
EVENT.ITEM,
EVENT.SEQ_NO,
SQLCODE,
SQLERRM,
O_STATUS_CODE,
O_ERROR_MSG);
RAISE;
END ADD_EVENT_TO_QUEUE;
И это работает, потому что, когда я проверяю свою таблицу AQ, я могу найти «событие», однако мой метод удаления не обрабатывает, как вы можете видеть на изображении ниже, нетDEQ_TIME
.
Вот мой метод dequeue, также из моегоITEM_API
пакет:
PROCEDURE GET_QUEUE_FROM_QUEUE(CONTEXT RAW,
REGINFO SYS.AQ$_REG_INFO,
DESCR SYS.AQ$_DESCRIPTOR,
PAYLOAD RAW,
PAYLOADL NUMBER) IS
R_DEQUEUE_OPTIONS DBMS_AQ.DEQUEUE_OPTIONS_T;
R_MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T;
V_MESSAGE_HANDLE RAW(16);
I_PAYLOAD ITEM_EVENT;
L_PROC_EVENT BOOLEAN;
O_TARGETS CFG_EVENT_STAGE_TBL;
O_ERROR_MSG VARCHAR2(300);
O_STATUS_CODE VARCHAR2(100);
BEGIN
R_DEQUEUE_OPTIONS.MSGID := DESCR.MSG_ID;
R_DEQUEUE_OPTIONS.CONSUMER_NAME := DESCR.CONSUMER_NAME;
R_DEQUEUE_OPTIONS.DEQUEUE_MODE := DBMS_AQ.REMOVE;
--R_DEQUEUE_OPTIONS.WAIT := DBMS_AQ.NO_WAIT;
DBMS_AQ.DEQUEUE(QUEUE_NAME => DESCR.QUEUE_NAME,
DEQUEUE_OPTIONS => R_DEQUEUE_OPTIONS,
MESSAGE_PROPERTIES => R_MESSAGE_PROPERTIES,
PAYLOAD => I_PAYLOAD,
MSGID => V_MESSAGE_HANDLE);
IF I_PAYLOAD IS NOT NULL THEN
L_PROC_EVENT := PROCESS_EVENT(I_PAYLOAD,
O_TARGETS,
O_STATUS_CODE,
O_ERROR_MSG);
END IF;
EXCEPTION
WHEN OTHERS THEN
ERROR_HANDLER.LOG_ERROR(NULL,
NULL,
NULL,
SQLCODE,
SQLERRM,
O_STATUS_CODE,
O_ERROR_MSG);
RAISE;
END GET_QUEUE_FROM_QUEUE;
Я делаю что-то неправильно? Как я могу это исправить? Я думаю, что может быть проблема с моей регистрацией подписчика, но я не уверен.
РЕДАКТИРОВАТЬ: Я только что понял, что если я удалю подписчиков и реестр, а затем добавлю их снова, они удалят все сообщения из очереди. Однако, если другое событие ставится в очередь, оно остается там неопределенно (или до тех пор, пока я не удалю и не добавлю подписчиков снова):
Запись с состоянием 0 и нетDEQ_TIME
это новый.
Нужен ли мне планировщик или что-то в этом роде?
РЕДАКТИРОВАТЬ: Я добавил распространение планировщика в свой AQ:
DBMS_AQADM.SCHEDULE_PROPAGATION('ITEM_EVENT_QUEUE');
и даже добавил поле next_time:
DBMS_AQADM.SCHEDULE_PROPAGATION('ITEM_EVENT_QUEUE', SYSDATE + 30/86400);
Все еще не работает. Какие-либо предложения? Я предполагаю, что Уведомления AQ не работают, и моя процедура обратного вызова никогда не вызывается. Как я могу это исправить?
РЕДАКТИРОВАТЬ: Я удалил свою процедуру из пакета только для целей тестирования, поэтому мои товарищи по команде могут скомпилировать пакет ITEM_API (я не знаю, перекомпиляция пакета может повлиять или не повлиять на процесс удаления из очереди). Все еще не работает.