Subscribing to a Stream in Callback Mode

Subscribing in callback mode is a multistep process that involves creating a subscriber and callback registry, connecting to the subscriber, and then subscribing to a stream.

The following code snippets illustrate one way of subscribing; they are not a complete, working example. Adapt this sample as necessary to suit your particular subscription scenario.

  1. Create a subscriber:
    // Build the subscriber options and select callback access mode.
    var options = new NetEspSubscriberOptions();
    options.set_mode(NetEspSubscriberOptions.NET_ESP_ACCESS_MODE_T.NET_CALLBACK_ACCESS);

    // Create the subscriber from the project with those options.
    // `project` and `error` are assumed to be set up by the surrounding code.
    NetEspSubscriber subscriber = project.create_subscriber(options, error);
  2. Create the callback registry:
    // Wrap the handler method (subscriber_callback) in the SDK callback type.
    var callbackInstance = new NetEspSubscriber.SUBSCRIBER_EVENT_CALLBACK(subscriber_callback);

    // Register the handler for all subscriber event types.
    // The third argument is passed as null here; presumably it is user data
    // handed back to the callback - confirm against the SDK reference.
    subscriber.set_callback(
        NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_EVENT_ALL,
        callbackInstance,
        null,
        error);
    
  3. Connect to the subscriber:
    // Connect the subscriber. Failures are presumably reported through `error`
    // (confirm in the SDK reference) - check it before subscribing.
    subscriber.connect(error);
  4. Subscribe to a stream:
    // Subscribe to `stream` (assumed to be obtained by the surrounding code).
    // Presumably events for this stream then arrive at the registered callback - confirm.
    subscriber.subscribe_stream(stream, error);
    • Callback function implementation:
      // Handler registered with the subscriber; dispatches on the event type.
      //   evt  - the subscriber event being delivered; evt.getType() selects the branch.
      //   data - presumably the user data supplied when the callback was registered
      //          (this sample registers it with null) - confirm against the SDK reference.
      public static void subscriber_callback(NetEspSubscriberEvent evt, ValueType data)
      {
          switch (evt.getType())
          {
              case (uint)(NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_EVENT_CONNECTED):
                  Console.WriteLine("the callback happened: connected!");
                  break;

              case (uint)(NetEspSubscriber.NET_ESP_SUBSCRIBER_EVENT.NET_ESP_SUBSCRIBER_EVENT_DATA):
                  // handleData: read the rows carried by this event
                  // (see the handleData implementation).
                  // ...
                  break;

              default:
                  // Other event types are ignored in this sample.
                  break;
          }
      } // end subscriber_callback
    • handleData implementation:
      // handleData: walks every row in the event's message reader and prints each
      // column value followed by ", " (or "null, " for a null column).
      //
      // This fragment relies on names declared by the surrounding code: evt, schema,
      // error, subscriber, rc, and the typed locals (ivalue, lvalue, fvalue, svalue,
      // dvalue, tvalue, boolvalue, binvalue, intervalue, mon, bdt2).
      NetEspRowReader row_reader = null;
      while ((row_reader = evt.getMessageReader().next_row(error)) != null)
      {
          for (int i = 0; i < schema.get_numcolumns(); ++i)
          {
              // is_null reports 1 for a null column; print a marker and move on.
              if (row_reader.is_null(i) == 1)
              {
                  Console.Write("null, ");
                  continue;
              }

              // Pick the typed getter that matches the column's declared type.
              switch (NetEspStream.getType(schema.get_column_type((uint)i, error)))
              {
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_INTEGER:
                      ivalue = row_reader.get_integer(i, error);
                      Console.Write(ivalue + ", ");
                      break;
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_LONG:
                      lvalue = row_reader.get_long(i, error);
                      Console.Write(lvalue + ", ");
                      break;
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_FLOAT:
                      fvalue = row_reader.get_float(i, error);
                      Console.Write(fvalue + ", ");
                      break;
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_STRING:
                      svalue = row_reader.get_string(i, error);
                      // Emit the same ", " separator as every other column type so
                      // a string column does not run into the next value.
                      Console.Write(svalue + ", ");
                      break;
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_DATE:
                      dvalue = row_reader.get_date(i, error);
                      Console.Write(dvalue + ", ");
                      break;
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_TIMESTAMP:
                      tvalue = row_reader.get_timestamp(i, error);
                      Console.Write(tvalue + ", ");
                      break;
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_BOOLEAN:
                      boolvalue = row_reader.get_boolean(i, error);
                      Console.Write(boolvalue + ", ");
                      break;
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_BINARY:
                      // NOTE(review): 256 is the buffer size passed to get_binary;
                      // how longer values are handled is not shown here - confirm.
                      uint buffersize = 256;
                      binvalue = row_reader.get_binary(i, buffersize, error);
                      Console.Write(System.Text.Encoding.Default.GetString(binvalue) + ", ");
                      break;
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_INTERVAL:
                      intervalue = row_reader.get_interval(i, error);
                      Console.Write(intervalue + ", ");
                      break;
                  // These money types are all read through get_money and printed
                  // via get_long, so they share one branch.
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_MONEY01:
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_MONEY03:
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_MONEY10:
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_MONEY15:
                      mon = row_reader.get_money(i, error);
                      Console.Write(mon.get_long(error) + ", ");
                      break;
                  // MONEY02 shows the alternative accessor that returns the long directly.
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_MONEY02:
                      lvalue = row_reader.get_money_as_long(i, error);
                      Console.Write(lvalue + ", ");
                      break;
                  case NetEspStream.NET_DATA_TYPE_T.NET_ESP_DATATYPE_BIGDATETIME:
                      bdt2 = row_reader.get_bigdatetime(i, error);
                      long usecs = bdt2.get_microseconds(error);
                      Console.Write(usecs + ", ");
                      break;
              }
          }

          // NOTE(review): this disconnect sits inside the row loop, so it runs after
          // each row that is read. Confirm whether it is meant to run once,
          // after all rows have been processed.
          rc = subscriber.disconnect(error);
      }