Subscribing with Guaranteed Delivery

Use guaranteed delivery (GD) to ensure that events are still delivered to the subscriber if the connection is temporarily lost or the server is restarted.

Prerequisites

Enable guaranteed delivery in a window and attach a log store in the CCL. To receive checkpoint messages from the server on streams using GD with checkpoint, set the Auto Checkpoint parameter in the project configuration file. The client may also receive checkpoint messages if the consistent recovery option is turned on and a publisher commits a message.

Task

Guaranteed delivery is a delivery mechanism that preserves events produced by a window, keeps data in a log store, and tracks events consumed by GD subscribers. For more information on guaranteed delivery, see the Programmers Guide.

A CCL project can be set to checkpoint after a number of messages pass through it. Once the configured number of messages has passed through the project, the server commits the log store and sends a checkpoint message to the subscriber. This message indicates that all messages up to the checkpoint sequence number are safely logged in the system.
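
The checkpoint interval can be pictured with a small stand-alone model. This is only an illustrative sketch, not SDK code: the CheckpointModel type and its functions are hypothetical names, and the model assumes that the checkpoint carries the sequence number of the message that completed the interval.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model of a project that checkpoints every `interval` messages. */
typedef struct {
    int64_t interval;        /* messages per checkpoint (models the Auto Checkpoint setting) */
    int64_t since_last;      /* messages seen since the last checkpoint */
    int64_t last_checkpoint; /* sequence number covered by the last checkpoint, -1 if none */
} CheckpointModel;

void checkpoint_model_init(CheckpointModel *m, int64_t interval)
{
    assert(m != NULL && interval > 0);
    m->interval = interval;
    m->since_last = 0;
    m->last_checkpoint = -1;
}

/* Record one message. Returns 1 if this message triggers a checkpoint, else 0. */
int checkpoint_model_on_message(CheckpointModel *m, int64_t seq_no)
{
    assert(m != NULL);
    m->since_last++;
    if (m->since_last < m->interval) {
        return 0;
    }
    /* Interval reached: everything up to seq_no is now considered logged. */
    m->since_last = 0;
    m->last_checkpoint = seq_no;
    return 1;
}
```

With an interval of 3, the third message triggers a checkpoint, and last_checkpoint stays at that sequence number until the sixth message arrives.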

A subscriber must indicate to the server when it has processed the messages and can recover them without the server. The subscriber can call esp_publisher_commit_gd at any time to tell the server the sequence number of the last message that has been processed. The commit call ensures that the server will not resend messages up to and including the last sequence number committed, and allows it to reclaim resources consumed by these messages. The subscriber should not commit sequence numbers higher than the sequence number received via the last checkpoint message. This ensures that no data is lost if the server restarts.
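
The commit rule described above (never commit past the last checkpoint, and never commit what has not yet been processed) can be sketched as a small piece of bookkeeping on the subscriber side. The GdCommitTracker type and its functions are hypothetical helpers, not part of the SDK, and the sketch assumes that sequence numbers are non-negative and increase over time.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define GD_NO_SEQUENCE ((int64_t)-1)  /* assumption: real sequence numbers are >= 0 */

/* Hypothetical subscriber-side bookkeeping for one GD stream. */
typedef struct {
    int64_t last_processed;  /* highest sequence number processed and recoverable locally */
    int64_t last_checkpoint; /* sequence number from the last checkpoint message */
    int64_t last_committed;  /* highest sequence number already committed */
} GdCommitTracker;

void gd_tracker_init(GdCommitTracker *t)
{
    assert(t != NULL);
    t->last_processed = GD_NO_SEQUENCE;
    t->last_checkpoint = GD_NO_SEQUENCE;
    t->last_committed = GD_NO_SEQUENCE;
}

void gd_tracker_on_processed(GdCommitTracker *t, int64_t seq_no)
{
    assert(t != NULL);
    if (seq_no > t->last_processed) { t->last_processed = seq_no; }
}

void gd_tracker_on_checkpoint(GdCommitTracker *t, int64_t seq_no)
{
    assert(t != NULL);
    if (seq_no > t->last_checkpoint) { t->last_checkpoint = seq_no; }
}

/*
 * Returns the next sequence number that is safe to commit, or GD_NO_SEQUENCE
 * if there is nothing new. The result never exceeds the last checkpoint.
 */
int64_t gd_tracker_next_commit(GdCommitTracker *t)
{
    assert(t != NULL);
    int64_t candidate = (t->last_processed < t->last_checkpoint)
                            ? t->last_processed
                            : t->last_checkpoint;
    if (candidate <= t->last_committed) {
        return GD_NO_SEQUENCE;
    }
    t->last_committed = candidate;
    return candidate;
}
```

A value returned by gd_tracker_next_commit is the kind of sequence number that would be handed to esp_publisher_commit_gd. Capping it at the last checkpoint is the design choice that keeps data recoverable if the server restarts.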

  1. Request a GD subscription by calling esp_subscriber_options_set_gd_session(EspSubscriberOptions * options, char * session_name, EspError * error) and creating the Subscriber object.
  2. Create and connect a Publisher object. The GD commit in step 6 is sent through this publisher.
  3. Check if streams have GD or GD with checkpoint enabled by calling esp_stream_is_gd_enabled(const EspStream * stream, EspError * error) and esp_stream_is_checkpoint_enabled(const EspStream * stream, EspError * error).
  4. Retrieve active and inactive GD sessions by calling esp_project_get_active_gd_sessions(EspProject * Project, EspList * gd_sessions, EspError * error) and esp_project_get_inactive_gd_sessions(EspProject * Project, EspList * gd_sessions, EspError * error).
  5. Retrieve the checkpoint sequence number for the last checkpointed data by calling esp_subscriber_event_get_checkpoint_sequence_number(const EspSubscriberEvent * event, int64_t * seq_val, EspError * error).
  6. Tell the server that the subscriber has committed messages up to a given sequence number and no longer needs them by calling esp_publisher_commit_gd(EspPublisher * publisher, char * gd_name, int32_t stream_ids[], int64_t seq_nos[], int32_t len, EspError * error).
  7. Cancel the GD session by closing the subscriber or by calling esp_project_cancel_gd_subscriber_session(EspProject * project, char * gd_session, EspError * error).
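
Step 6 passes stream IDs and sequence numbers as two parallel arrays plus a length. When a subscriber tracks several GD streams, those arrays have to be assembled first. The following is a minimal sketch of that packing step only. The StreamCommit type and build_commit_arrays function are hypothetical and not part of the SDK, and a negative seq_no is assumed to mean "nothing to commit".

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-stream record kept by the subscriber application. */
typedef struct {
    int32_t stream_id; /* ID of a GD-enabled stream */
    int64_t seq_no;    /* last sequence number safe to commit, negative if none */
} StreamCommit;

/*
 * Copy every record that has something to commit into the parallel arrays.
 * Returns the number of entries written, which never exceeds `capacity`.
 */
int32_t build_commit_arrays(const StreamCommit *records, int32_t count,
                            int32_t stream_ids[], int64_t seq_nos[],
                            int32_t capacity)
{
    int32_t len = 0;
    assert(records != NULL && stream_ids != NULL && seq_nos != NULL);
    for (int32_t i = 0; i < count && len < capacity; i++) {
        if (records[i].seq_no < 0) {
            continue; /* nothing to commit for this stream */
        }
        stream_ids[len] = records[i].stream_id;
        seq_nos[len] = records[i].seq_no;
        len++;
    }
    return len;
}
```

The filled arrays and the returned length correspond to the stream_ids, seq_nos, and len arguments of esp_publisher_commit_gd.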

Example

    int rc = esp_project_connect(project, g_error);
    if (rc != 0) { print_error_and_exit(g_error, __LINE__); }

    EspSubscriberOptions *options = esp_subscriber_options_create(g_error);

    // This will set the GD session. Provide a unique GD session name (GD999 in the example).
    // The GD session will only be created once esp_project_create_subscriber(…) is called.
    esp_subscriber_options_set_gd_session(options, "GD999", g_error);

    EspSubscriber * subscriber = esp_project_create_subscriber(project, options, g_error);
    if (NULL == subscriber) { print_error_and_exit(g_error, __LINE__); }

    // This example expects the CCL to have a window called "In1" with an attached log store and
    // guaranteed delivery enabled. Substitute a stream name from your own CCL.
    const EspStream * stream = esp_project_get_stream(project, "In1", g_error);
    if (NULL == stream) { print_error_and_exit(g_error, __LINE__); }

    // This call checks whether the stream in the CCL has GD enabled.
    rc = esp_stream_is_gd_enabled(stream, g_error);
    if (rc == 0)
    {
        printf("%s\n", "stream has GD enabled!");
    }

    // This call checks whether the checkpoint is set in the CCL project.
    rc = esp_stream_is_checkpoint_enabled(stream, g_error);
    if (rc == 0)
    {
        printf("%s\n", "chkpointEnable...OK");
    }
    rc = esp_subscriber_connect(subscriber, g_error);
    if (rc != 0) { print_error_and_exit(g_error, __LINE__); }

    // This call checks whether it is a GD subscriber.
    rc = esp_subscriber_is_gd_enabled(subscriber, g_error);
    if (rc == 0)
    {
        printf("%s\n", "GD subscriber!");
    }

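    EspList * gd_sessions = esp_list_create(ESP_LIST_STRING_T, g_error);
    if (NULL == gd_sessions) { print_error_and_exit(g_error, __LINE__); }

    // This gets all the active GD sessions and populates the gd_sessions list.
    rc = esp_project_get_active_gd_sessions(project, gd_sessions, g_error);
    if (rc != 0) { print_error_and_exit(g_error, __LINE__); }

    // Print the first session name, guarding against a NULL result.
    const char * gds = esp_list_get_string(gd_sessions, 0, g_error);
    if (NULL != gds) { printf("%s\n", gds); }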

    while (!done) 
    {
          event = esp_subscriber_get_next_event(subscriber, g_error);
          if (esp_error_get_code(g_error) != 0) { print_error_and_exit(g_error, __LINE__); }

          rc = esp_subscriber_event_get_type(event, &eventType, g_error);
          if (rc != 0) 
          {
                esp_subscriber_event_free(event, g_error);
                print_error_and_exit(g_error, __LINE__);
          }

          switch(eventType) 
          {
                case ESP_SUBSCRIBER_EVENT_DATA:
                      // Process events                            
                      break;      
          
                // This event type indicates a checkpoint event. Checkpoint events are received only
                // if the CCR file has "auto-cp-trans" set.
                // You can issue a GD commit at any time after you have processed the events and can recover them without the ESP server.
                // This example assumes that the CCL project has checkpointing enabled.
                case ESP_SUBSCRIBER_CHECKPOINT:
                      printf("<checkpoint-is-received/>\n");
                      int64_t seq_val;

                      // Get the check point sequence number from the check point event.
                      rc = esp_subscriber_event_get_checkpoint_sequence_number(event, &seq_val, g_error);
                      if (rc == 0) 
                      {
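                             // int64_t values are cast to long long so that %lld is correct on every platform.
                             printf("SeqNo # %lld\n", (long long) seq_val);
                             int32_t ids[1]; 
                             esp_stream_get_id(stream, &ids[0], g_error);
                             printf("stream id = %d\n", ids[0]);
                             int64_t sq[1];
                             sq[0] = seq_val;
                             printf("seq = %lld\n", (long long) sq[0]);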
                             // Before committing, make sure that all events up to seq_val have been consumed
                             // by third-party applications or safely logged, so that they can be recovered
                             // without the ESP server if needed.
                             // Code for logging or handing off to third-party applications goes here…

                             // The following call sends a commit message to the server, indicating that the subscriber
                             // no longer needs events up to seq_val. In this scenario (GD with checkpoint), do not commit
                             // sequence numbers higher than the one received from the server, so that data can be
                             // recovered safely if the server is restarted.
                             rc = esp_publisher_commit_gd(publisher, "GD999", ids, sq, 1, g_error);
                             printf("rc = %d\n", rc);                            
                      }              
                      break;
                default:
                      break;
          }
          esp_subscriber_event_free(event, g_error);
    }
    
    // This cancels the GD subscriber session. Closing a subscriber makes this call internally and cancels
    // the GD session, so the explicit call here only demonstrates how to use the API.
    rc = esp_project_cancel_gd_subscriber_session(project, "GD999", g_error);

    if (rc == 0) { printf("cancelled.... SUCCESS!\n"); }