The Adaptive Server session is a publisher. It publishes on “topicA” and “topicB”; publications on “topicB” are published as retained publications. The retained publication is deleted.
-- @QM has the queue manager endpoint
declare @QM varchar(100)
-- @BROKER has the broker queue name
declare @BROKER varchar(100)
-- @STREAM has the stream queue name
declare @STREAM varchar(100)
-- @CORRELID has the generated correlation id
declare @CORRELID varchar(100)
-- Put Queue manager name, broker and stream queue names into variables
select @QM = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1'
select @BROKER = 'SYSTEM.BROKER.CONTROL.QUEUE'
select @STREAM = 'Q1.STREAM'
-- Register the publisher, only for topicA
select msgsend(NULL, @QM + ',queue=' + @BROKER
option 'rfhCommand=registerPublisher'
message header 'correlationAsId=generate'
+ ',topics=topicA'
+ ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801
-- Save the generated correlation id
select @CORRELID = @@msgcorrelation
-- Send two publications on topicA
select msgsend('topicA, publication 1', @QM + ',queue=' + @STREAM
option 'rfhCommand=publish'
message header 'correlationAsId=yes'
+ ',correlationId=' + @CORRELID
+ ',topics=topicA')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014803
select msgsend('topicA, publication 2', @QM + ',queue=' + @STREAM
option 'rfhCommand=publish'
message header 'correlationAsId=yes'
+ ',correlationId=' + @CORRELID
+ ',topics=topicA')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014805
-- Add another topic for this publisher
select msgsend(NULL, @QM + ',queue=' + @BROKER
option 'rfhCommand=registerPublisher'
message header 'correlationAsId=yes'
+ ',correlationId=' + @CORRELID
+ ',topics=topicB'
+ ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014807
-- Publish a retained message on topicB
select msgsend('topicB, retained publication 1', @QM + ',queue=' + @STREAM
option 'rfhCommand=publish'
message header 'correlationAsId=yes'
+ ',correlationId=' + @CORRELID
+ ',topics=topicB'
+ ',retainPub=yes')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014809
-- Publish a second retained publication on topicB
-- This one will replace the current retained publication on topicB.
select msgsend('topicB, retained publication 2', @QM + ',queue=' + @STREAM
option 'rfhCommand=publish'
message header ',correlationAsId=Yes'
+ ',correlationId' + @CORRELID
+ ',topics=topicB'
+ ',retainPub=yes')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb2001480b
-- Delete the retained publication on topicB
select msgsend(NULL, @QM + ',queue=' + @STREAM
option 'rfhCommand=deletePublication'
message header 'topics=topicB'
+ ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb2001480d
-- Deregister the publisher, for all topics.
select msgsend(NULL, @QM + ',queue=' + @BROKER
option 'rfhCommand=deregisterPublisher'
message header 'correlationAsId=yes'
+ ',correlationId=' + @CORRELID
+ ',deregAll=yes'
+ ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb2001480f
In this example, the Adaptive Server session subscribes to “topicA” and “topicB”; publications on “topicB” are published as retained publications. This subscriber processes retained publications by requesting an update from the pub/sub broker.
-- @QM has the queue manager endpoint declare @QM varchar(100) -- @BROKER has the broker queue name declare @BROKER varchar(100) -- @SUBQUEUE has the subscriber queue name declare @SUBQUEUE varchar(100) -- @STREAM has the stream queue name declare @STREAM varchar(100) -- @CORRELID has the generated correlation id declare @CORRELID varchar(100) -- Put broker and subscriber queue names into variables select @QM = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1' select @BROKER = 'SYSTEM.BROKER.CONTROL.QUEUE' select @SUBQUEUE = 'Q1.SUBSCRIBER' select @STREAM = 'Q1.STREAM' -- Register the subscriber, only for topicA select msgsend(NULL, @QM + ',queue=' + @BROKER option 'rfhCommand=registerSubscriber' message header 'correlationAsId=generate' + ',topics=topicA' + ',streamName=' + @STREAM + ',queueName=' + @SUBQUEUE) ----------------------------------------------------------------------- 0x414d51204652414e4349532e514d202041a3ebfb20014801 -- Save the generated correlation id select @CORRELID = @@msgcorrelation -- Add another topic for this subscriber -- we will explicitly request update for publications on this topic. select msgsend(NULL, @QM + ',queue=' + @BROKER option 'rfhCommand=registerSubscriber' message header 'CorrelationAsId=yes' + ',correlationId=' + @CORRELID + ',topics=topicB' + ',streamName=' + @STREAM + ',queueName=' + @SUBQUEUE + ',pubOnReqOnly=yes') ----------------------------------------------------------------------- 0x414d51204652414e4349532e514d202041a3ebfb20014803 -- The publisher now publishes messages in the following order: -- topicA, topicB (*), topicA, topicB (*) -- ( '*' denotes a retained publication ) -- Get the first message on the subscriber queue, it will be on topicA. select msgrecv(@QM + ',queue=' + @SUBQUEUE option 'timeout=30ss') ----------------------------------------------------------------------- publication on topicA -- Get the second message on the subscriber queue, it will be on topicA. select msgrecv(@QM + ',queue=' + @SUBQUEUE option 'timeout=30ss') ----------------------------------------------------------------------- publication on topicA -- Request the broker to now send retained publications on topicB select msgsend(NULL, @QM + ',queue=' + @BROKER option 'rfhCommand=requestUpdate' message header 'CorrelationAsId=yes' + ',correlationId=' + @CORRELID + ',topics=topicB' + ',streamName=' + @STREAM + ',queueName=' + @SUBQUEUE) ----------------------------------------------------------------------- 0x414d51204652414e4349532e514d202041a3ebfb20014805 -- Get the next message on the subscriber queue, it will be on topicB. select msgrecv(@QM + ',queue=' + @SUBQUEUE option 'timeout=30ss') ----------------------------------------------------------------------- publication on topicB -- Get the next message on the subscriber queue, it will be on topicB. select msgrecv(@QM + ',queue=' + @SUBQUEUE option 'timeout=30ss') ----------------------------------------------------------------------- publication on topicB -- Deregister the subscriber, for all topics. select msgsend(NULL, @QM + ',queue=' + @BROKER option 'rfhCommand=deregisterSubscriber' message header 'CorrelationAsId=yes' + ',correlationId=' + @CORRELID + ',deregAll=yes' + ',streamName=' + @STREAM + ',queueName=' + @SUBQUEUE) ----------------------------------------------------------------------- 0x414d51204652414e4349532e514d202041a3ebfb20014807
This example shows how can use request/response messaging to check the response from the pub/sub broker. A subscription is registered by user1, and the pub/sub broker response is checked. The same subscription is then registered again by user2, with a different subscription name, which causes an error response from the pub/sub broker.
Queries executed by user1:
-- @QM has the queue manager endpoint
declare @QM varchar(100)
-- @BROKER has the broker queue name
declare @BROKER varchar(100)
-- @SUBQUEUE has the subscriber queue name
declare @SUBQUEUE varchar(100)
-- @REPLY has the reply queue name
declare @REPLY varchar(100)
-- Put broker, subscriber and reply queue names into variables
select @QM = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1'
select @BROKER = 'SYSTEM.BROKER.CONTROL.QUEUE'
select @SUBQUEUE = 'Q1.SUBSCRIBER'
select @REPLY = 'Q1.REPLY'
-- Register the subscriber.
select msgsend(NULL, @QM + ',queue=' + @BROKER
option 'rfhCommand=registerSubscriber, msgType=request'
message header 'correlationAsId=generate'
+ ',topics=topicA'
+ ',streamName=Q1.STREAM'
+ ',queueName=Q1.SUBSCRIBER'
+ ',replyToQueue=Q1.REPLY')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801
-- Read the response
select msgrecv(@QM + ',queue=' + @REPLY option 'timeout=30ss')
-----------------------------------------------------------------------
NULL
-- Check @@msgproperties
select @@msgproperties
-----------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<msgproperties
MQPSReasonText="'MQRC_NONE'"
MQPSReason="0"
MQPSCompCode="0">
</msgproperties>
-- Check MQPSCompCode
if (msgpropvalue('MQPSCompCode', @@msgproperties) != "0")
begin
print "registerSubscriber failed"
end
Queries executed by user2:
-- @QM has the queue manager endpoint
declare @QM varchar(100)
-- @BROKER has the broker queue name
declare @BROKER varchar(100)
-- @SUBQUEUE has the subscriber queue name
declare @SUBQUEUE varchar(100)
-- @REPLY has the reply queue name
declare @REPLY varchar(100)
-- Put broker, subscriber and reply queue names into variables
select @QM= 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1'
select @BROKER= 'SYSTEM.BROKER.CONTROL.QUEUE'
select @SUBQUEUE= 'Q1.SUBSCRIBER'
select @REPLY= 'Q1.REPLY'
-- Register the subscriber
select msgsend(NULL, @QM + ',queue=' + @BROKER
option 'rfhCommand=registerSubscriber, msgType=request'
message header 'correlationAsId=generate'
+ ',topics=topicA'
+ ',streamName=Q1.STREAM'
+ ',queueName=Q1.SUBSCRIBER'
+ ',replyToQueue=Q1.REPLY')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801
-- Read the response
select msgrecv(@QM + ',queue=' + @REPLY option 'timeout=30ss')
-----------------------------------------------------------------------
NULL
-- Check @@msgproperties
select @@msgproperties
------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<msgproperties
MQPSUserId="'user2 '"
MQPSReasonText="'MQRCCF_DUPLICATE_IDENTITY'"
MQPSReason="3078"
MQPSCompCode="2"
</msgproperties>
-- Check MQPSCompCode
if (msgpropvalue('MQPSCompCode', @@msgproperties) != "0")
begin
print "registerSubscriber failed"
end