Use guaranteed delivery (GD) to ensure that events are still delivered to the subscriber if the connection is temporarily lost or the server is restarted.
Guaranteed delivery is a delivery mechanism that preserves events produced by a window, keeps data in a log store, and tracks events consumed by GD subscribers. For more information on guaranteed delivery, see the Programmers Guide.
A CCL project can be set to checkpoint after a number of messages pass through it. Once the configured number of messages has passed through the project, the server commits the log store and sends a checkpoint message to the subscriber. This indicates that all messages up to the checkpoint sequence number are safely logged in the system.
A subscriber must indicate to the server when it has processed the messages and can recover them without the server. The subscriber can call NetEspPublisher.commit_gd at any time (through a publisher created on the same project, as in the example below) to tell the server the sequence number of the last message that has been processed. The commit call ensures that the server will not resend messages up to and including the last sequence number committed, and allows it to reclaim the resources consumed by these messages. The subscriber should not commit sequence numbers higher than the sequence number received via the last checkpoint message. This ensures that no data is lost if the server restarts.
// Guaranteed-delivery (GD) subscriber example:
// connects to a server and project, subscribes to stream "In1" under GD
// session "GD999", prints each received row, and commits every checkpointed
// sequence number back to the server through a publisher.

// Start the SDK and connect to the ESP server
NetEspError espError = new NetEspError();
NetEspSdk sdk = SYBASE.Esp.SDK.NetEspSdk.get_sdk();
sdk.start(espError);

NetEspCredentials credentials = new NetEspCredentials(
    NetEspCredentials.NET_ESP_CREDENTIALS_T.NET_ESP_CREDENTIALS_USER_PASSWORD);
credentials.set_user("sybase");
credentials.set_password("sybase");

NetEspUri serverUri = new NetEspUri();
serverUri.set_uri("esp://localhost:19011", espError);

NetEspServerOptions serverOptions = new NetEspServerOptions();
NetEspServer server = sdk.get_server(serverUri, credentials, serverOptions, espError);
server.connect(espError);

// Connect to the ESP project "gd" in workspace "workspace"
NetEspProject project = new NetEspProject();
NetEspProjectOptions projectOptions = new NetEspProjectOptions();
project = server.get_project("workspace", "gd", projectOptions, espError);
project.connect(espError);

// Create the GD subscriber; the session name identifies this GD session
NetEspSubscriberOptions subscriberOptions = new NetEspSubscriberOptions();
subscriberOptions.set_gd_session("GD999");
NetEspSubscriber subscriber = project.create_subscriber(subscriberOptions, espError);

// Create the publisher that is used to issue the GD commits
NetEspPublisher publisher = project.create_publisher(null, espError);
publisher.connect(espError);

// Subscribe to the stream, connect, then query GD status/mode
NetEspStream stream1 = project.get_stream("In1", espError);
subscriber.subscribe_stream(stream1, espError);
subscriber.connect(espError);
stream1.is_gd_enabled(espError);
stream1.is_checkpoint_enabled(espError);
subscriber.is_gd_enabled();

// Retrieve the active and inactive GD sessions of the project
project.get_active_gd_sessions(espError);
project.get_inactive_gd_sessions(espError);

// Event loop: runs until the subscription is closed or disconnected
NetEspSubscriberEvent evt;
bool finished = false;
while (!finished)
{
    evt = subscriber.get_next_event(espError);
    switch (evt.getType())
    {
        case (uint)NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_EVENT_DATA:
            // Print column 1 (integer) and column 2 (string) of every row in the message
            NetEspMessageReader reader = evt.getMessageReader();
            for (NetEspRowReader row = reader.next_row(espError);
                 row != null;
                 row = reader.next_row(espError))
            {
                int intCol1 = row.get_integer(1, espError);
                Console.Out.Write(intCol1);
                string stringCol2 = row.get_string(2, espError);
                Console.Out.WriteLine(" " + stringCol2);
            }
            break;

        // Either event ends the loop
        case (uint)NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_EVENT_CLOSED:
        case (uint)NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_EVENT_DISCONNECTED:
            finished = true;
            break;

        case (uint)NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_CHECKPOINT:
            // Sequence number carried by the checkpoint message
            long checkpointSeq = evt.get_checkpoint_sequence_number(espError);
            if (checkpointSeq <= 0)
            {
                break;
            }
            Console.Out.WriteLine("SeqNo #" + checkpointSeq);

            // Commit the last checkpointed sequence number for this single stream
            int[] streamIds = { stream1.get_id() };
            long[] sequenceNumbers = { checkpointSeq };
            publisher.commit_gd("GD999", streamIds, sequenceNumbers, espError);
            break;

        default:
            break;
    }
}

// Cancel the GD session, then disconnect the subscriber and the server before exit
project.cancel_gd_subscriber_session("GD999", espError);
subscriber.disconnect(espError);
server.disconnect(espError);