Create a publisher and a subscriber, and implement a callback function that handles the subscriber's events.
EspPublisherOptions* publisherOptions = esp_publisher_options_create (error); Int rc EspPublisher * m_publisher = esp_project_create_publisher (m_project,publisherOptions,error); EspStream* m_stream = esp_project_get_stream (m_project,m_opts->target.c_str(),error); rc = esp_publisher_connect (m_publisher,error);
EspMessageWriter* m_msgwriter = esp_publisher_get_writer (m_publisher,m_stream,error);
EspRelativeRowWriter* m_rowwriter = esp_message_writer_get_relative_rowwriter(m_msgwriter, error);
const EspSchema* m_schema = esp_stream_get_schema (m_stream,error);
int numColumns;
rc = esp_schema_get_numcolumns (m_schema, &numColumns,error);
rc = esp_message_writer_start_envelope(m_msgwriter, 0, error);
rc = esp_relative_rowwriter_start_row(m_rowwriter, error);
rc = esp_relative_rowwriter_set_operation(m_rowwriter, (const ESP_OPERATION_T)opcode, error);
int32_t colType;
for (int j = 0;j < numColumns;j++){
rc = esp_schema_get_column_type (m_schema,j,&colType,error);
switch (type){
case ESP_DATATYPE_INTEGER:
memcpy (&integer_val,(int32_t *)(dataValue),sizeof(uint32_t));
rc = esp_relative_rowwriter_set_integer(m_rowwriter, integer_val, error);
break;
case ESP_DATATYPE_LONG:
memcpy (&long_val,(int64_t *)(dataValue),sizeof(int64_t));
rc = esp_relative_rowwriter_set_long(m_rowwriter, long_val, error);
break;
}
}//for
rc = esp_relative_rowwriter_end_row(m_rowwriter, error);
rc = esp_message_writer_end_block(m_msgwriter, error);
rc = esp_publisher_publish(m_publisher, m_msgwriter, error);
EspSubscriberOptions * m_subscriberOptions = esp_subscriber_options_create (error); int rc = esp_subscriber_options_set_access_mode(options, CALLBACK_ACCESS, m_error); EspSubscriber * m_subscriber = esp_project_create_subscriber (m_project,m_subscriberOptions,error); rc = esp_subscriber_options_free(options, m_error); rc = esp_subscriber_set_callback(subscriber , ESP_SUBSCRIBER_EVENT_ALL, subscriber_callback, NULL, m_error); subscriber_callback is global function which will get called up.
void subscriber_callback(const EspSubscriberEvent * event, void * data) {
uint32_t type;
rc = esp_subscriber_event_get_type(event, &type, error);
switch (type) {
case ESP_SUBSCRIBER_EVENT_CONNECTED: init(event,error);break;
case ESP_SUBSCRIBER_EVENT_SYNC_START: fromLogStore = true; break;
case ESP_SUBSCRIBER_EVENT_SYNC_END: fromLogStore = false; break;
case ESP_SUBSCRIBER_EVENT_DATA: handleData(event,error); break;
case ESP_SUBSCRIBER_EVENT_DISCONNECTED: cleanupaExit(); break;
case ESP_SUBSCRIBER_EVENT_ERROR: handleError(event,error); break;
}
}//end subscriber_callback