Subscribing Using Callback

Register a callback that is invoked whenever new data arrives.

  1. Create the subscriber options:
    SubscriberOptions.Builder builder = new SubscriberOptions.Builder();
    builder.setAccessMode(AccessMode.CALLBACK);
    builder.setPulseInterval(pulseInterval);
    SubscriberOptions opts = builder.create();
    
    Set the access mode to CALLBACK, and set the pulse interval to control how often the callback is made.
  2. Create the subscriber and register the callback:
    Subscriber sub = project.createSubscriber(opts);
    sub.setCallback(EnumSet.allOf(SubscriberEvent.Type.class), this);
    sub.subscribeStream(streamName);
    sub.connect();
    
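    The second argument to sub.setCallback (here, this) is an instance of the class that implements the processEvent method, which the callback mechanism calls. Passing EnumSet.allOf(SubscriberEvent.Type.class) registers the callback for every event type.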
  3. Create the callback class that you register with the subscriber:
    1. Implement Callback<SubscriberEvent>.
    2. Implement the getName() and processEvent(SubscriberEvent) methods.
        public void processEvent(SubscriberEvent event) {
            switch (event.getType()) {
                case SYNC_START:   dataFromLogStore = true;  break;
                case SYNC_END:     dataFromLogStore = false; break;
                case ERROR:        handleError(event);       break;
                case DATA:         handleData(event);        break;
                case DISCONNECTED: cleanupExit();            break;
            }
        }
      
    This example delegates DATA events to a separate method named handleData, which is described in Step 4. The method name is arbitrary.
    Note: When an event is received, the callback mechanism calls processEvent and passes the event to it.
  4. (Optional) Implement handleData as a separate method to retrieve and use the subscribed data. Alternatively, process the data directly in processEvent:
    public void handleData(SubscriberEvent event) {
        MessageReader reader = event.getMessageReader();
        String streamName = event.getStream().getName();
        while (reader.hasNextRow()) {
            RowReader row = reader.nextRowReader();
            int ops = row.getOperation().code();
            String[] colNames = row.getSchema().getColumnNames();
            List<Object> record = new ArrayList<Object>();
            for (int j = 0; j < row.getSchema().getColumnCount(); ++j) {
                // Keep null columns in position so the record stays aligned with colNames.
                if (row.isNull(j)) { record.add(null); continue; }
                switch (row.getSchema().getColumnTypes()[j]) {
                    case BOOLEAN:   record.add(row.getBoolean(j));   break;
                    case INTEGER:   record.add(row.getInteger(j));   break;
                    case TIMESTAMP: record.add(row.getTimestamp(j)); break;
                    // Placeholder: add cases for the other column types in your schema.
                    default:        record.add(null);                break;
                }
            }
            sendRecordToExternalDataSource(record);
        }
    }
    The handleData method obtains a message reader from the event, gets the stream name, and uses row readers to iterate over the rows in the message for as long as rows remain. Each non-null column value is read with the getter that matches its datatype.
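The dispatch logic in the steps above can be tried without a running server. The following is a minimal sketch that uses stand-in types only: CallbackSketch, Type, Event, Callback, and Handler are hypothetical names defined here for illustration and are not the SDK classes. It assumes, as the flag name in Step 3 suggests, that DATA events arriving between SYNC_START and SYNC_END come from the log store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in types for illustration only; none of these are SDK classes.
final class CallbackSketch {

    /** Hypothetical event kinds, mirroring the cases handled in processEvent. */
    enum Type { SYNC_START, SYNC_END, DATA, ERROR, DISCONNECTED }

    /** Hypothetical event: a type plus an optional row of column values. */
    static final class Event {
        final Type type;
        final List<Object> row;

        Event(Type type, Object... values) {
            this.type = type;
            this.row = new ArrayList<>(Arrays.asList(values));
        }
    }

    /** Hypothetical callback interface with the two methods named in Step 3. */
    interface Callback<E> {
        String getName();
        void processEvent(E event);
    }

    /** Tracks the sync window and records each DATA row it receives. */
    static final class Handler implements Callback<Event> {
        boolean dataFromLogStore = false;
        boolean connected = true;
        final List<String> log = new ArrayList<>();

        @Override
        public String getName() { return "sketch-handler"; }

        @Override
        public void processEvent(Event event) {
            switch (event.type) {
                case SYNC_START:   dataFromLogStore = true;  break;
                case SYNC_END:     dataFromLogStore = false; break;
                case ERROR:        log.add("error");         break;
                case DATA:
                    // Null column values stay in position inside the row.
                    log.add((dataFromLogStore ? "replayed " : "live ") + event.row);
                    break;
                case DISCONNECTED: connected = false;        break;
            }
        }
    }

    /** Feeds a fixed event sequence to the handler, one call per event. */
    static List<String> demo() {
        Handler handler = new Handler();
        List<Event> events = Arrays.asList(
            new Event(Type.SYNC_START),
            new Event(Type.DATA, 1, null, true),
            new Event(Type.SYNC_END),
            new Event(Type.DATA, 2, "x", false),
            new Event(Type.DISCONNECTED));
        for (Event e : events) {
            handler.processEvent(e);
        }
        return handler.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main prints one line per DATA event, prefixed with "replayed" or "live" depending on whether the event arrived inside the sync window. In a real subscriber the events would be delivered by the callback mechanism rather than by a loop.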