/* Formatted on 2001/09/10 16:08 (RevealNet Formatter v4.4.1) */
CREATE OR REPLACE PACKAGE aq
IS
   /* Standard datatypes for use with Oracle AQ.
      The variables exist only as anchors for the SUBTYPE declarations. */
   v_msgid   RAW (16);
   SUBTYPE msgid_type IS v_msgid%TYPE;

   v_name    VARCHAR2 (49);
   SUBTYPE name_type IS v_name%TYPE;

   /* Some common exceptions encountered when working with Oracle AQ.
      You will probably want to add more; see chapter in book. */
   dequeue_disabled       EXCEPTION;
   PRAGMA EXCEPTION_INIT (dequeue_disabled, -25226);

   dequeue_timeout        EXCEPTION;
   PRAGMA EXCEPTION_INIT (dequeue_timeout, -25228);

   end_of_message_group   EXCEPTION;
   PRAGMA EXCEPTION_INIT (end_of_message_group, -25235);

   /* Create and Remove queue objects. */

   /* Create the queue table (if missing) and the queue (if missing),
      then start the queue. A NULL prioritize sorts by ENQ_TIME. */
   PROCEDURE create_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2 := NULL,
      qtype          IN   INTEGER := DBMS_AQADM.normal_queue
   );

   /* Same as create_queue, but the sort list defaults to
      'PRIORITY,ENQ_TIME'. */
   PROCEDURE create_priority_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2 := 'PRIORITY,ENQ_TIME'
   );

   /* Hides the need for an agent object. */
   PROCEDURE add_subscriber (
      qname        IN   VARCHAR2,
      subscriber   IN   VARCHAR2,
      address      IN   VARCHAR2 := NULL
   );

   /* Stop every queue in qtable whose name is LIKE qname. When both
      enqueue and dequeue are TRUE the matching queues are dropped as
      well, and the queue table is dropped if every queue matched. */
   PROCEDURE stop_and_drop (
      qtable    IN   VARCHAR2,
      qname     IN   VARCHAR2 := '%',
      enqueue   IN   BOOLEAN := TRUE,
      dequeue   IN   BOOLEAN := TRUE,
      wait      IN   BOOLEAN := TRUE
   );

   /* Drop the queue table with force => TRUE. */
   PROCEDURE killqt (qtable IN VARCHAR2);

   /* Retrieve queue information */

   /* TRUE if USER_QUEUES has a row whose name is UPPER (qname). */
   FUNCTION queue_exists (qname IN VARCHAR2)
      RETURN BOOLEAN;

   /* TRUE if USER_QUEUE_TABLES has a row whose queue_table is
      UPPER (qtable). */
   FUNCTION qtable_exists (qtable IN VARCHAR2)
      RETURN BOOLEAN;

   /* Number of rows in the AQ$<qtable> view for queue UPPER (qname). */
   FUNCTION msgcount (qtable IN VARCHAR2, qname IN VARCHAR2)
      RETURN INTEGER;

   /* Value of the expression data_in, selected from AQ$<qtable_in>,
      for the row whose msg_id is msgid_in; NULL if no row is found. */
   FUNCTION msgdata (
      qtable_in   IN   VARCHAR2,
      msgid_in    IN   RAW,
      data_in     IN   VARCHAR2
   )
      RETURN VARCHAR2;

   /* Print priority, state, retry count and correlation ID of every
      message in the queue via DBMS_OUTPUT. */
   PROCEDURE showmsgs (qtable IN VARCHAR2, qname IN VARCHAR2);
END;
/

CREATE OR REPLACE PACKAGE BODY aq
IS
   /* Single DBMS_SQL cursor shared by msgcount, msgdata and showmsgs.
      It is opened on first use and then re-parsed by each caller. */
   g_dyncur   PLS_INTEGER;

   /* Private program: make sure g_dyncur refers to an open cursor. */
   PROCEDURE initcur
   IS
   BEGIN
      IF g_dyncur IS NULL OR NOT DBMS_SQL.is_open (g_dyncur)
      THEN
         g_dyncur := DBMS_SQL.open_cursor;
      END IF;
   EXCEPTION
      WHEN INVALID_CURSOR
      THEN
         g_dyncur := DBMS_SQL.open_cursor;
   END;

   /* Check status of AQ objects */

   /* TRUE if the current schema has a queue named UPPER (qname). */
   FUNCTION queue_exists (qname IN VARCHAR2)
      RETURN BOOLEAN
   IS
      CURSOR q_cur
      IS
         SELECT 'x'
           FROM user_queues
          WHERE name = UPPER (qname);

      q_rec    q_cur%ROWTYPE;
      retval   BOOLEAN;
   BEGIN
      OPEN q_cur;
      FETCH q_cur INTO q_rec;
      retval := q_cur%FOUND;
      CLOSE q_cur;
      RETURN retval;
   END;

   /* TRUE if the current schema has a queue table named UPPER (qtable). */
   FUNCTION qtable_exists (qtable IN VARCHAR2)
      RETURN BOOLEAN
   IS
      CURSOR q_cur
      IS
         SELECT 'x'
           FROM user_queue_tables
          WHERE queue_table = UPPER (qtable);

      q_rec    q_cur%ROWTYPE;
      retval   BOOLEAN;
   BEGIN
      OPEN q_cur;
      FETCH q_cur INTO q_rec;
      /* Capture the result before closing: %FOUND cannot be read from
         a closed cursor. */
      retval := q_cur%FOUND;
      CLOSE q_cur;
      RETURN retval;
   END;

   /* Create the queue table and queue only when they are not already
      present, then start the queue. Enqueue is enabled unless qtype is
      DBMS_AQADM.exception_queue. */
   PROCEDURE create_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2 := NULL,
      qtype          IN   INTEGER := DBMS_AQADM.normal_queue
   )
   IS
   BEGIN
      IF NOT qtable_exists (qtable)
      THEN
         DBMS_AQADM.create_queue_table (
            queue_table          => qtable,
            queue_payload_type   => payload_type,
            sort_list            => NVL (prioritize, 'ENQ_TIME')
         );
      END IF;

      IF NOT queue_exists (qname)
      THEN
         DBMS_AQADM.create_queue (
            queue_name    => qname,
            queue_table   => qtable,
            queue_type    => qtype
         );
      END IF;

      DBMS_AQADM.start_queue (
         queue_name   => qname,
         enqueue      => qtype != DBMS_AQADM.exception_queue
      );
   END;

   /* Delegates to create_queue with the priority-based sort list. */
   PROCEDURE create_priority_queue (
      qtable         IN   VARCHAR2,
      payload_type   IN   VARCHAR2,
      qname          IN   VARCHAR2,
      prioritize     IN   VARCHAR2 := 'PRIORITY,ENQ_TIME'
   )
   IS
   BEGIN
      create_queue (qtable, payload_type, qname, prioritize);
   END;

   /* Build the sys.aq$_agent object from the subscriber name and
      address (third attribute NULL) and register it on the queue. */
   PROCEDURE add_subscriber (
      qname        IN   VARCHAR2,
      subscriber   IN   VARCHAR2,
      address      IN   VARCHAR2 := NULL
   )
   IS
      v_agent   sys.aq$_agent;
   BEGIN
      v_agent := sys.aq$_agent (subscriber, address, NULL);
      DBMS_AQADM.add_subscriber (qname, v_agent);
   END;

   /* Stop and Drop Utilities */

   /* Stop (and, when both enqueue and dequeue are TRUE, drop) every
      queue in qtable whose name matches the LIKE pattern qname.
      The queue table itself is dropped only when both flags are TRUE
      and no queue in the table was skipped by the pattern. */
   PROCEDURE stop_and_drop (
      qtable    IN   VARCHAR2,
      qname     IN   VARCHAR2 := '%',
      enqueue   IN   BOOLEAN := TRUE,
      dequeue   IN   BOOLEAN := TRUE,
      wait      IN   BOOLEAN := TRUE
   )
   IS
      CURSOR q_cur
      IS
         SELECT name
           FROM user_queues
          WHERE queue_table = UPPER (qtable);

      /* Starts TRUE only for a full stop; cleared as soon as one queue
         in the table is left untouched. */
      all_dropped   BOOLEAN := enqueue AND dequeue;
   BEGIN
      FOR q_rec IN q_cur
      LOOP
         IF q_rec.name LIKE UPPER (qname)
         THEN
            DBMS_AQADM.stop_queue (q_rec.name, enqueue, dequeue, wait);
            DBMS_OUTPUT.put_line ('stopping ' || q_rec.name);

            IF enqueue AND dequeue
            THEN
               DBMS_AQADM.drop_queue (q_rec.name);
               DBMS_OUTPUT.put_line ('dropping ' || q_rec.name);
            END IF;
         ELSE
            all_dropped := FALSE;
         END IF;
      END LOOP;

      IF all_dropped AND qtable_exists (qtable)
      THEN
         DBMS_AQADM.drop_queue_table (qtable, force => TRUE);
         DBMS_OUTPUT.put_line ('dropping ' || qtable);
      END IF;
   END;

   /* Unconditionally drop the queue table, forcing removal. */
   PROCEDURE killqt (qtable IN VARCHAR2)
   IS
   BEGIN
      DBMS_AQADM.drop_queue_table (qtable, force => TRUE);
   END;

   /* Retrieve information about queues */

   /* Enhance this so that you provide just the queue name and look up
      the queue table name from that. */

   /* Count the rows of AQ$<qtable> that belong to queue UPPER (qname).
      NOTE: qtable is concatenated into the statement text (identifiers
      cannot be bound), so pass only trusted queue table names. */
   FUNCTION msgcount (qtable IN VARCHAR2, qname IN VARCHAR2)
      RETURN INTEGER
   IS
      fdbk     PLS_INTEGER;
      retval   PLS_INTEGER;
   BEGIN
      initcur;
      DBMS_SQL.parse (
         g_dyncur,
         'SELECT COUNT(*) FROM AQ$' || qtable || ' WHERE queue = :qname',
         DBMS_SQL.native
      );
      /* The literal 1 only tells DBMS_SQL the column is numeric. */
      DBMS_SQL.define_column (g_dyncur, 1, 1);
      DBMS_SQL.bind_variable (g_dyncur, 'qname', UPPER (qname));
      fdbk := DBMS_SQL.execute_and_fetch (g_dyncur);
      DBMS_SQL.column_value (g_dyncur, 1, retval);
      RETURN retval;
   END;

   /* Return the expression data_in evaluated against the AQ$<qtable_in>
      row whose msg_id equals msgid_in, as a string of up to 2000
      characters; NULL when no row is fetched.
      NOTE: data_in and qtable_in are concatenated into the statement
      text, so pass only trusted values. */
   FUNCTION msgdata (
      qtable_in   IN   VARCHAR2,
      msgid_in    IN   RAW,
      data_in     IN   VARCHAR2
   )
      RETURN VARCHAR2
   IS
      fdbk     PLS_INTEGER;
      retval   VARCHAR2 (2000);
   BEGIN
      initcur;
      DBMS_SQL.parse (
         g_dyncur,
            'SELECT '
         || data_in
         || ' FROM AQ$'
         || qtable_in
         || ' WHERE msg_id = :msgid',
         DBMS_SQL.native
      );
      /* The literal 'a' only tells DBMS_SQL the column is a string. */
      DBMS_SQL.define_column (g_dyncur, 1, 'a', 2000);
      DBMS_SQL.bind_variable_raw (g_dyncur, 'msgid', msgid_in);
      fdbk := DBMS_SQL.execute_and_fetch (g_dyncur);

      IF fdbk = 1
      THEN
         DBMS_SQL.column_value (g_dyncur, 1, retval);
      END IF;

      RETURN retval;
   END;

   /* Print one line per message in queue UPPER (qname) of AQ$<qtable>,
      preceded by a column header when at least one row is fetched.
      NOTE: qtable is concatenated into the statement text, so pass only
      trusted queue table names. */
   PROCEDURE showmsgs (qtable IN VARCHAR2, qname IN VARCHAR2)
   IS
      fdbk             PLS_INTEGER;
      v_msg_priority   PLS_INTEGER;
      v_msg_state      VARCHAR2 (30);
      v_retry_count    PLS_INTEGER;
      v_correlation    VARCHAR2 (30);
      /* Explicit flag: the header is printed exactly once, regardless
         of the values fetched. */
      header_shown     BOOLEAN := FALSE;
   BEGIN
      initcur;
      DBMS_SQL.parse (
         g_dyncur,
            'SELECT msg_priority, msg_state, retry_count, corr_id FROM AQ$'
         || qtable
         || ' WHERE queue = :qname',
         DBMS_SQL.native
      );
      DBMS_SQL.define_column (g_dyncur, 1, 1);
      DBMS_SQL.define_column (g_dyncur, 2, 'a', 30);
      DBMS_SQL.define_column (g_dyncur, 3, 1);
      DBMS_SQL.define_column (g_dyncur, 4, 'a', 30);
      DBMS_SQL.bind_variable (g_dyncur, 'qname', UPPER (qname));
      fdbk := DBMS_SQL.EXECUTE (g_dyncur);

      WHILE DBMS_SQL.fetch_rows (g_dyncur) > 0
      LOOP
         IF NOT header_shown
         THEN
            /* Display the header, padded to the same widths as the
               data columns below. */
            DBMS_OUTPUT.put_line (
                  RPAD ('Priority', 16)
               || RPAD ('State', 11)
               || RPAD ('Retries', 8)
               || 'Correlation ID'
            );
            DBMS_OUTPUT.put_line (
               '--------------- ---------- ------- ------------------------------'
            );
            header_shown := TRUE;
         END IF;

         DBMS_SQL.column_value (g_dyncur, 1, v_msg_priority);
         DBMS_SQL.column_value (g_dyncur, 2, v_msg_state);
         DBMS_SQL.column_value (g_dyncur, 3, v_retry_count);
         DBMS_SQL.column_value (g_dyncur, 4, v_correlation);
         DBMS_OUTPUT.put_line (
               RPAD (v_msg_priority, 16)
            || RPAD (v_msg_state, 11)
            || RPAD (v_retry_count, 8)
            || RPAD (v_correlation, 30)
         );
      END LOOP;
   END;
END;
/

/*======================================================================
| Supplement to the third edition of Oracle PL/SQL Programming by Steven
| Feuerstein with Bill Pribyl, Copyright (c) 1997-2002 O'Reilly &
| Associates, Inc. To submit corrections or find more code samples visit
| http://www.oreilly.com/catalog/oraclep3/
*/