Subscribing with Guaranteed Delivery

Use guaranteed delivery (GD) to ensure that events are still delivered to the subscriber if the connection is temporarily lost or the server is restarted.

Prerequisites
Enable guaranteed delivery in a window and attach a log store in the CCL. To receive checkpoint messages from the server on streams using GD with checkpoint, set the Auto Checkpoint parameter in the project configuration file. The client may also receive checkpoint messages if the consistent recovery option is turned on and a publisher commits a message.
Task

Guaranteed delivery is a delivery mechanism that preserves events produced by a window, keeps data in a log store, and tracks events consumed by GD subscribers. For more information on guaranteed delivery, see the Programmers Guide.

A CCL project can be set to checkpoint after a number of messages pass through it. Once the configured number of messages pass through the project, the server commits the log store and sends a checkpoint message to the subscriber. This indicates that all messages up to the checkpoint sequence number are safely logged in the system.

A subscriber must indicate to the server when it has processed the messages and can recover them without the server. The subscriber can call NetEspPublisher.commit_gd at any time to tell the server the sequence number of the last message that has been processed. The commit call ensures that the server will not resend messages up to and including the last sequence number committed, and allows it to reclaim resources consumed by these messages. The subscriber should not commit sequence numbers higher than the sequence number received via the last checkpoint message. This ensures that no data is lost if the server restarts.

  1. Request a GD subscription by calling NetEspSubscriberOptions.set_gd_session(string session_name) and creating the NetEspSubscriber object.
  2. Create and connect a NetEspPublisher object.
  3. Check if streams have GD or GD with checkpoint enabled by calling NetEspStream.is_gd_enabled(NetEspError^ error) and NetEspStream.is_checkpoint_enabled(NetEspError^ error).
  4. Retrieve active and inactive GD sessions by calling NetEspProject.get_active_gd_sessions(NetEspError^ error) and NetEspProject.get_inactive_gd_sessions NetEspError^ error).
  5. Retrieve the checkpoint sequence number for the last checkpointed data by calling NetEspSubscriberEvent.get_checkpoint_sequence_number(NetEspError^ error).
  6. Tell the server that the subscriber has committed messages up to a given sequence number and no longer needs them by calling NetEspPublisher.commit_gd(String^ session_name, array<int32_t>^ stream_ids, array<int64_t>^ seq_nos, NetEspError^ error).
  7. Cancel the GD session by closing the subscriber or by calling NetEspProject.cancel_gd_subscriber_session(String^ gd_session, NetEspError^ error).

Example

	// To connect to ESP server
	NetEspError espError = new NetEspError();
	NetEspSdk s_sdk = SYBASE.Esp.SDK.NetEspSdk.get_sdk();
	s_sdk.start(espError);
        NetEspCredentials creds = new NetEspCredentials(NetEspCredentials.NET_ESP_CREDENTIALS_T.NET_ESP_CREDENTIALS_USER_PASSWORD);
        creds.set_user("sybase");
        creds.set_password("sybase");
        NetEspUri uri = new NetEspUri();
        uri.set_uri("esp://localhost:19011", espError);
        NetEspServerOptions soptions = new NetEspServerOptions();
        NetEspServer server = s_sdk.get_server(uri, creds, soptions, espError);
        server.connect(espError);

        // To connect to an ESP project
        NetEspProject project = new NetEspProject();
        NetEspProjectOptions projoptions = new NetEspProjectOptions();
        project = server.get_project("workspace", "gd", projoptions, espError);
        project.connect(espError);

        // To create a GD subscriber
        NetEspSubscriberOptions suboptions = new NetEspSubscriberOptions();
        suboptions.set_gd_session("GD999");
        NetEspSubscriber subscriber = project.create_subscriber(suboptions, espError);

        // To create a publisher to commitGD message
        NetEspPublisher publisher = project.create_publisher(null, espError);
        publisher.connect(espError);

        //check GD status/mode 
        NetEspStream stream1 = project.get_stream("In1", espError);
        subscriber.subscribe_stream(stream1, espError);
        subscriber.connect(espError);
        stream1.is_gd_enabled(espError);
        stream1.is_checkpoint_enabled(espError);
        subscriber.is_gd_enabled();

        // retrieve GD sessions
        project.get_active_gd_sessions(espError);
        project.get_inactive_gd_sessions(espError);

        NetEspSubscriberEvent event1;
        Boolean done = false;

        while (!done)
        {
            event1 = subscriber.get_next_event(espError);

            switch (event1.getType())
            {
                case (uint)NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_EVENT_DATA:
                    NetEspMessageReader reader = event1.getMessageReader();
                    NetEspRowReader rows = reader.next_row(espError);
                    while (rows != null)
                    {
                        int intcol1 = rows.get_integer(1, espError);
                        Console.Out.Write(intcol1);

                        string stringco2 = rows.get_string(2, espError);
                        Console.Out.WriteLine(" " + stringco2);

                        rows = reader.next_row(espError);
                    }
                    break;
                case (uint)NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_EVENT_CLOSED:
                    done = true;
                    break;
                case (uint)NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_EVENT_DISCONNECTED:
                    done = true;
                    break;
                case (uint)NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_CHECKPOINT:

                    // retrieve the sequence number returned by NET_ESP_SUBSCRIBER_CHECKPOINT message
                    long seq_val = event1.get_checkpoint_sequence_number(espError);
                    if (seq_val > 0)
                    {
                        Console.Out.WriteLine("SeqNo #" + seq_val);

                        int[] idArray = new int[1];
                        long[] seqArray = new long[1];
                        idArray[0] = stream1.get_id();
                        seqArray[0] = seq_val;

                        // commitGD message for a single stream with the corresponding last checkpointed sequence number
                        publisher.commit_gd("GD999", idArray, seqArray, espError);
                    }
                    break;
                default:
                    break;
            }
        }

        // cancel GD session and disconnect subscriber and publisher before exit
        project.cancel_gd_subscriber_session("GD999", espError);
        subscriber.disconnect(espError);
        server.disconnect(espError);