Subscribing with Guaranteed Delivery

Use guaranteed delivery (GD) to ensure that events are still delivered to the subscriber if the connection is temporarily lost or the server is restarted.

Prerequisites

Enable guaranteed delivery in a window and attach a log store in the CCL. To receive checkpoint messages from the server on streams using GD with checkpoint, set the Auto Checkpoint parameter in the project configuration file. The client may also receive checkpoint messages if the consistent recovery option is turned on and a publisher commits a message.

Task

Guaranteed delivery is a delivery mechanism that preserves events produced by a window, keeps data in a log store, and tracks events consumed by GD subscribers. For more information on guaranteed delivery, see the Programmers Guide.

A CCL project can be set to checkpoint after a given number of messages has passed through it. Once the configured number of messages has passed through the project, the server commits the log store and sends a checkpoint message to the subscriber. The checkpoint message indicates that all messages up to the checkpoint sequence number are safely logged in the system.
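The counting behavior described above can be pictured with a small model. This is only a conceptual sketch, not the server's implementation: the class name AutoCheckpointModel and its methods are hypothetical, and it assumes one checkpoint is issued each time a fixed number of messages has been seen.

```java
// Conceptual model only; this is NOT part of the SDK or the server.
final class AutoCheckpointModel {
    private final int interval;              // messages between checkpoints
    private int sinceLastCheckpoint;         // messages seen since the last checkpoint
    private long lastCheckpointSeqNo = -1L;  // -1 means no checkpoint issued yet

    AutoCheckpointModel(int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    // Records one message. Returns true when this message completes a batch,
    // that is, when a checkpoint would be issued for seqNo.
    boolean onMessage(long seqNo) {
        sinceLastCheckpoint++;
        if (sinceLastCheckpoint == interval) {
            sinceLastCheckpoint = 0;
            lastCheckpointSeqNo = seqNo;
            return true;
        }
        return false;
    }

    long getLastCheckpointSeqNo() {
        return lastCheckpointSeqNo;
    }
}
```

With an interval of 3, the third message triggers a checkpoint, and messages received after it are not covered until the next checkpoint arrives.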

A subscriber must tell the server when it has processed messages and no longer needs the server to recover them. The subscriber can call publisher.commitGD at any time to tell the server the sequence number of the last message it has processed. The commit call ensures that the server does not resend messages up to and including the last committed sequence number, and allows the server to reclaim the resources these messages consume. The subscriber should not commit sequence numbers higher than the sequence number received in the last checkpoint message. This ensures that no data is lost if the server restarts.
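One way to honor the rule about not committing past the last checkpoint is to track both values on the client and commit only the smaller of the two. The following is a minimal sketch under stated assumptions: GdCommitGuard is a hypothetical helper, not an SDK class, and it assumes sequence numbers are non-negative and increase over time.

```java
// Hypothetical client-side helper; not part of the SDK.
final class GdCommitGuard {
    private static final long NONE = -1L;  // assumes real sequence numbers are >= 0

    private long lastProcessed = NONE;     // highest sequence number the client has handled
    private long lastCheckpoint = NONE;    // highest sequence number seen in a checkpoint

    void onProcessed(long seqNo) {
        lastProcessed = Math.max(lastProcessed, seqNo);
    }

    void onCheckpoint(long seqNo) {
        lastCheckpoint = Math.max(lastCheckpoint, seqNo);
    }

    // Highest sequence number that is both processed and checkpointed,
    // or -1 if nothing can be committed yet.
    long safeCommitSeqNo() {
        return Math.min(lastProcessed, lastCheckpoint);
    }
}
```

A subscriber could pass the value returned by safeCommitSeqNo() to publisher.commitGD whenever it is not -1, so that a commit never runs ahead of either the checkpoint or the client's own processing.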

  1. Request a GD subscription by calling setGDSession(String gdName) on the SubscriberOptions builder, then creating the subscriber with the resulting options.
  2. Create and connect a Publisher object.
  3. Check if streams have GD or GD with checkpoint enabled, either from the project interface by calling project.isGDStream(String streamName) and project.isGDCheckPointStream(String streamName) or from the stream interface by calling stream.isGDEnabled() and stream.isGDCheckPointEnabled().
  4. Retrieve active and inactive GD sessions by calling project.getActiveGDSessions() and project.getInactiveGDSessions().
  5. Tell the server that the subscriber has committed messages up to a given sequence number and no longer needs them by calling publisher.commitGD(String gdName, int[] streamIDs, long[] seqNos). This method can throw an IOException.
  6. Cancel the GD session by closing the subscriber or with project.cancelGDSession(String gdName).
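The commitGD call in step 5 takes two arrays, which suggests that each stream ID is paired with the sequence number at the same index. When a session covers several streams, those arrays have to be built so that the positions line up. The sketch below shows one way to do that. GdCommitBatch is a hypothetical helper, not an SDK class, and it assumes the client can tell which stream each checkpointed sequence number belongs to.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that collects per-stream checkpoints and exposes
// them as index-aligned arrays; not part of the SDK.
final class GdCommitBatch {
    // Sorted map, so both arrays are produced in the same stable order.
    private final Map<Integer, Long> checkpointed = new TreeMap<>();

    // Remembers the highest checkpointed sequence number seen for a stream.
    void onCheckpoint(int streamId, long seqNo) {
        checkpointed.merge(streamId, seqNo, Math::max);
    }

    int[] streamIds() {
        int[] ids = new int[checkpointed.size()];
        int i = 0;
        for (int id : checkpointed.keySet()) {
            ids[i++] = id;
        }
        return ids;
    }

    long[] seqNos() {
        long[] seqs = new long[checkpointed.size()];
        int i = 0;
        for (long seq : checkpointed.values()) {
            seqs[i++] = seq;
        }
        return seqs;
    }
}
```

The two arrays could then be passed together, for example as publisher.commitGD(gdName, batch.streamIds(), batch.seqNos()).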

Example

	// gdName, projectUri, creds, and gdStream are assumed to be defined elsewhere

	// create a GD subscriber
	SDK sdk = SDK.getInstance();
	SubscriberOptions.Builder optBuilder = new SubscriberOptions.Builder();
	optBuilder.setGDSession(gdName);
	SubscriberOptions options = optBuilder.create();
	Subscriber subscriber = sdk.createSubscriber(projectUri, creds, options);

	subscriber.connect();
	subscriber.subscribeStream(gdStream);

	// get the ESP project and connect to it
	ProjectOptions opts = new ProjectOptions.Builder().create();
	ProjectImpl project = (ProjectImpl) SDK.getInstance().getProject(projectUri, creds, opts);
	project.connect(60000);

	// create a publisher to send commitGD messages
	Publisher publisher = project.createPublisher();
	publisher.connect();
	Stream stream = project.getStream(gdStream);

	// check GD status/mode
	boolean gdStatus = project.isGDStream(gdStream);
	boolean gdCPStatus = project.isGDCheckPointStream(gdStream);
	boolean isGD = subscriber.isGD();

	// retrieve GD sessions
	String[] actGD = project.getActiveGDSessions();
	String[] inactGD = project.getInactiveGDSessions();

	int streamId = stream.getStreamId();
	long seqNo = 0L; // sequence number for GD record
	SubscriberEvent event = null;
	boolean done = false;

	while (!done) {
		event = subscriber.getNextEvent();
		switch (event.getType()) {
			case DATA:
				MessageReader reader = event.getMessageReader();
				Stream s = reader.getStream();
				String str = s.getName();
				while (reader.hasNextRow()) {
					// use nextRowReader(isGD) to read GD and non-GD records
					RowReader row = reader.nextRowReader(isGD);
					System.out.println(row.toXmlString(str));
				}
				break;
			case CHECKPOINT:
				// retrieve the sequence number returned by the CHECKPOINT message
				seqNo = event.getSeqNo();
				System.out.println("<checkpoint>" + seqNo + "</checkpoint>");
				int[] streamIds = new int[1];
				streamIds[0] = streamId;
				long[] seqNos = new long[1];
				seqNos[0] = seqNo;
				// send a commitGD message for a single stream with the
				// corresponding last checkpointed sequence number
				publisher.commitGD(gdName, streamIds, seqNos);
				System.out.println("<commitGD>" + seqNo + "</commitGD>");
				break;
			case CLOSED:
				System.out.println("<closed/>");
				done = true;
				break;
			case DISCONNECTED:
				System.out.println("<disconnected/>");
				done = true;
				break;
		}
	}

	// cancel GD session and disconnect subscriber and publisher before exit
	project.cancelGDSession(gdName);
	subscriber.disconnect();
	publisher.disconnect();