Create a publisher and subscriber, and implement a callback instance.
EspPublisherOptions* publisherOptions = esp_publisher_options_create (error); Int rc EspPublisher * m_publisher = esp_project_create_publisher (m_project,publisherOptions,error); EspStream* m_stream = esp_project_get_stream (m_project,m_opts->target.c_str(),error); rc = esp_publisher_connect (m_publisher,error);
EspMessageWriter* m_msgwriter = esp_publisher_get_writer (m_publisher,m_stream,error); EspRelativeRowWriter* m_rowwriter = esp_message_writer_get_relative_rowwriter(m_msgwriter, error); const EspSchema* m_schema = esp_stream_get_schema (m_stream,error); int numColumns; rc = esp_schema_get_numcolumns (m_schema, &numColumns,error); rc = esp_message_writer_start_envelope(m_msgwriter, 0, error); rc = esp_relative_rowwriter_start_row(m_rowwriter, error); rc = esp_relative_rowwriter_set_operation(m_rowwriter, (const ESP_OPERATION_T)opcode, error); int32_t colType; for (int j = 0;j < numColumns;j++){ rc = esp_schema_get_column_type (m_schema,j,&colType,error); switch (type){ case ESP_DATATYPE_INTEGER: memcpy (&integer_val,(int32_t *)(dataValue),sizeof(uint32_t)); rc = esp_relative_rowwriter_set_integer(m_rowwriter, integer_val, error); break; case ESP_DATATYPE_LONG: memcpy (&long_val,(int64_t *)(dataValue),sizeof(int64_t)); rc = esp_relative_rowwriter_set_long(m_rowwriter, long_val, error); break; } }//for rc = esp_relative_rowwriter_end_row(m_rowwriter, error); rc = esp_message_writer_end_block(m_msgwriter, error); rc = esp_publisher_publish(m_publisher, m_msgwriter, error);
EspSubscriberOptions * m_subscriberOptions = esp_subscriber_options_create (error); int rc = esp_subscriber_options_set_access_mode(options, CALLBACK_ACCESS, m_error); EspSubscriber * m_subscriber = esp_project_create_subscriber (m_project,m_subscriberOptions,error); rc = esp_subscriber_options_free(options, m_error); rc = esp_subscriber_set_callback(subscriber , ESP_SUBSCRIBER_EVENT_ALL, subscriber_callback, NULL, m_error); subscriber_callback is global function which will get called up.
void subscriber_callback(const EspSubscriberEvent * event, void * data) { uint32_t type; rc = esp_subscriber_event_get_type(event, &type, error); switch (type) { case ESP_SUBSCRIBER_EVENT_CONNECTED: init(event,error);break; case ESP_SUBSCRIBER_EVENT_SYNC_START: fromLogStore = true; break; case ESP_SUBSCRIBER_EVENT_SYNC_END: fromLogStore = false; break; case ESP_SUBSCRIBER_EVENT_DATA: handleData(event,error); break; case ESP_SUBSCRIBER_EVENT_DISCONNECTED: cleanupaExit(); break; case ESP_SUBSCRIBER_EVENT_ERROR: handleError(event,error); break; } }//end subscriber_callback