Publishing

The SDK provides a number of different options to publish data to a project.

The steps involved in publishing data are:
  1. Retrieve a Publisher for the project to which you need to publish. You can create a Publisher directly or from a previously retrieved and connected Project object.
  2. Create a MessageWriter for the stream to publish to. You can create multiple MessageWriters from a single Publisher.
  3. Obtain a RelativeRowWriter from the MessageWriter.
  4. Format the data buffer to publish using RelativeRowWriter methods.
  5. Publish the data.

Publisher is thread safe, but MessageWriter and RelativeRowWriter are not. If more than one thread uses the same MessageWriter or RelativeRowWriter, synchronize access to it.
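
One way to synchronize a writer that is not thread safe is to hold a single lock across every call that makes up a row, so rows written by different threads cannot interleave. The sketch below shows that pattern in plain Java. RowWriterStandIn and GuardedWriter are hypothetical classes that only mimic the startRow/set/endRow call sequence; they are not part of the SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, deliberately non-thread-safe stand-in for a row writer.
// It is NOT the SDK's RelativeRowWriter; it only copies the call pattern.
class RowWriterStandIn {
    private final List<List<Object>> rows = new ArrayList<>();
    private List<Object> current;

    void startRow() { current = new ArrayList<>(); }
    void setValue(Object v) { current.add(v); }
    void endRow() { rows.add(current); current = null; }
    List<List<Object>> rows() { return rows; }
}

// Hypothetical wrapper: one lock guards the whole startRow..endRow sequence.
class GuardedWriter {
    private final RowWriterStandIn writer = new RowWriterStandIn();
    private final Object lock = new Object();

    // Locking per row (not per setter) keeps each row's values together
    void writeRow(Object... values) {
        synchronized (lock) {
            writer.startRow();
            for (Object v : values) {
                writer.setValue(v);
            }
            writer.endRow();
        }
    }

    List<List<Object>> snapshot() {
        synchronized (lock) {
            return new ArrayList<>(writer.rows());
        }
    }

    // Several threads share one writer; each writes rowsPerThread two-column rows
    static List<List<Object>> demo(int threads, int rowsPerThread) {
        GuardedWriter gw = new GuardedWriter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int r = 0; r < rowsPerThread; r++) {
                    gw.writeRow(id, r);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return gw.snapshot();
    }
}
```

Locking around each individual setter would not be enough: two threads could still interleave their values inside one row.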

The SDK supports automatic publisher switchover in high availability (HA) configurations, except when publishing asynchronously. Switching over automatically in this instance risks dropping or duplicating records because the SDK does not know which records have been published.

The SDK provides a number of options to tune the behaviour of a Publisher. Specify these options in a PublisherOptions object when you create the Publisher; once the Publisher is created, its options cannot be changed. Like all other entities in the SDK, a Publisher supports the direct, callback, and select access modes.

In addition to access modes, the SDK supports internal buffering. When publishing is buffered, the data is first written to an internal queue; a publishing thread then takes it from the queue and writes it to the ESP project. Buffering is possible only in direct access mode. Direct access mode combined with buffered publishing can provide the best throughput.
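
Conceptually, buffered publishing is a producer/consumer queue. The following minimal sketch assumes nothing about the SDK's internals: BufferedPublisherSketch is a hypothetical class in which publish() only enqueues a row, a single background thread drains the queue in order, and a list stands in for the connection to the ESP project.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of buffered publishing; not the SDK's implementation.
class BufferedPublisherSketch {
    // Unique sentinel instance (compared by identity) that stops the thread
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> queue;
    // Stand-in for the connection to the ESP project
    private final List<String> sink = Collections.synchronizedList(new ArrayList<>());
    private final Thread publishingThread;

    BufferedPublisherSketch(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
        publishingThread = new Thread(this::drain, "publishing-thread");
        publishingThread.start();
    }

    // Application-facing call: only enqueues, blocking if the buffer is full
    void publish(String row) throws InterruptedException {
        queue.put(row);
    }

    // Runs on the publishing thread: takes rows in FIFO order and "writes" them
    private void drain() {
        try {
            while (true) {
                String row = queue.take();
                if (row == STOP) {
                    return;
                }
                sink.add(row);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Waits until every row queued before this call is written, then stops
    List<String> close() throws InterruptedException {
        queue.put(STOP);
        publishingThread.join();
        return new ArrayList<>(sink);
    }

    static List<String> demo(int rows) {
        BufferedPublisherSketch pub = new BufferedPublisherSketch(16);
        try {
            for (int i = 0; i < rows; i++) {
                pub.publish("row-" + i);
            }
            return pub.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The caller only pays the cost of an enqueue, which is why buffering can raise throughput; the bounded capacity makes producers wait instead of letting the queue grow without limit.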

Two other settings influence publishing: batching mode and sync mode. Batching controls how data rows are written to the socket: individually, or grouped together in envelope or transaction batches. An envelope groups individual rows together to send to the ESP project, and the project reads them from the socket together, which improves network throughput. Transaction batches are also written and read as a group, but the batch takes effect only if every row in it is processed successfully. If any row fails, the whole batch is rolled back.
Note: When using shine-through to preserve previous values for data that are null in an update record, publish rows individually or in envelopes, rather than in transaction batches.
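
To make the difference between envelopes and transaction batches concrete, the following sketch models how a receiver might apply each kind of batch to an in-memory table. It is an illustration under stated assumptions, not ESP code: rows are inserts keyed by id, and inserting an existing id fails, like a duplicate insert.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: a model of envelope vs. transaction semantics, not ESP code.
class BatchSemanticsSketch {
    // Hypothetical row: an INSERT of `value` under primary key `id`
    static final class Row {
        final int id;
        final String value;
        Row(int id, String value) { this.id = id; this.value = value; }
    }

    // Envelope: each row is applied on its own; a duplicate insert is skipped
    // and the remaining rows in the envelope still take effect.
    static Map<Integer, String> applyEnvelope(Map<Integer, String> table, List<Row> batch) {
        Map<Integer, String> result = new HashMap<>(table);
        for (Row r : batch) {
            result.putIfAbsent(r.id, r.value);
        }
        return result;
    }

    // Transaction: rows are applied to a working copy; the first failure discards
    // the copy, so the table comes back unchanged (rolled back).
    static Map<Integer, String> applyTransaction(Map<Integer, String> table, List<Row> batch) {
        Map<Integer, String> working = new HashMap<>(table);
        for (Row r : batch) {
            if (working.putIfAbsent(r.id, r.value) != null) {
                return new HashMap<>(table);
            }
        }
        return working;
    }
}
```

With a table holding id 1 and a batch inserting ids 2, 1, 3, the envelope model ends with three rows, while the transaction model rejects the whole batch and leaves only id 1.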

Sync mode settings control the publishing handshake between the SDK and the ESP project. By default, the SDK keeps sending data to the ESP project without waiting for an acknowledgement. If sync mode is set to true, however, the SDK waits for an acknowledgement from the ESP project after each send before sending more data. When publishing in asynchronous mode, call Publisher.commit() explicitly before the client application exits or closes the Publisher; this guarantees that the ESP project has read all of the data written.
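
The difference between the two sync-mode settings, and why a commit matters in asynchronous mode, can be simulated in plain Java. In this hypothetical HandshakeSketch, a background thread plays the role of the ESP project and acknowledges each row it reads. In sync mode send() waits for that acknowledgement; in async mode only commit() waits, for every row sent so far. The real SDK handshake is not modeled here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simulation only: a stand-in "project" thread acknowledges each row it reads.
class HandshakeSketch {
    private final BlockingQueue<String> channel = new LinkedBlockingQueue<>();
    private final Object ackLock = new Object();
    private final boolean syncMode;
    private long sent = 0;   // rows handed to the channel (sender thread only)
    private long acked = 0;  // rows the simulated project has read, guarded by ackLock

    HandshakeSketch(boolean syncMode) {
        this.syncMode = syncMode;
        Thread project = new Thread(this::projectLoop, "simulated-project");
        project.setDaemon(true);  // lets the JVM exit while it waits for more rows
        project.start();
    }

    private void projectLoop() {
        try {
            while (true) {
                channel.take();  // "read" one row
                synchronized (ackLock) {
                    acked++;
                    ackLock.notifyAll();  // "send" the acknowledgement
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void awaitAcks(long target) throws InterruptedException {
        synchronized (ackLock) {
            while (acked < target) {
                ackLock.wait();
            }
        }
    }

    // Sync mode: wait for this row's acknowledgement before returning.
    // Async mode: return as soon as the row is in the channel.
    void send(String row) throws InterruptedException {
        channel.put(row);
        sent++;
        if (syncMode) {
            awaitAcks(sent);
        }
    }

    // Blocks until everything sent so far has been acknowledged
    void commit() throws InterruptedException {
        awaitAcks(sent);
    }

    long acked() {
        synchronized (ackLock) {
            return acked;
        }
    }

    static long demo(boolean syncMode, int rows) {
        HandshakeSketch h = new HandshakeSketch(syncMode);
        try {
            for (int i = 0; i < rows; i++) {
                h.send("row-" + i);
            }
            h.commit();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return h.acked();
    }
}
```

Without the final commit() in async mode, demo() could return before the simulated project had read every row, which mirrors the risk the text describes.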

In general terms, the return code from a Publish call indicates whether the row was successfully transmitted. Errors that occur during processing on the ESP project (such as a duplicate insert) are not reflected in it. The precise meaning of the return code depends on the access mode and on whether transmission is synchronous or asynchronous.

When using callback or select access mode, the return code indicates only whether the SDK was able to queue the data. Whether the data was actually written to the socket is reported by the corresponding publisher event. The callback and select access modes do not currently support synchronous publishing.

When using direct access mode, the type of transmission used determines what the return from the Publish call indicates. If publishing in asynchronous mode, the return only indicates that the SDK has written the data to the socket. If publishing in synchronous mode, the return from the Publish call indicates the response code the ESP project sent.

In no case will errors that occur during processing on the ESP project (such as a duplicate insert) be returned by a Publish call.
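
The rules for interpreting a Publish return code can be summarized as a lookup. PublishMode is a local enum introduced for this sketch, not the SDK's AccessMode, and the strings simply restate the text; none of the outcomes covers processing errors on the ESP project.

```java
// Local stand-in enum for this sketch; not the SDK's AccessMode type
enum PublishMode { DIRECT, CALLBACK, SELECT }

class PublishReturnMeaning {
    // Restates what a successful Publish return means for each combination.
    // No combination reports processing errors on the ESP project,
    // such as a duplicate insert.
    static String meaning(PublishMode mode, boolean synchronous) {
        if (mode != PublishMode.DIRECT) {
            if (synchronous) {
                throw new IllegalArgumentException(
                    "callback and select modes do not support synchronous publishing");
            }
            // Socket-level success is reported later through a publisher event
            return "data was queued by the SDK";
        }
        return synchronous
            ? "response code sent by the ESP project"
            : "data was written to the socket";
    }
}
```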

There are certain considerations to keep in mind when using callback or select mode publishing. These modes are driven by the PublisherEvent.READY event, which indicates that the publisher is ready to accept more data. In response, users can publish data or issue a commit, but only one such action is permitted in response to a single PublisherEvent.READY event.
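
The one-action-per-READY rule is easiest to follow with a handler that keeps its own work queue. In this hypothetical sketch, onReady() stands in for the READY case of a callback handler. Each call performs at most one action: publishing the next pending row or, once nothing is pending, issuing a requested commit. Both actions are only recorded in a log, not sent anywhere.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustration of the one-action-per-READY rule; events and actions are simulated.
class ReadyHandlerSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private boolean commitRequested = false;

    ReadyHandlerSketch(List<String> rows) {
        pending.addAll(rows);
    }

    void requestCommit() {
        commitRequested = true;
    }

    // Handles one READY event: at most one publish OR one commit, never both
    void onReady() {
        if (!pending.isEmpty()) {
            log.add("publish " + pending.poll());
        } else if (commitRequested) {
            // Commit only after all pending rows have gone out
            log.add("commit");
            commitRequested = false;
        }
        // Otherwise do nothing; a later READY event gives another chance
    }

    List<String> log() {
        return log;
    }
}
```

With two pending rows and a requested commit, four READY events produce "publish a", "publish b", and "commit"; the fourth event finds nothing to do.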

As with all other SDK entities, if you work with a Publisher in callback mode and want to be notified of its events, register the callback handler before the event is triggered. In the following example, the handler is registered before the publisher connects:
    PublisherOptions opts = new PublisherOptions.Builder().setAccessMode(AccessMode.CALLBACK).create();
    Publisher publisher = project.createPublisher(opts);
    PublisherHandler handler = new PublisherHandler();
    publisher.setCallback(EnumSet.allOf(PublisherEvent.Type.class), handler);
    publisher.connect();

Below are some code snippets that illustrate different ways of publishing data.

This example demonstrates publishing in direct access mode with transaction blocks.

    // The Project must be connected first
    project.connect(60000);

    Publisher publisher = project.createPublisher();
    publisher.connect();

    Stream stream = project.getStream("Stream");
    MessageWriter mw = publisher.getMessageWriter(stream);
    RelativeRowWriter writer = mw.getRelativeRowWriter();

    // It is more efficient to cache this than to look it up for every row
    DataType[] types = stream.getEffectiveSchema().getColumnTypes();

    // Your logic to loop over data to publish
    while (true) {

        // Logic to determine whether to start a transaction block
        if ( ...)
            mw.startTransaction(0);

        // Loop over rows in a single block
        while (true) {

            // Start each row once, and set its operation, before writing its columns
            writer.startRow();
            writer.setOperation(Operation.INSERT);

            // Loop over columns in a row
            for (i = ...) {
                switch (types[i]) {
                case DATE:
                    writer.setDate(datevalue);
                    break;
                case DOUBLE:
                    writer.setDouble(doublevalue);
                    break;
                case INTEGER:
                    writer.setInteger(intvalue);
                    break;
                case LONG:
                    writer.setLong(longvalue);
                    break;
                case MONEY:
                    // Write the money value here
                    break;
                case STRING:
                    writer.setString(stringvalue);
                    break;
                case TIMESTAMP:
                    writer.setTimestamp(tsvalue);
                    break;
                //
                // Other data types
                //
                }
            }

            // Close the row once all of its columns have been written
            writer.endRow();
        }

        // Logic to determine whether to end the block
        if ( ...)
            mw.endBlock();
    }

    publisher.commit();

This example demonstrates publishing in callback access mode. Notice that the access mode is set before the publisher is connected; this is a requirement for both callback and select access modes.

    project.connect(60000);

    PublisherOptions opts = new PublisherOptions.Builder()
        .setAccessMode(AccessMode.CALLBACK)
        .create();

    Publisher pub = project.createPublisher(opts);
    PublisherHandler handler = new PublisherHandler();
    pub.setCallback(EnumSet.allOf(PublisherEvent.Type.class), handler);
    pub.connect();

    // Block/wait. Publishing happens in the callback handler
    // ....

    //
    // Publisher callback handler
    //
    public class PublisherHandler implements Callback<PublisherEvent>
    {
        Stream m_stream;
        MessageWriter m_mwriter;
        RelativeRowWriter m_rowwriter;

        public PublisherHandler() {
        }

        public String getName() {
            return "PublishHandler";
        }

        public void processEvent(PublisherEvent event)
        {
            switch (event.getType()) {
            case CONNECTED:
                // It is advisable to create and cache these once, on connection
                try {
                    m_stream = event.getPublisher().getProject().getStream("Stream");
                    m_mwriter = event.getPublisher().getMessageWriter(m_stream);
                    m_rowwriter = m_mwriter.getRelativeRowWriter();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                break;

            case READY:
                // Publishing code goes here.
                // NOTE: Only a single publish or a commit call can be made in one READY callback
                break;

            case ERROR:
            case DISCONNECTED:
            case CLOSED:
                break;
            }
        }
    }