Perform callbacks for new data.
// Build the subscriber options: callback access mode plus the caller-supplied pulse interval.
SubscriberOptions.Builder builder = new SubscriberOptions.Builder();
builder.setAccessMode(AccessMode.CALLBACK);
builder.setPulseInterval(pulseInterval);
SubscriberOptions opts = builder.create();

// Create the subscriber from the project using those options.
Subscriber sub = project.createSubscriber(opts);

// Register this object as the callback handler for every SubscriberEvent type.
EnumSet<SubscriberEvent.Type> allEventTypes = EnumSet.allOf(SubscriberEvent.Type.class);
sub.setCallback(allEventTypes, this);

// Subscribe to the stream, then connect.
sub.subscribeStream(streamName);
sub.connect();
	 /**
	  * Callback entry point: dispatches a subscriber event to the matching handler
	  * based on its type.
	  *
	  * <ul>
	  *   <li>{@code SYNC_START} / {@code SYNC_END} set and clear the
	  *       {@code dataFromLogStore} flag.</li>
	  *   <li>{@code ERROR} is forwarded to {@code handleError}.</li>
	  *   <li>{@code DATA} is forwarded to {@code handleData}.</li>
	  *   <li>{@code DISCONNECTED} triggers {@code cleanupExit}.</li>
	  * </ul>
	  * Any other event type is ignored.
	  *
	  * @param event the event delivered by the subscriber
	  */
	 public void processEvent(SubscriberEvent event) {
	 	switch (event.getType()) {
	 		case SYNC_START:
	 			// NOTE(review): presumably marks that rows arriving between SYNC_START and
	 			// SYNC_END come from the log store rather than live updates -- confirm.
	 			dataFromLogStore = true;
	 			break;
	 		case SYNC_END:
	 			// Same flag as SYNC_START; the two cases previously used different spellings.
	 			dataFromLogStore = false;
	 			break;
	 		case ERROR:
	 			handleError(event);
	 			break;
	 		case DATA:
	 			handleData(event);
	 			break;
	 		case DISCONNECTED:
	 			cleanupExit();
	 			break;
	 		default:
	 			// Event types without a dedicated handler are intentionally ignored.
	 			break;
	 	}
	 }
public void handleData(SubscriberEvent event) { 
		MessageReader reader = event.getMessageReader();
		 String streamName= event.getStream().getName();
		while ( reader.hasNextRow() ) {
			 RowReader row = reader.nextRowReader();
			 int ops= row.getOperation().code();
			String[] colNames=row.getSchema().getColumnNames();
			List record = new ArrayList<Object>();
			for (int j = 0; index = 0; j < row.getSchema().getColumnCount(); ++j) {
				if ( row.isNull(j)) { record.add(index,null);  index++;  continue;  }
				switch ( row.getSchema().getColumnTypes()[j]) {
				      case BOOLEAN: record.add(j, row.getBoolean(j));break;
				      case INTEGER: record.add(j, row.getInteger(j));break;
				      case TIMESTAMP: record.add(j, row.getTimestamp(j)); break;
				}//switch
			}//for loop
			sendRecordToExternalDataSource(record);
		}//while loop
	}//handleData