Converting Publishers to Adapters

The third step in the migration process, after connecting to an Event Stream Processor project and upgrading the databases, is upgrading your RAP publishers so that they can publish data to Event Stream Processor. 

RAP publishers are standalone programs that must be started manually. They use the RAP Publisher API to publish data. The least invasive migration route is to replace the RAP Publisher API calls with ESP C SDK calls. The publisher then becomes an external adapter to Event Stream Processor. Because this adapter is not managed by Event Stream Processor, it cannot be started or stopped by it. To manage your publisher through Event Stream Processor instead, see the Event Stream Processor Adapters Guide. Note that if you create a managed adapter, the publisher must run on the same machine as Event Stream Processor.

Follow these general steps to move your existing publishers to use the new C SDK. For detailed information on the C SDK, see the Event Stream Processor C SDK documentation.

  1. Provide your publisher with the information it needs to connect to Event Stream Processor:
    Property            Description
    ESPURI              The URI needed to connect to a project. For example, esp://localhost:19011/default/RAP.
    Encryption          Indicates whether encryption is enabled. See the Event Stream Processor SDK esp_uri_set_ssl API.
    AuthenticationType  Indicates how the client is authenticated. Possible values are USER_PASSWORD, SERVER_RSA, and KERBEROS.
    Username            User name for connecting to the ESP cluster. This is needed for authentication.
    Password            The user password. Required for authentication.
  2. Connect to your project:
    1. Use the esp_credentials_create() API to set the authentication type.
    2. If the authentication type is USER_PASSWORD (Native OS), use esp_credentials_set_user() and esp_credentials_set_password() to set the user name and password.
    3. If the authentication type is SERVER_RSA, use esp_credentials_set_keyfile() to set the key file.
    4. If the authentication type is KERBEROS, use esp_credentials_set_user() to set the user name. You must also set additional environment variables to use this authentication type. See the C SDK documentation for more information.
    5. Use esp_uri_create_string() to set the URI used to connect to the project.
    6. If you provided the Encryption property and set it to true, call the esp_uri_set_ssl() API.
    7. Connect to the project using the esp_project_get() API and then the esp_project_connect() API.
    8. Create an Event Stream Processor publisher object using the esp_project_create_publisher() API. Use these publisher options for optimal performance:
      • Mode: EXPLICIT_BLOCKING
      • Access: DIRECT_ACCESS
      • Async: true
      • Buffer Size: 4096
    9. Connect to the publisher object using the esp_publisher_connect() API.
    10. Use these APIs to get the socket MSS (maximum segment size) for the ESP Server, which tells you how much data the Server can accept in one packet. Knowing the MSS helps you maximize network packet utilization.
      • esp_socket_details_create()
      • esp_publisher_get_socket_details()
      • esp_socket_details_get_mss()
  3. Get access to an output stream and publish data.
    1. Get the stream object using the esp_project_get_stream() API.
    2. Create a message writer object by passing the publisher and stream objects to the esp_publisher_get_writer() API.
    3. Open a new envelope using the esp_message_writer_start_envelope() API. Alternatively, open a transaction using the esp_message_writer_start_transaction() API. Transactions are atomic, whereas envelopes are not.
    4. Start a new row using the esp_relative_rowwriter_start_row() API, and then set the column values by calling the SDK API that matches each column's datatype. This table shows the mapping between Event Stream Processor datatypes and the C SDK APIs:
      Event Stream Processor Datatype  Event Stream Processor C SDK API
      integer                          esp_relative_rowwriter_set_integer
      long                             esp_relative_rowwriter_set_long
      float                            esp_relative_rowwriter_set_float
      timestamp                        esp_relative_rowwriter_set_timestamp
      bigdatetime                      esp_relative_rowwriter_set_bigdatetime
      date                             esp_relative_rowwriter_set_date
      interval                         esp_relative_rowwriter_set_interval
      string                           esp_relative_rowwriter_set_string
    5. Keep track of the amount of data written. When it reaches the MSS, close the envelope using the esp_message_writer_end_block() API, publish the envelope using the esp_publisher_publish() API, and then start a new envelope. If you do not want to use the MSS, you can instead send a fixed number of rows in each envelope. The best number depends on the amount of data in a row, so find the optimal block size by testing different values. For the RAP Sample Project, a block size of 4096 gave optimal performance.
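
The batching rule in the last step can be sketched in plain C, without any SDK calls, to show the bookkeeping involved. This is a minimal illustration, not the SDK's own mechanism: the envelope_tracker type and all function names below are hypothetical, and the per-row byte sizes are assumed to be estimated by the caller. The sketch checks the budget before writing each row so that an envelope stays within the MSS, which is one possible reading of "reach the MSS". Comments mark where the SDK calls named in the steps above would go.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper (not part of the ESP C SDK): counts the bytes written
 * into the open envelope against a byte budget such as the socket MSS. */
typedef struct {
    size_t budget;     /* maximum bytes per envelope, e.g. the MSS */
    size_t used;       /* bytes written into the open envelope */
    size_t rows;       /* rows written into the open envelope */
    size_t published;  /* envelopes closed and published so far */
} envelope_tracker;

void tracker_init(envelope_tracker *t, size_t budget)
{
    assert(t != NULL && budget > 0);
    t->budget = budget;
    t->used = 0;
    t->rows = 0;
    t->published = 0;
}

/* Returns 1 when the open envelope should be closed and published before a
 * row of row_bytes is written. An empty envelope is never flushed, so a row
 * larger than the budget is sent in an envelope of its own. */
int tracker_should_flush(const envelope_tracker *t, size_t row_bytes)
{
    return t->rows > 0 && t->used + row_bytes > t->budget;
}

/* Records that one row of row_bytes was written into the open envelope. */
void tracker_row_added(envelope_tracker *t, size_t row_bytes)
{
    t->used += row_bytes;
    t->rows += 1;
}

/* Records that the open envelope was published and a new one started. */
void tracker_flushed(envelope_tracker *t)
{
    t->used = 0;
    t->rows = 0;
    t->published += 1;
}

/* Simulates publishing n rows whose estimated sizes are given in row_bytes
 * and returns the number of envelopes that would be published. */
size_t count_envelopes(const size_t *row_bytes, size_t n, size_t budget)
{
    envelope_tracker t;
    size_t i;

    tracker_init(&t, budget);
    for (i = 0; i < n; i++) {
        if (tracker_should_flush(&t, row_bytes[i])) {
            /* Here: end the block, publish it, and start a new envelope. */
            tracker_flushed(&t);
        }
        /* Here: start a row and set its column values. */
        tracker_row_added(&t, row_bytes[i]);
    }
    if (t.rows > 0) {
        /* Here: publish the final, partially filled envelope. */
        tracker_flushed(&t);
    }
    return t.published;
}
```

In a real publisher, the budget passed to tracker_init() would be the value obtained from the socket details in step 2. To send a fixed number of rows per envelope instead, the same structure applies with the check replaced by a comparison of the row count against the chosen block size.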