The SDK provides several ways to publish data to a project.
Publisher is thread safe, but MessageWriter and RelativeRowWriter are not. If several threads share a MessageWriter or RelativeRowWriter, synchronize their access to it.
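The sketch below shows one way to serialize access to a non-thread-safe writer that several threads share. It does not use the SDK: UnsafeRowWriter is a hypothetical stand-in for RelativeRowWriter, and SynchronizedRowSink and SyncDemo are illustrative names. It assumes that a row is written as a startRow/set.../endRow sequence, so the lock is held for the whole sequence.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a non-thread-safe writer such as RelativeRowWriter:
// it assembles one row at a time in shared state, so interleaved calls corrupt rows.
class UnsafeRowWriter {
    private final List<String> current = new ArrayList<>();
    private final List<List<String>> rows = new ArrayList<>();

    void startRow() { current.clear(); }
    void setString(String value) { current.add(value); }
    void endRow() { rows.add(new ArrayList<>(current)); }
    List<List<String>> rows() { return rows; }
}

// Serializes access: the lock is held for a whole row, from startRow() to endRow().
class SynchronizedRowSink {
    private final UnsafeRowWriter writer = new UnsafeRowWriter();
    private final Object lock = new Object();

    void writeRow(String... values) {
        synchronized (lock) {
            writer.startRow();
            for (String v : values) {
                writer.setString(v);
            }
            writer.endRow();
        }
    }

    List<List<String>> snapshot() {
        synchronized (lock) {
            return new ArrayList<>(writer.rows());
        }
    }
}

final class SyncDemo {
    // Writes rowsPerThread two-column rows from each of 'threads' threads,
    // checks that no row was corrupted, and returns the total row count.
    static int run(int threads, int rowsPerThread) {
        SynchronizedRowSink sink = new SynchronizedRowSink();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < rowsPerThread; i++) {
                    sink.writeRow("a", "b");
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<List<String>> rows = sink.snapshot();
        for (List<String> row : rows) {
            if (row.size() != 2) {
                throw new IllegalStateException("corrupted row: " + row);
            }
        }
        return rows.size();
    }
}
```

Calling SyncDemo.run(4, 1000) returns 4000 with every row intact. Holding the lock for the complete row, rather than around each individual setter, is what matters here: per-call locking would still let two threads' column values mix within one row.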
The SDK supports automatic publisher switchover in high availability (HA) configurations, except when publishing asynchronously. In that case, automatic switchover risks dropping or duplicating records, because without acknowledgements from the ESP project the SDK does not know which records have been published.
The SDK provides a number of options to tune the behavior of a Publisher. Specify these options in a PublisherOptions object when you create the Publisher; they cannot be changed afterward. Like all other entities in the SDK, publishing supports the direct, callback, and select access modes.
In addition to access modes, the SDK supports internal buffering. When publishing is buffered, data is first written to an internal queue; a publishing thread then takes the data from the queue and writes it to the ESP project. Buffering is available only in direct access mode. Direct access mode combined with buffered publishing can provide the best throughput.
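To make the buffering model concrete, here is a minimal sketch of an internal queue drained by a publishing thread, built only from standard java.util.concurrent classes. BufferedPublisherSketch is hypothetical: it records rows in a list where a real publisher would write to the ESP project, and it is not a description of how the SDK implements buffering internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of buffered publishing: publish() only enqueues a row,
// and a background publishing thread takes rows off the queue and "sends" them.
class BufferedPublisherSketch {
    // Optional.empty() is a sentinel telling the publishing thread to stop.
    private final BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
    // Stands in for the connection to the ESP project; only the worker writes to it.
    private final List<String> sent = new ArrayList<>();
    private final Thread worker = new Thread(this::drain, "publishing-thread");

    BufferedPublisherSketch() {
        worker.start();
    }

    // Buffered write: returns as soon as the row is queued.
    void publish(String row) {
        queue.add(Optional.of(row));
    }

    private void drain() {
        try {
            while (true) {
                Optional<String> item = queue.take();
                if (!item.isPresent()) {
                    return; // sentinel reached: everything queued before it was sent
                }
                sent.add(item.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Flushes the queue, stops the publishing thread, and returns rows in send order.
    List<String> close() {
        queue.add(Optional.empty());
        try {
            worker.join(); // join() makes the worker's writes to 'sent' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(sent);
    }
}
```

Because publish() only enqueues, the caller is not blocked by writes to the network; this decoupling is presumably where the throughput benefit of buffering comes from.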
The sync mode setting controls the publishing handshake between the SDK and the ESP project. By default (asynchronous mode), the SDK keeps sending data to the ESP project without waiting for an acknowledgement. If sync mode is set to true, the SDK waits for an acknowledgement from the ESP project after each send before sending more data. When publishing in asynchronous mode, call Publisher.commit() explicitly before the client application exits or closes the Publisher; this guarantees that the ESP project has read all the data written.
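The difference between the two handshake modes, and the role of an explicit commit in asynchronous mode, can be modelled with two in-memory queues and a thread that plays the part of the ESP project. Everything here is hypothetical: HandshakeSketch is not an SDK class, and the assumption that the project acknowledges each message with its sequence number is made only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the sync/async publishing handshake.
class HandshakeSketch {
    private final BlockingQueue<Long> toProject = new LinkedBlockingQueue<>();
    private final BlockingQueue<Long> acks = new LinkedBlockingQueue<>();
    private final AtomicInteger received = new AtomicInteger(); // messages read by the "project"
    private final boolean sync;
    private long lastSent = 0;
    private long lastAcked = 0;

    HandshakeSketch(boolean sync) {
        this.sync = sync;
        // The "ESP project": reads each message, then acknowledges it with its sequence number.
        Thread project = new Thread(() -> {
            try {
                while (true) {
                    long seq = toProject.take();
                    received.incrementAndGet();
                    acks.put(seq);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        project.setDaemon(true); // lets the JVM exit without stopping this thread explicitly
        project.start();
    }

    void send() {
        put(toProject, ++lastSent);
        if (sync) {
            lastAcked = take(acks); // sync mode: wait for this message's acknowledgement
        }
        // async mode: return immediately; acknowledgements accumulate in 'acks'
    }

    // Waits until every message sent so far has been acknowledged.
    void commit() {
        while (lastAcked < lastSent) {
            lastAcked = take(acks);
        }
    }

    long lastAcked() { return lastAcked; }
    int receivedCount() { return received.get(); }

    private static long take(BlockingQueue<Long> q) {
        try {
            return q.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void put(BlockingQueue<Long> q, long v) {
        try {
            q.put(v);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

In synchronous mode the caller always knows the last acknowledged message, at the cost of one round trip per send. In asynchronous mode send() returns immediately, and only commit() establishes that the project has read everything that was sent.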
In general, the return code from a Publish call indicates whether the row was transmitted successfully. Errors that occur while the ESP project processes the data (such as a duplicate insert) are not returned. The precise meaning of the return code depends on the access mode and on whether transmission is synchronous or asynchronous.
In callback or select access mode, the return code only indicates whether the SDK was able to queue the data; whether the data was actually written to the socket is reported by the corresponding event. Callback and select access modes do not currently support synchronous publishing.
In direct access mode, the meaning of the return code depends on the transmission type. In asynchronous mode, it only indicates that the SDK has written the data to the socket. In synchronous mode, it is the response code sent by the ESP project.
In no case will errors that occur during processing on the ESP project (such as a duplicate insert) be returned by a Publish call.
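The rules above can be summarized as a small lookup. PublishReturnMeaning and AccessModeSketch are hypothetical names that only encode the text of this section; they are not part of the SDK.

```java
// Hypothetical mirror of the three access modes; not the SDK's AccessMode type.
enum AccessModeSketch { DIRECT, CALLBACK, SELECT }

final class PublishReturnMeaning {
    // Returns what a Publish call's return code indicates for a given mode and transmission type.
    static String describe(AccessModeSketch mode, boolean sync) {
        switch (mode) {
            case CALLBACK:
            case SELECT:
                if (sync) {
                    throw new IllegalArgumentException(
                            "callback and select modes do not support synchronous publishing");
                }
                return "data queued by the SDK; socket write reported by a later event";
            case DIRECT:
                return sync ? "response code sent by the ESP project"
                            : "data written to the socket";
            default:
                throw new AssertionError(mode);
        }
    }
}
```

None of these outcomes covers errors raised while the ESP project processes the data.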
Callback and select mode publishing have one important constraint. Both modes are driven by the PublisherEvent.READY event, which indicates that the publisher is ready to accept more data. In response to a READY event, the application can either publish data or issue a commit, but only one such action is permitted per PublisherEvent.READY event.
PublisherOptions opts = new PublisherOptions.Builder()
        .setAccessMode(AccessMode.CALLBACK)
        .create();
Publisher publisher = project.createPublisher(opts);
PublisherHandler handler = new PublisherHandler();
publisher.setCallback(EnumSet.allOf(PublisherEvent.Type.class), handler);
publisher.connect();
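The one-action-per-READY rule amounts to a small state machine in the handler. This sketch is hypothetical and SDK-free: PublisherEventTypeSketch mirrors the event types used in this section, and the handler logs its actions instead of calling the SDK. On each READY event it either publishes the next pending row or, once no rows remain, commits; it never does both.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical mirror of the event types used by PublisherEvent in this section.
enum PublisherEventTypeSketch { CONNECTED, READY, ERROR, DISCONNECTED, CLOSED }

// Takes at most one action per READY event: publish one row, or commit.
class OneActionPerReadyHandler {
    private final Deque<String> pending;
    private final List<String> actions = new ArrayList<>(); // log standing in for SDK calls
    private boolean committed = false;

    OneActionPerReadyHandler(List<String> rows) {
        this.pending = new ArrayDeque<>(rows);
    }

    void processEvent(PublisherEventTypeSketch type) {
        if (type != PublisherEventTypeSketch.READY) {
            return; // other event types are ignored in this sketch
        }
        if (!pending.isEmpty()) {
            actions.add("publish " + pending.poll());
        } else if (!committed) {
            actions.add("commit");
            committed = true;
        }
        // Anything further has to wait for the next READY event.
    }

    List<String> actions() {
        return actions;
    }
}
```

With two pending rows, four READY events produce "publish r1", "publish r2", and "commit"; the fourth READY finds nothing left to do.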
The following code snippets show different ways of publishing data. They are for illustration only and do not constitute complete, working examples.
This example demonstrates publishing in direct access mode with transaction blocks.
// The Project must be connected first
project.connect(60000);
Publisher publisher = project.createPublisher();
publisher.connect();
Stream stream = project.getStream("Stream");
MessageWriter mw = publisher.getMessageWriter(stream);
RelativeRowWriter writer = mw.getRelativeRowWriter();
// It is more efficient to cache this
DataType[] types = stream.getEffectiveSchema().getColumnTypes();

// Your logic to loop over data to publish
while ( ...) {
    // Logic to determine whether to start a transaction block
    if ( ...)
        mw.startTransaction(0);

    // Loop over rows in a single block
    while ( ...) {
        // Start each row and set its operation once, before its column values
        writer.startRow();
        writer.setOperation(Operation.INSERT);

        // Loop over the columns in the row, writing each value by type
        for (int i = 0; i < types.length; i++) {
            switch (types[i]) {
            case DATE:
                writer.setDate(datevalue);
                break;
            case DOUBLE:
                writer.setDouble(doublevalue);
                break;
            case INTEGER:
                writer.setInteger(intvalue);
                break;
            case LONG:
                writer.setLong(longvalue);
                break;
            case MONEY:
                // Not shown in this example
                break;
            case STRING:
                writer.setString(stringvalue);
                break;
            case TIMESTAMP:
                writer.setTimestamp(tsvalue);
                break;
            //
            // Other data types
            //
            }
        }
        writer.endRow();
    }

    // Logic to determine whether to end the block
    if ( ...)
        mw.endBlock();
}
publisher.commit();
This example demonstrates publishing in callback access mode. Notice that the access mode is set before the publisher is connected; this is a requirement for both callback and select access modes.
p.connect(60000);
PublisherOptions opts = new PublisherOptions.Builder()
        .setAccessMode(AccessMode.CALLBACK)
        .create();
Publisher pub = p.createPublisher(opts);
PublisherHandler handler = new PublisherHandler();
pub.setCallback(EnumSet.allOf(PublisherEvent.Type.class), handler);
pub.connect();
// Block/wait. Publishing happens in the callback handler
// ....

//
// Publisher callback handler
//
public class PublisherHandler implements Callback<PublisherEvent> {
    Stream m_stream;
    MessageWriter m_mwriter;
    RelativeRowWriter m_rowwriter;

    public PublisherHandler() throws IOException {
    }

    public String getName() {
        return "PublishHandler";
    }

    public void processEvent(PublisherEvent event) {
        switch (event.getType()) {
            case CONNECTED:
                // It is advisable to create and cache these
                try {
                    m_stream = event.getPublisher().getProject().getStream("Stream");
                    m_mwriter = event.getPublisher().getMessageWriter(m_stream);
                    m_rowwriter = m_mwriter.getRelativeRowWriter();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                break;
            case READY:
                // Publishing code goes here.
                // NOTE: Only a single publish or a commit call can be made in one READY callback
                break;
            case ERROR:
            case DISCONNECTED:
            case CLOSED:
                break;
        }
    }
}