Perform callbacks for new data.
SubscriberOptions.Builder builder = new SubscriberOptions.Builder(); builder.setAccessMode(AccessMode.CALLBACK); builder.setPulseInterval(pulseInterval); SubscriberOptions opts = builder.create();
Subscriber sub = project.createSubscriber(opts); sub.setCallback(EnumSet.allOf(SubscriberEvent.Type.class), this); sub.subscribeStream(streamName); sub.connect();
public void processEvent(SubscriberEvent event) {
switch (event.getType()) {
case SYNC_START: dataFromLogstore=true; break;
case SYNC_END: dataFromLogStore=false; break; case ERROR: handleError(event); break;
case DATA: handleData(event); break;
case DISCONNECTED: cleanupExit(); break;
}
}
public void handleData(SubscriberEvent event) {
MessageReader reader = event.getMessageReader();
String streamName= event.getStream().getName();
while ( reader.hasNextRow() ) {
RowReader row = reader.nextRowReader();
int ops= row.getOperation().code();
String[] colNames=row.getSchema().getColumnNames();
List record = new ArrayList<Object>();
for (int j = 0; index = 0; j < row.getSchema().getColumnCount(); ++j) {
if ( row.isNull(j)) { record.add(index,null); index++; continue; }
switch ( row.getSchema().getColumnTypes()[j]) {
case BOOLEAN: record.add(j, row.getBoolean(j));break;
case INTEGER: record.add(j, row.getInteger(j));break;
case TIMESTAMP: record.add(j, row.getTimestamp(j)); break;
}//switch
}//for loop
sendRecordToExternalDataSource(record);
}//while loop
}//handleData