Publishing and Subscribing

Create a publisher and subscriber, and implement a callback instance.

  1. Create the publisher:
    EspPublisherOptions* publisherOptions = esp_publisher_options_create (error); 
    Int rc
    EspPublisher * m_publisher = esp_project_create_publisher (m_project,publisherOptions,error);
    EspStream* m_stream = esp_project_get_stream (m_project,m_opts->target.c_str(),error);
    rc = esp_publisher_connect (m_publisher,error);
  2. Publish:
    Note: The sample code in this step includes syntax for adding rows to messages.
    EspMessageWriter* m_msgwriter   =   esp_publisher_get_writer (m_publisher,m_stream,error);
     EspRelativeRowWriter* m_rowwriter =  esp_message_writer_get_relative_rowwriter(m_msgwriter, error);
    	const EspSchema* m_schema = esp_stream_get_schema (m_stream,error);
    	int numColumns;
    	rc = esp_schema_get_numcolumns (m_schema, &numColumns,error);
    	rc = esp_message_writer_start_envelope(m_msgwriter, 0, error);
    	rc = esp_relative_rowwriter_start_row(m_rowwriter, error);
    	rc = esp_relative_rowwriter_set_operation(m_rowwriter, (const ESP_OPERATION_T)opcode, error);
    int32_t colType;
    	for (int j = 0;j < numColumns;j++){
    	rc = esp_schema_get_column_type (m_schema,j,&colType,error);
    	switch (type){
    		case ESP_DATATYPE_INTEGER:
    			memcpy (&integer_val,(int32_t *)(dataValue),sizeof(uint32_t));
    			rc = esp_relative_rowwriter_set_integer(m_rowwriter, integer_val, error);
    			break;
    		case ESP_DATATYPE_LONG:
    			memcpy (&long_val,(int64_t *)(dataValue),sizeof(int64_t));
    			rc = esp_relative_rowwriter_set_long(m_rowwriter, long_val, error);
    			break;
    		}
    }//for
    rc = esp_relative_rowwriter_end_row(m_rowwriter, error);
    rc = esp_message_writer_end_block(m_msgwriter, error);
    rc = esp_publisher_publish(m_publisher, m_msgwriter, error);
    
    
  3. Create the subscriber options:
    EspSubscriberOptions * m_subscriberOptions = esp_subscriber_options_create (error);
    	int rc = esp_subscriber_options_set_access_mode(options, CALLBACK_ACCESS, m_error);
    	EspSubscriber * m_subscriber = esp_project_create_subscriber (m_project,m_subscriberOptions,error);
    	rc = esp_subscriber_options_free(options, m_error);
    	rc = esp_subscriber_set_callback(subscriber , ESP_SUBSCRIBER_EVENT_ALL, 
    			subscriber_callback, NULL, m_error);
    	 subscriber_callback is global function which will get called up.
    
  4. Subscribe using callback:
    void subscriber_callback(const EspSubscriberEvent * event, void * data) {
    	uint32_t type;
    	rc = esp_subscriber_event_get_type(event, &type, error);
    	 switch (type) {
            		case ESP_SUBSCRIBER_EVENT_CONNECTED:	init(event,error);break;
    		case ESP_SUBSCRIBER_EVENT_SYNC_START: 	fromLogStore = true; break;
            		case ESP_SUBSCRIBER_EVENT_SYNC_END:	fromLogStore = false; break;
            		case ESP_SUBSCRIBER_EVENT_DATA:		handleData(event,error); break;
            		case ESP_SUBSCRIBER_EVENT_DISCONNECTED:	cleanupaExit(); break;
            		case ESP_SUBSCRIBER_EVENT_ERROR:	handleError(event,error); break;
    	}
    }//end subscriber_callback