Subscribing Using Callback

Perform callbacks for new data.

  1. Create the subscriber options:
    SubscriberOptions.Builder builder = new SubscriberOptions.Builder();
    builder.setAccessMode(AccessMode.CALLBACK);
    builder.setPulseInterval(pulseInterval);
    SubscriberOptions opts = builder.create();
    
    Set the access mode to CALLBACK and the pulse interval for how often you want to make the callback.
  2. Create the subscriber and register the callback:
    Subscriber sub = project.createSubscriber(opts);
    	sub.setCallback(EnumSet.allOf(SubscriberEvent.Type.class), this);
    	sub.subscribeStream(streamName);
    	sub.connect();
    
    sub.setCallback is the class that implements the processEvent method and gets called by the callback mechanism.
  3. Create the callback class, which registers with the subscriber.
    1. Implement Callback<SubscriberEvent>.
    2. Implement the getName() and processEvent(SubsriberEvent) methods.
      	 public void processEvent(SubscriberEvent event) {
      		 switch (event.getType()) {
      			case SYNC_START: 	dataFromLogstore=true; 	break;
      			case SYNC_END: 	dataFromLogStore=false;	break;   			 case ERROR: 	handleError(event);	 	break; 
      			case DATA: 		handleData(event); 		break;
      			case DISCONNECTED: cleanupExit();		break;
      		}
      	}
      
    A separate method named handleData is declared in this example, which is referenced in Step 4. The name of the method is variable.
    Note: When the event is received, the callback mechanism calls processEvent and passes the event to it.
  4. (Optional) Use handleData to complete a separate method to retrieve and use subscribed data. Otherwise, data can be directly processed in processEvent:
    public void handleData(SubscriberEvent event) { 
    		MessageReader reader = event.getMessageReader();
    		 String streamName= event.getStream().getName();
    		while ( reader.hasNextRow() ) {
    			 RowReader row = reader.nextRowReader();
    			 int ops= row.getOperation().code();
    			String[] colNames=row.getSchema().getColumnNames();
    			List record = new ArrayList<Object>();
    			for (int j = 0; index = 0; j < row.getSchema().getColumnCount(); ++j) {
    				if ( row.isNull(j)) { record.add(index,null);  index++;  continue;  }
    				switch ( row.getSchema().getColumnTypes()[j]) {
    				      case BOOLEAN: record.add(j, row.getBoolean(j));break;
    				      case INTEGER: record.add(j, row.getInteger(j));break;
    				      case TIMESTAMP: record.add(j, row.getTimestamp(j)); break;
    				}//switch
    			}//for loop
    			sendRecordToExternalDataSource(record);
    		}//while loop
    	}//handleData
    The handleData event contains a message reader, gets the stream name, and uses the row reader to search for new rows as long as there is data being subscribed to. Datatypes are specified.