The SDK provides various options for subscribing to a project.
EspError * error = esp_error_create(); esp_sdk_start(error); EspUri * project_uri = esp_uri_create_string("esp://server:port//default/vwap", error); EspCredentials * creds = esp_credentials_create(ESP_CREDENTIALS_USER_PASSWORD, error); esp_credentials_set_user(creds, “user”, error); esp_credentials_set_password(creds, “password”, error); EspProject * project = esp_project_get(project_uri, creds, NULL, error); rc = esp_project_connect(project, error); // Reusing credentials for the subscriber EspSubscriber * subscriber = esp_project_create_subscriber(project, creds, error); // Now free credentials esp_credentials_free(creds, error); rc = esp_subscriber_connect(subscriber, error); EspStream * stream = esp_project_get_stream(project, "Trades", error); rc = esp_subscriber_subsribe(subscriber, stream, error); while (true) { EspSubscriberEvent * event = esp_subscriber_get_next_event(subscriber, error); // process event data // delete event esp_subscriber_event_free(event); } esp_subscriber_close(subscriber, error); esp_sdk_close();
// stream for this event const EspStream * stream = esp_subscriber_event_get_stream(event, error); // get message reader EspMessageReader * reader = esp_subscriber_event_get_reader(event, error); int rc = esp_message_reader_is_block(reader, &flag, error); // get the stream schema if you do not have it const EspSchema * schema = esp_stream_get_schema(stream, error); EspRowReader * row_reader; int32_t int_value; int numcolumns = 0, numrows = 0; int type; // need to know how many columns are there rc = esp_schema_get_numcolumns(schema, &numcolumns, error); // loop until we finish all rows while ((row_reader = esp_message_reader_next_row(reader, error)) != NULL) { for (int i = 0; i < numcolumns; ++i) { // if column is null, skip rc = esp_row_reader_is_null(row_reader, i, &flag, error); if ( flag ) continue; rc = esp_schema_get_column_type(schema, i, &type, error); switch ( type ) { case ESP_DATATYPE_INTEGER: rc = esp_row_reader_get_integer(row_reader, i, &int_value, error); break; case ESP_DATATYPE_LONG: rc = esp_row_reader_get_long(row_reader, i, &long_value, error); break; case ESP_DATATYPE_FLOAT: rc = esp_row_reader_get_float(row_reader, i, &double_value, error); // ... // other data types // ... } } }