MQ publish/subscribe examples

Publisher example

The Adaptive Server session is a publisher. It registers for and publishes on “topicA”, then adds “topicB”, whose publications are published as retained publications. Finally, the retained publication on “topicB” is deleted and the publisher is deregistered for all topics.

-- Variables used by the publisher example:
--   @QM        queue manager endpoint
--   @BROKER    broker queue name
--   @STREAM    stream queue name
--   @CORRELID  generated correlation id
declare @QM        varchar(100),
        @BROKER    varchar(100),
        @STREAM    varchar(100),
        @CORRELID  varchar(100)

-- Load the queue manager endpoint and the broker and stream queue names
select @QM     = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1',
       @BROKER = 'SYSTEM.BROKER.CONTROL.QUEUE',
       @STREAM = 'Q1.STREAM'

-- Register the publisher, only for topicA.
-- The command is sent to the broker queue (@BROKER), naming the stream
-- queue (@STREAM) on which this publisher will publish.
-- correlationAsId=generate: a correlation id is generated for this
-- publisher; the following statement in this example saves it from
-- @@msgcorrelation so that later commands can pass it back.
select msgsend(NULL, @QM + ',queue=' + @BROKER
        option 'rfhCommand=registerPublisher'
        message header 'correlationAsId=generate'
                        + ',topics=topicA'
                        + ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801

-- Save the generated correlation id
select @CORRELID = @@msgcorrelation

-- Send two publications on topicA
select msgsend('topicA, publication 1', @QM + ',queue=' + @STREAM
        option 'rfhCommand=publish'
        message header 'correlationAsId=yes'
                        + ',correlationId=' + @CORRELID
                        + ',topics=topicA')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014803

select msgsend('topicA, publication 2', @QM + ',queue=' + @STREAM
        option 'rfhCommand=publish'
        message header 'correlationAsId=yes'
                        + ',correlationId=' + @CORRELID
                        + ',topics=topicA')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014805

-- Add another topic for this publisher
select msgsend(NULL, @QM + ',queue=' + @BROKER
        option 'rfhCommand=registerPublisher'
        message header 'correlationAsId=yes'
                        + ',correlationId=' + @CORRELID
                        + ',topics=topicB'
                        + ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014807

-- Publish a retained message on topicB
select msgsend('topicB, retained publication 1', @QM + ',queue=' + @STREAM
        option 'rfhCommand=publish'
        message header 'correlationAsId=yes'
                        + ',correlationId=' + @CORRELID
                        + ',topics=topicB'
                        + ',retainPub=yes')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014809

-- Publish a second retained publication on topicB.
-- This one will replace the current retained publication on topicB.
-- The header carries the same properties as the first retained publish:
-- the saved correlation id, the topic, and retainPub=yes.
select msgsend('topicB, retained publication 2', @QM + ',queue=' + @STREAM
        option 'rfhCommand=publish'
        message header 'correlationAsId=yes'
                        + ',correlationId=' + @CORRELID
                        + ',topics=topicB'
                        + ',retainPub=yes')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb2001480b

-- Delete the retained publication on topicB.
-- The deletePublication command is sent to the stream queue (@STREAM),
-- like the publish commands, not to the broker queue.
-- NOTE(review): unlike the other commands in this example, no
-- correlationAsId/correlationId is passed here -- confirm this is intended.
select msgsend(NULL, @QM + ',queue=' + @STREAM
        option 'rfhCommand=deletePublication'
        message header 'topics=topicB'
                        + ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb2001480d

-- Deregister the publisher, for all topics.
select msgsend(NULL, @QM + ',queue=' + @BROKER
        option 'rfhCommand=deregisterPublisher'
        message header 'correlationAsId=yes'
                        + ',correlationId=' + @CORRELID
                        + ',deregAll=yes'
                        + ',streamName=' + @STREAM)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb2001480f

Subscriber example

In this example, the Adaptive Server session subscribes to “topicA” and “topicB”; publications on “topicB” are published as retained publications. This subscriber processes retained publications by requesting an update from the pub/sub broker.

-- Variables used by the subscriber example:
--   @QM        queue manager endpoint
--   @BROKER    broker queue name
--   @SUBQUEUE  subscriber queue name
--   @STREAM    stream queue name
--   @CORRELID  generated correlation id
declare @QM        varchar(100),
        @BROKER    varchar(100),
        @SUBQUEUE  varchar(100),
        @STREAM    varchar(100),
        @CORRELID  varchar(100)

-- Load the queue manager endpoint and the broker, subscriber and
-- stream queue names
select @QM       = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1',
       @BROKER   = 'SYSTEM.BROKER.CONTROL.QUEUE',
       @SUBQUEUE = 'Q1.SUBSCRIBER',
       @STREAM   = 'Q1.STREAM'

-- Register the subscriber, only for topicA
select msgsend(NULL, @QM + ',queue=' + @BROKER
        option 'rfhCommand=registerSubscriber'
        message header 'correlationAsId=generate'
                        + ',topics=topicA'
                        + ',streamName=' + @STREAM
                        + ',queueName=' + @SUBQUEUE)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801

-- Save the generated correlation id
select @CORRELID = @@msgcorrelation

-- Add another topic (topicB) for this subscriber, reusing the saved
-- correlation id.
-- pubOnReqOnly=yes: we will explicitly request updates for publications
-- on this topic (see the requestUpdate command later in this example).
select msgsend(NULL, @QM + ',queue=' + @BROKER
        option 'rfhCommand=registerSubscriber'
        message header 'CorrelationAsId=yes'
                        + ',correlationId=' + @CORRELID
                        + ',topics=topicB'
                        + ',streamName=' + @STREAM
                        + ',queueName=' + @SUBQUEUE
                        + ',pubOnReqOnly=yes')
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014803

-- The publisher now publishes messages in the following order:
-- topicA, topicB (*), topicA, topicB (*)
-- ( '*' denotes a retained publication )

-- Get the first message on the subscriber queue, it will be on topicA.
select msgrecv(@QM + ',queue=' + @SUBQUEUE option 'timeout=30ss')
-----------------------------------------------------------------------
publication on topicA

-- Get the second message on the subscriber queue, it will be on topicA.
select msgrecv(@QM + ',queue=' + @SUBQUEUE option 'timeout=30ss')
-----------------------------------------------------------------------
publication on topicA

-- Request the broker to now send retained publications on topicB
select msgsend(NULL, @QM + ',queue=' + @BROKER
        option 'rfhCommand=requestUpdate'
        message header 'CorrelationAsId=yes'
                        + ',correlationId=' + @CORRELID
                        + ',topics=topicB'
                        + ',streamName=' + @STREAM
                        + ',queueName=' + @SUBQUEUE)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014805

-- Get the next message on the subscriber queue, it will be on topicB.
select msgrecv(@QM + ',queue=' + @SUBQUEUE option 'timeout=30ss')
-----------------------------------------------------------------------
publication on topicB

-- Get the next message on the subscriber queue, it will be on topicB.
select msgrecv(@QM + ',queue=' + @SUBQUEUE option 'timeout=30ss')
-----------------------------------------------------------------------
publication on topicB

-- Deregister the subscriber, for all topics.
select msgsend(NULL, @QM + ',queue=' + @BROKER
        option 'rfhCommand=deregisterSubscriber'
        message header 'CorrelationAsId=yes'
                        + ',correlationId=' + @CORRELID
                        + ',deregAll=yes'
                        + ',streamName=' + @STREAM
                        + ',queueName=' + @SUBQUEUE)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014807

Broker response example

This example shows you how to use request/response messaging to check the response from the pub/sub broker. A subscription is registered by user1, and the pub/sub broker response is checked. The same subscription is then registered again by user2, with a different subscription name, which causes an error response from the pub/sub broker.

Queries executed by user1:

-- Variables used by user1:
--   @QM        queue manager endpoint
--   @BROKER    broker queue name
--   @SUBQUEUE  subscriber queue name
--   @REPLY     reply queue name
declare @QM        varchar(100),
        @BROKER    varchar(100),
        @SUBQUEUE  varchar(100),
        @REPLY     varchar(100)

-- Load the queue manager endpoint and the broker, subscriber and
-- reply queue names
select @QM       = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1',
       @BROKER   = 'SYSTEM.BROKER.CONTROL.QUEUE',
       @SUBQUEUE = 'Q1.SUBSCRIBER',
       @REPLY    = 'Q1.REPLY'

-- Register the subscriber.
-- msgType=request together with replyToQueue makes this a request
-- message; the response is read from the reply queue (@REPLY) by the
-- msgrecv call that follows in this example.
-- The subscriber and reply queue names come from the variables set
-- above, so each name is defined in one place only.
select msgsend(NULL, @QM + ',queue=' + @BROKER
        option 'rfhCommand=registerSubscriber, msgType=request'
        message header 'correlationAsId=generate'
                        + ',topics=topicA'
                        + ',streamName=Q1.STREAM'
                        + ',queueName=' + @SUBQUEUE
                        + ',replyToQueue=' + @REPLY)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801

-- Read the response
select msgrecv(@QM + ',queue=' + @REPLY option 'timeout=30ss')
-----------------------------------------------------------------------
NULL

-- Check @@msgproperties
select @@msgproperties
-----------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<msgproperties
        MQPSReasonText="&apos;MQRC_NONE&apos;"
        MQPSReason="0"
        MQPSCompCode="0">
</msgproperties>

-- Check MQPSCompCode: report a failure when the completion code in the
-- response properties is anything other than '0'.
-- String literals use single quotes so the check also works when
-- quoted_identifier is on.
if (msgpropvalue('MQPSCompCode', @@msgproperties) != '0')
begin
        print 'registerSubscriber failed'
end

Queries executed by user2:

-- Variables used by user2:
--   @QM        queue manager endpoint
--   @BROKER    broker queue name
--   @SUBQUEUE  subscriber queue name
--   @REPLY     reply queue name
declare @QM        varchar(100),
        @BROKER    varchar(100),
        @SUBQUEUE  varchar(100),
        @REPLY     varchar(100)

-- Load the queue manager endpoint and the broker, subscriber and
-- reply queue names
select @QM       = 'ibm_mq:chan1/tcp/localhost(5678)?qmgr=QM1',
       @BROKER   = 'SYSTEM.BROKER.CONTROL.QUEUE',
       @SUBQUEUE = 'Q1.SUBSCRIBER',
       @REPLY    = 'Q1.REPLY'

-- Register the subscriber.
-- msgType=request together with replyToQueue makes this a request
-- message; the response is read from the reply queue (@REPLY) by the
-- msgrecv call that follows in this example.
-- The subscriber and reply queue names come from the variables set
-- above, so each name is defined in one place only.
select msgsend(NULL, @QM + ',queue=' + @BROKER
       option 'rfhCommand=registerSubscriber, msgType=request'
       message header 'correlationAsId=generate'
                             + ',topics=topicA'
                             + ',streamName=Q1.STREAM'
                             + ',queueName=' + @SUBQUEUE
                             + ',replyToQueue=' + @REPLY)
-----------------------------------------------------------------------
0x414d51204652414e4349532e514d202041a3ebfb20014801

-- Read the response
select msgrecv(@QM + ',queue=' + @REPLY option 'timeout=30ss')
-----------------------------------------------------------------------
NULL

-- Check @@msgproperties
select @@msgproperties
------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<msgproperties
        MQPSUserId="&apos;user2 &apos;"
        MQPSReasonText="&apos;MQRCCF_DUPLICATE_IDENTITY&apos;"
        MQPSReason="3078"
        MQPSCompCode="2">
</msgproperties>

-- Check MQPSCompCode: report a failure when the completion code in the
-- response properties is anything other than '0'.
-- String literals use single quotes so the check also works when
-- quoted_identifier is on.
if (msgpropvalue('MQPSCompCode', @@msgproperties) != '0')
begin
        print 'registerSubscriber failed'
end