The Adaptive Server session is a publisher. It publishes on “topicA” and “topicB”; publications on “topicB” are published as retained publications. The retained publication will be deleted.
-- Publisher example: registers a publisher, publishes on topicA and topicB
-- (topicB publications are retained), deletes the retained publication on
-- topicB, and finally deregisters the publisher for all topics.
-- NOTE: the dashed rule and the value shown after each select are sample
-- output as displayed by the client, not statements to execute.

declare @QM       varchar(100),  -- Queue Manager endpoint
        @BROKER   varchar(100),  -- broker queue name
        @STREAM   varchar(100),  -- stream queue name
        @CORRELID varchar(100)   -- generated correlation id

-- Put Queue manager name, broker and stream queue names into variables
select @QM     = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1'
select @BROKER = 'SYSTEM.BROKER.CONTROL.QUEUE'
select @STREAM = 'Q1.STREAM'

-- Register the publisher, only for topicA
select msgsend(NULL,
    @QM + ',queue=' + @BROKER
    option 'rfhCommand=registerPublisher'
    message header 'correlationAsId=generate'
        + ',topics=topicA'
        + ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801

-- Save the generated correlation id; every later command reuses it so
-- that they all refer to the same publisher identity.
select @CORRELID = @@msgcorrelation

-- Send two publications on topicA
select msgsend('topicA, publication 1',
    @QM + ',queue=' + @STREAM
    option 'rfhCommand=publish'
    message header 'correlationAsId=yes'
        + ',correlationId=' + @CORRELID
        + ',topics=topicA')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014803

select msgsend('topicA, publication 2',
    @QM + ',queue=' + @STREAM
    option 'rfhCommand=publish'
    message header 'correlationAsId=yes'
        + ',correlationId=' + @CORRELID
        + ',topics=topicA')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014805

-- Add another topic for this publisher
select msgsend(NULL,
    @QM + ',queue=' + @BROKER
    option 'rfhCommand=registerPublisher'
    message header 'correlationAsId=yes'
        + ',correlationId=' + @CORRELID
        + ',topics=topicB'
        + ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014807

-- Publish a retained message on topicB
select msgsend('topicB, retained publication 1',
    @QM + ',queue=' + @STREAM
    option 'rfhCommand=publish'
    message header 'correlationAsId=yes'
        + ',correlationId=' + @CORRELID
        + ',topics=topicB'
        + ',retainPub=yes')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014809

-- Publish a second retained publication on topicB
-- This one will replace the current retained publication on topicB.
-- The header is built exactly like the first retained publication:
-- no leading comma, and 'correlationId=' includes the '=' separator.
select msgsend('topicB, retained publication 2',
    @QM + ',queue=' + @STREAM
    option 'rfhCommand=publish'
    message header 'correlationAsId=yes'
        + ',correlationId=' + @CORRELID
        + ',topics=topicB'
        + ',retainPub=yes')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb2001480b

-- Delete the retained publication on topicB
select msgsend(NULL,
    @QM + ',queue=' + @STREAM
    option 'rfhCommand=deletePublication'
    message header 'topics=topicB'
        + ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb2001480d

-- Deregister the publisher, for all topics.
select msgsend(NULL,
    @QM + ',queue=' + @BROKER
    option 'rfhCommand=deregisterPublisher'
    message header 'correlationAsId=yes'
        + ',correlationId=' + @CORRELID
        + ',deregAll=yes'
        + ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb2001480f
In this example, the Adaptive Server session subscribes to “topicA” and “topicB”; publications on “topicB” are published as retained publications. This subscriber processes retained publications by requesting an update from the pub/sub broker.
-- Subscriber example: subscribes to topicA and topicB, reads the topicA
-- publications as they arrive, and asks the broker explicitly for the
-- publications on topicB before deregistering.
-- NOTE: the dashed rule and the value shown after each select are sample
-- output as displayed by the client, not statements to execute.

declare @QM       varchar(100),  -- Queue Manager endpoint
        @BROKER   varchar(100),  -- broker queue name
        @SUBQUEUE varchar(100),  -- subscriber queue name
        @STREAM   varchar(100),  -- stream queue name
        @CORRELID varchar(100)   -- generated correlation id

-- Load the endpoint plus the broker, subscriber and stream queue names
select @QM       = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1',
       @BROKER   = 'SYSTEM.BROKER.CONTROL.QUEUE',
       @SUBQUEUE = 'Q1.SUBSCRIBER',
       @STREAM   = 'Q1.STREAM'

-- Initial registration covers topicA only
select msgsend(NULL,
    @QM + ',queue=' + @BROKER
    option 'rfhCommand=registerSubscriber'
    message header 'correlationAsId=generate'
        + ',topics=topicA'
        + ',streamName=' + @STREAM
        + ',queueName=' + @SUBQUEUE)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801

-- Keep the generated correlation id for the commands that follow
select @CORRELID = @@msgcorrelation

-- Extend the subscription to topicB.
-- Publications on this topic are delivered only when explicitly requested.
select msgsend(NULL,
    @QM + ',queue=' + @BROKER
    option 'rfhCommand=registerSubscriber'
    message header 'CorrelationAsId=yes'
        + ',correlationId=' + @CORRELID
        + ',topics=topicB'
        + ',streamName=' + @STREAM
        + ',queueName=' + @SUBQUEUE
        + ',pubOnReqOnly=yes')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014803

-- The publisher now publishes messages in the following order:
--     topicA, topicB (*), topicA, topicB (*)
-- ( '*' denotes a retained publication )

-- First message on the subscriber queue: a topicA publication.
select msgrecv(@QM + ',queue=' + @SUBQUEUE
    option 'timeout=30ss')
-----------------------------------------------------------------------
publication on topicA

-- Second message on the subscriber queue: also a topicA publication.
select msgrecv(@QM + ',queue=' + @SUBQUEUE
    option 'timeout=30ss')
-----------------------------------------------------------------------
publication on topicA

-- Ask the broker to send the retained publications on topicB now
select msgsend(NULL,
    @QM + ',queue=' + @BROKER
    option 'rfhCommand=requestUpdate'
    message header 'CorrelationAsId=yes'
        + ',correlationId=' + @CORRELID
        + ',topics=topicB'
        + ',streamName=' + @STREAM
        + ',queueName=' + @SUBQUEUE)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014805

-- Next message on the subscriber queue: a topicB publication.
select msgrecv(@QM + ',queue=' + @SUBQUEUE
    option 'timeout=30ss')
-----------------------------------------------------------------------
publication on topicB

-- Next message on the subscriber queue: again a topicB publication.
select msgrecv(@QM + ',queue=' + @SUBQUEUE
    option 'timeout=30ss')
-----------------------------------------------------------------------
publication on topicB

-- Remove the subscriber registration for every topic.
select msgsend(NULL,
    @QM + ',queue=' + @BROKER
    option 'rfhCommand=deregisterSubscriber'
    message header 'CorrelationAsId=yes'
        + ',correlationId=' + @CORRELID
        + ',deregAll=yes'
        + ',streamName=' + @STREAM
        + ',queueName=' + @SUBQUEUE)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014807
This example shows how you can use request/response messaging to check the response from the pub/sub broker. A subscription is registered by user1, and the pub/sub broker response is checked. The same subscription is then registered again by user2, with a different subscription name, which causes an error response from the pub/sub broker.
Queries executed by user1:
-- user1: register a subscriber as a request message, then read the broker's
-- reply from the reply queue and inspect its properties.
-- NOTE: the dashed rule and the value shown after each select are sample
-- output as displayed by the client, not statements to execute.

declare @QM       varchar(100),  -- Queue Manager endpoint
        @BROKER   varchar(100),  -- broker queue name
        @SUBQUEUE varchar(100),  -- subscriber queue name
        @REPLY    varchar(100)   -- reply queue name

-- Load the endpoint plus the broker, subscriber and reply queue names
select @QM       = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1',
       @BROKER   = 'SYSTEM.BROKER.CONTROL.QUEUE',
       @SUBQUEUE = 'Q1.SUBSCRIBER',
       @REPLY    = 'Q1.REPLY'

-- Send the registration as a request; the header names Q1.REPLY as the
-- queue for the reply.
select msgsend(NULL,
    @QM + ',queue=' + @BROKER
    option 'rfhCommand=registerSubscriber, msgType=request'
    message header 'correlationAsId=generate'
        + ',topics=topicA'
        + ',streamName=Q1.STREAM'
        + ',queueName=Q1.SUBSCRIBER'
        + ',replyToQueue=Q1.REPLY')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801

-- Fetch the broker's response from the reply queue
select msgrecv(@QM + ',queue=' + @REPLY
    option 'timeout=30ss')
-----------------------------------------------------------------------
NULL

-- Show the properties that came with the response
select @@msgproperties
-----------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<msgproperties
    MQPSReasonText="'MQRC_NONE'"
    MQPSReason="0"
    MQPSCompCode="0">
</msgproperties>

-- Any MQPSCompCode other than "0" is reported as a failed registration
if (msgpropvalue('MQPSCompCode', @@msgproperties) != "0")
begin
    print "registerSubscriber failed"
end
Queries executed by user2:
-- user2: send the same subscriber registration as a request message, then
-- read the broker's reply and inspect its properties. In the sample output
-- below the broker answers with MQRCCF_DUPLICATE_IDENTITY (MQPSCompCode 2).
-- NOTE: the dashed rule and the value shown after each select are sample
-- output as displayed by the client, not statements to execute.

-- @QM has the Queue Manager endpoint
declare @QM varchar(100)
-- @BROKER has the broker queue name
declare @BROKER varchar(100)
-- @SUBQUEUE has the subscriber queue name
declare @SUBQUEUE varchar(100)
-- @REPLY has the reply queue name
declare @REPLY varchar(100)

-- Put broker, subscriber and reply queue names into variables
select @QM       = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1'
select @BROKER   = 'SYSTEM.BROKER.CONTROL.QUEUE'
select @SUBQUEUE = 'Q1.SUBSCRIBER'
select @REPLY    = 'Q1.REPLY'

-- Register the subscriber
select msgsend(NULL,
    @QM + ',queue=' + @BROKER
    option 'rfhCommand=registerSubscriber, msgType=request'
    message header 'correlationAsId=generate'
        + ',topics=topicA'
        + ',streamName=Q1.STREAM'
        + ',queueName=Q1.SUBSCRIBER'
        + ',replyToQueue=Q1.REPLY')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801

-- Read the response
select msgrecv(@QM + ',queue=' + @REPLY
    option 'timeout=30ss')
-----------------------------------------------------------------------
NULL

-- Check @@msgproperties
select @@msgproperties
----------------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<msgproperties
    MQPSUserId="'user2 '"
    MQPSReasonText="'MQRCCF_DUPLICATE_IDENTITY'"
    MQPSReason="3078"
    MQPSCompCode="2">
</msgproperties>

-- Check MQPSCompCode: any value other than "0" is reported as a failure
if (msgpropvalue('MQPSCompCode', @@msgproperties) != "0")
begin
    print "registerSubscriber failed"
end
Copyright © 2005. Sybase Inc. All rights reserved. |
![]() |