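Receive new data through callbacks: create a subscriber in callback access mode, register a callback handler, and process each event as it arrives.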
// Build subscriber options that select callback access mode and the pulse interval.
SubscriberOptions.Builder optionsBuilder = new SubscriberOptions.Builder();
optionsBuilder.setAccessMode(AccessMode.CALLBACK);
optionsBuilder.setPulseInterval(pulseInterval);
SubscriberOptions subscriberOptions = optionsBuilder.create();

// Create the subscriber from the project and register this object as the
// callback for every SubscriberEvent type.
Subscriber sub = project.createSubscriber(subscriberOptions);
sub.setCallback(EnumSet.allOf(SubscriberEvent.Type.class), this);

// Subscribe to the stream first, then connect.
sub.subscribeStream(streamName);
sub.connect();
/**
 * Dispatches a subscriber event to the action registered for its type.
 *
 * <ul>
 *   <li>{@code SYNC_START} sets the {@code dataFromLogStore} flag and
 *       {@code SYNC_END} clears it.</li>
 *   <li>{@code ERROR} is forwarded to {@code handleError(event)}.</li>
 *   <li>{@code DATA} is forwarded to {@code handleData(event)}.</li>
 *   <li>{@code DISCONNECTED} calls {@code cleanupExit()}.</li>
 *   <li>Any other event type is ignored.</li>
 * </ul>
 *
 * @param event the event delivered to this callback
 */
public void processEvent(SubscriberEvent event) {
    switch (event.getType()) {
        case SYNC_START:
            // Must be the same field that SYNC_END clears. The previous code wrote
            // to "dataFromLogstore" here (different capitalization), so the two
            // branches did not toggle one flag.
            // NOTE(review): assumes the field is declared as dataFromLogStore - confirm.
            dataFromLogStore = true;
            break;
        case SYNC_END:
            dataFromLogStore = false;
            break;
        case ERROR:
            handleError(event);
            break;
        case DATA:
            handleData(event);
            break;
        case DISCONNECTED:
            cleanupExit();
            break;
        default:
            // No action for event types that are not listed above.
            break;
    }
}
public void handleData(SubscriberEvent event) { MessageReader reader = event.getMessageReader(); String streamName= event.getStream().getName(); while ( reader.hasNextRow() ) { RowReader row = reader.nextRowReader(); int ops= row.getOperation().code(); String[] colNames=row.getSchema().getColumnNames(); List record = new ArrayList<Object>(); for (int j = 0; index = 0; j < row.getSchema().getColumnCount(); ++j) { if ( row.isNull(j)) { record.add(index,null); index++; continue; } switch ( row.getSchema().getColumnTypes()[j]) { case BOOLEAN: record.add(j, row.getBoolean(j));break; case INTEGER: record.add(j, row.getInteger(j));break; case TIMESTAMP: record.add(j, row.getTimestamp(j)); break; }//switch }//for loop sendRecordToExternalDataSource(record); }//while loop }//handleData