Publishing

The SDK provides several options for publishing data to a project.

The steps involved in publishing data are:
  1. Create an EspPublisher for the project to publish to. You can create an EspPublisher directly or from a previously retrieved and connected EspProject object.
  2. Create an EspMessageWriter for the stream to publish to. You can create multiple EspMessageWriters from a single EspPublisher.
  3. Create an EspRelativeRowWriter.
  4. Format the data buffer to publish using EspRelativeRowWriter methods.
  5. Publish the data.

While EspPublisher is thread safe, EspMessageWriter and EspRelativeRowWriter are not. Therefore, ensure that you synchronize access to the latter two.

The SDK provides a number of options to tune the behavior of an EspPublisher. Specify these options using EspPublisherOptions when creating the EspPublisher. Once created, options cannot be changed. Like all other entities in the SDK, publishing also supports the direct, callback, and select access modes.

In addition to access modes, the SDK supports internal buffering. When publishing is buffered, the data is first written to an internal queue. This is picked up by a publishing thread and then written to the ESP project. Buffering is possible only in direct access mode. Direct and buffered publishing potentially provides the best throughput.

Two other settings influence publishing: batching mode and sync mode. Batching controls how data rows are written to the socket. They can be written individually or grouped together in either envelope or transaction batches. Envelopes group individual rows together to send to the ESP project and are read together from the socket by the project. This improves network throughput. Transaction batches, like envelope batches, are also written and read in groups. However, with transaction batches, the platform only processes the group if all the rows in the batch are processed successfully. If one fails, the whole batch is rolled back
Note: When using shine-through to preserve previous values for data that are null in an update record, publish rows individually or in envelopes, rather than in transaction batches.
.

Sync mode settings control the publishing handshake between the SDK and the ESP project. By default, the SDK keeps sending data to the ESP project without waiting for acknowledgement. But if sync mode is set to true, the SDK waits for acknowledgement from the ESP project before sending the next batch of data. This provides an application level delivery guarantee, but it reduces throughput.

There are certain considerations to keep in mind when using callback or select mode publishing. These modes are driven by the ESP_PUBLISHER_EVENT_READY event, which indicates that the publisher is ready to accept more data. In response, you can publish data or issue a commit, but only one such action is permitted in response to a single ESP_PUBLISHER_EVENT_READY event.

Publishing in async mode improves throughput, but does not provide an application level delivery guarantee. Since TCP does not provide an application level delivery guarantee either, data in the TCP buffer could be lost when a client exits. Therefore, a commit must be executed before a client exit when publishing in async mode.

In general terms, the return code from a Publish call indicates whether or not the row was successfully transmitted. Any error that occurs during processing on the ESP project (such as a duplicate insert) will not get returned. The precise meaning of the return code from a Publish call depends on the access mode and the choice of synchronous or asynchronous transmission.

When using callback or select access mode, the return only indicates whether or not the SDK was able to queue the data. The indication of whether or not the data was actually written to the socket will be returned by the appropriate event. The callback and select access modes do not currently support synchronous publishing.

When using direct access mode, the type of transmission used determines what the return from the Publish call indicates. If publishing in asynchronous mode, the return only indicates that the SDK has written the data to the socket. If publishing in synchronous mode, the return from the Publish call indicates the response code the ESP project sent.

In no case will errors that occur during processing on the ESP project (such as a duplicate insert) be returned by a Publish call.

Like all entities, if you intend to work in callback mode with a Publisher and want to get notified, register the callback handler before the event is triggered. For example:
esp_publisher_options_set_access_mode(options, CALLBACK_ACCESS, error);
esp_publisher_set_callback(publisher, events, callback, NULL, error)
esp_publisher_connect(publisher, error);

The following code snippets illustrate different ways of publishing data.

The first example shows publishing in direct access mode with transaction blocks.
    EspCredentials * creds = esp_credentials_create(ESP_CREDENTIALS_USER_PASSWORD, error);
    esp_credentials_set_user(creds, “user”, error);
    esp_credentials_set_password(creds, “password”, error);
                 // create publisher with default options from an existing EspProject 
    publisher = esp_project_create_publisher(project, creds, error);
    esp_credentials_free(creds, error);
    int rc = esp_publisher_connect(publisher, error);
                 // connect the publisher
    const EspStream * stream = esp_project_get_stream(project, "Stream1", error); 
                // retrieve EspStream we want to publish to
    const EspSchema * schema = esp_stream_get_schema(stream, error);
                // determine its schema
    EspMessageWriter * writer = esp_publisher_get_writer(publisher, stream, error);
                // create EspMessageWriter to publish to "Stream1"
    EspRelativeRowWriter * row_writer = esp_message_writer_get_relative_rowwriter(writer, error);

    int32_t numcols;
    esp_schema_get_numcolumns(schema, &numcols, error);       // number of columns in "Stream1"

    int32_t intvalue = 10;
    bool inblock = false;

    while (....) {                      // your logic to determine how long to publish
        if (!inblock) {                 // your logic to determine if to start a transaction
            esp_message_writer_start_transaction(writer, 0, NULL);
            inblock = true;
        }
        esp_relative_rowwriter_start_row(row_writer, NULL);         // start a data row
        int32_t coltype;

        for (int i = 0; i < numcols; ++i) {
            esp_schema_get_column_type(schema, i, &coltype, error);
            switch (coltype) {
                case ESP_DATATYPE_INTEGER:
                    esp_relative_rowwriter_set_integer(row_writer, intvalue++, error);
                    break;
                // ...
                // Code to fill in other data types goes here ....
                // ...
                // NOTE - you must fill in all data fields, with NULLs is needed
                default:
                    esp_relative_rowwriter_set_null(row_writer, error);
                    break;
            }
        }
        esp_relative_rowwriter_end_row(row_writer, error);                  // end the data row

        if ((nrows % 60) == 0) {                         
                        // determine if the batch is to be ended, we code for 60 rows per block
            esp_message_writer_end_block(writer, error);    
                        // end the batch started in esp_message_writer_start_transaction()
            esp_publisher_publish(publisher, writer, error);               // publish the batch
            inblock = false;
        }
    }
    esp_publisher_close(publisher, error);                          // done with publishing
This example shows publishing in callback access mode.
    int rc;
    EspPublisherOptions * options = esp_publisher_options_create(error);
                // create EspPublisherOptions 
    rc = esp_publisher_options_set_access_mode(options, CALLBACK_ACCESS, error);   
                // set access mode
    publisher = esp_project_create_publisher(project, options, error);
                // create EspPublisher using the options above from existing EspProject
    esp_publisher_options_free(options, error);                    // free EspPublisherOptions
    rc = esp_publisher_set_callback(publisher, ESP_PUBLISHER_EVENT_ALL, publish_callback,
    NULL, m_error);   // set callback handler
    rc = esp_publisher_connect(publisher, error);                  // connect publisher

    ...
    ...
    ...

    // Handler function
    void publish_callback(const EspPublisherEvent * event, void * user_data)
    {
        EspPublisher * publisher = NULL;
        EspMessageWriter * mwriter = NULL;
        EspRelativeRowWriter * row_writer = NULL;
        EspProject * project = NULL;
        const EspStream * stream = NULL;
        const EspSchema * schema = NULL;

        EspError * error = esp_error_create();

        int rc;
        uint32_t type;

        publisher = esp_publisher_event_get_publisher(event, error);
        rc = esp_publisher_event_get_type(event, &type, error);

        switch (type)
        {
            case ESP_PUBLISHER_EVENT_CONNECTED:
                // EspProject, EspStream, EspSchema can be retrieved from the EspPublisherEvent
                // if required
                project = esp_publisher_get_project(publisher, error);
                stream = esp_project_get_stream(project, "Stream1", error);
                schema = esp_stream_get_schema(stream, error);
                break;

            case ESP_PUBLISHER_EVENT_READY:

                // populate EspMessageWriter with data to publish

                rc = esp_publisher_publish(publisher, mwriter, error);
                break;

            case ESP_PUBLISHER_EVENT_DISCONNECTED:
                esp_publisher_close(publisher, error);
                break;

            case ESP_PUBLISHER_EVENT_CLOSED:
                break;
        }

        if (error)
            esp_error_free(error);

    }