Use guaranteed delivery (GD) to ensure that events are still delivered to the subscriber if the connection is temporarily lost or the server is restarted.
Guaranteed delivery is a delivery mechanism that preserves events produced by a window, keeps data in a log store, and tracks events consumed by GD subscribers. For more information on guaranteed delivery, see the Programmers Guide.
A CCL project can be set to checkpoint after a given number of messages have passed through it. Once the configured number of messages has passed through the project, the server commits the log store and sends a checkpoint message to the subscriber. This indicates that all messages up to the checkpoint sequence number are safely logged in the system.
A subscriber must indicate to the server when it has processed the messages and can recover them without the server. The subscriber can call commitGD on a publisher (esp_publisher_commit_gd in the C SDK) at any time to tell the server the sequence number of the last message that has been processed. The commit call ensures that the server will not resend messages up to and including the last sequence number committed, and allows it to reclaim resources consumed by these messages. The subscriber should not commit sequence numbers higher than the sequence number received via the last checkpoint message. This ensures that no data is lost if the server restarts.
// create a GD subscriber SDK sdk = SDK.getInstance(); SubscriberOptions.Builder optBuilder = new SubscriberOptions.Builder(); optBuilder.setGDSession(gdName); SubscriberOptions options = optBuilder.create(); Subscriber subscriber = sdk.createSubscriber(projectUri, creds, options); subscriber.connect(); subscriber.subscribeStream(gdStream); // create an ESP project ProjectOptions opts = new ProjectOptions.Builder().create(); ProjectImpl project = (ProjectImpl) SDK.getInstance().getProject(projectUri, creds, opts); project.connect(60000); // create a publisher to commitGD message Publisher publisher = project.createPublisher(); publisher.connect(); Stream stream = project.getStream(gdStream); // check GD status/mode boolean gdStatus = project.isGDStream(gdStream); boolean gdCPStatus = project.isGDCheckPointStream(gdStream); boolean isGD = subscriber.isGD(); // retrieve GD sessions String[] actGD = project.getActiveGDSessions(); String[] inactGD = project.getInactiveGDSessions(); int streamId = stream.getStreamId(); long seqNo = 0L; // sequence number for GD record SubscriberEvent event = null; boolean done = false; while (!done) { event = subscriber.getNextEvent(); switch (event.getType()) { case DATA: MessageReader reader = event.getMessageReader(); Stream s = reader.getStream(); String str = s.getName(); while (reader.hasNextRow()) { // use nextRowReader(isGD) to read GD and non-GD records RowReader row = reader.nextRowReader(isGD); System.out.println(row.toXmlString(str)); } break; case CHECKPOINT: // retrieve the sequence number returned by CHECKPOINT message seqNo = event.getSeqNo(); System.out.println("<checkpoint>" + seqNo + "</checkpoint>"); int[] streamIds = new int[1]; streamIds[0] = streamId; long[] seqNos = new long[1]; seqNos[0] = seqNo; // commitGD message for a single stream with the corresponding // last checkpointed sequence number publisher.commitGD(gdName, streamIds, seqNos); System.out.println("<commitGD>" + seqNo + "</commitGD>"); break; case 
CLOSED: System.out.println("<closed/>"); done = true; break; case DISCONNECTED: System.out.println("<disconnected/>"); done = true; break; } } // cancel GD session and disconnect subscriber and publisher before exit project.cancelGDSession(gdName); subscriber.disconnect(); publisher.disconnect();