Entity Lifecycles and Access Modes

In the Sybase® Event Stream Processor Java SDK, all entities share a common lifecycle and set of access modes.

User interaction in the Event Stream Processor (ESP) SDK is handled through entities the SDK exposes. The main entities are Server, Project, Publisher, and Subscriber. These entities correspond to the functional areas of the SDK. For example, the Server object represents a running instance of a cluster, the Project corresponds to a single project deployed to the cluster, the Publisher object deals with publishing data to a running project, and so on.

On initial retrieval an entity is considered to be open. When an entity is open, you can retrieve certain static information about it. To accomplish its assigned tasks, an entity has connect to the corresponding component in the cluster. A server connects to a running instance of a cluster, and Project, Publisher and Subscriber all connect to running instances of a project in a cluster.

In the connected state, an entity can interact with the cluster components. Once an entity is disconnected, it can no longer interact with the cluster but is still an active object in the SDK, and can be reconnected to the cluster. Once an entity is closed, it is no longer available for interaction and is reclaimed by the SDK. To reuse an entity that has closed, retrieve a fresh copy of the entity.

For example, you can retrieve a Project object and connect it to a project in the cluster. If the back-end project dies, the SDK Project receives a disconnected event. You can attempt to reconnect manually, or, if you are using callback mode and your configuration supports it, the SDK tries to reconnect automatically. Upon successful reconnection, the SDK generates a connected event. If you actively close the entity, it disconnects from the back-end project and the SDK reclaims the Project object. To reconnect, you first need to retrieve a new Project object.

The SDK provides great flexibility in structuring access to the entities exposed by the API. There are three modes that can be used to access entities: direct, callback, and select.

Direct access is the default mode when retrieving an entity. In this mode, all calls return when an error occurs or the operation completes successfully. There are no events generated later, so there is no need to have an associated event handler.

In callback access, an event handler must be associated with the request. Most calls to the entity return immediately, but completion of the request is indicated by the generation of the corresponding event. The SDK has two internal threads to implement the callback mechanism. The update thread monitors all entities currently registered for callbacks for applicable updates. If an update is found, an appropriate event is created and queued to the dispatch thread. The dispatch thread calls the registered handlers for the user code to process them.

The following sample illustrates accessing a project in callback mode. If you are working in callback mode and want to receive the callback events, register your callback handlers before you call connect on the entity you are interested in.

    ProjectOptions opts = new ProjectOptions.Builder().setAccessMode(AccessMode.CALLBACK).create();

    Project project = SDK.getInstance().getProject(projectUri, creds, opts);



    project.setCallback(EnumSet.allOf(ProjectEvent.Type.class), new ProjectHandler("Handler"));

    

    project.connect(60000);

    

    //

    // Wait or block.
 Rest of the project lifecycle is handled in the project callback handler

    //





    // Project handler class        

    public class ProjectHandler implements Callback

    {

        String m_name;

        

        ProjectHandler(String name) {

            m_name = name;

        }

        

        public String getName() {

            return m_name;

        }



        public void processEvent(ProjectEvent pevent)

        {

            Project p = pevent.getProject();

            try {

                switch ( pevent.getType() ) {

                // Project has connected - can retrieve streams, deployment etc.

                case CONNECTED:

                    String[] streams = pevent.getProject().getModelledStreamNames();

                    break;

                // Project disconnected - only call possible connect again

                case DISCONNECTED:

                    break;

                // Project closed - this object should not be accessed anymore by user code

                case CLOSED:

                    break;

                case STALE:

                case UPTODATE:

                    break;

                case ERROR:

                    break;

                }

            } catch (IOException e) {

            }

        }   

    }

The select access mode lets you multiplex various entities in a single user thread—somewhat similar to the select and poll mechanisms available on many systems—to monitor file descriptors. Register an entity using a Selector together with the events to monitor for. Then, call Selector.select(), which blocks until a monitored update occurs in the background. The function returns a list of SdkEvent objects. First determine the category (server, project, publisher, subscriber) of the event, then handle the appropriate event type. In this mode, the SDK uses a single background update thread to monitor for updates. If detected, the appropriate event is created and pushed to the Selector. The event is then handled in your own thread.

This example shows multiplexing of different entities.

        Uri cUri = new Uri.Builder(REAL_CLUSTER_URI).create();

        

        Selector selector = SDK.getInstance().getDefaultSelector(); 

        

        ServerOptions srvopts = new ServerOptions.Builder().setAccessMode(AccessMode.SELECT).create();

        Server server = SDK.getInstance().getServer(cUri, creds, srvopts);



        ProjectOptions prjopts = new ProjectOptions.Builder().setAccessMode(AccessMode.SELECT).create();

        Project project = null; //SDK.getInstance().getProject(cUri, creds, prjopts);

        

        SubscriberOptions subopts = new SubscriberOptions.Builder().setAccessMode(AccessMode.SELECT).create();

        Subscriber subscriber = null; //SDK.getInstance().getProject(cUri, creds, prjopts);



        PublisherOptions pubopts = new PublisherOptions.Builder().setAccessMode(AccessMode.SELECT).create();

        Publisher publisher = null; //SDK.getInstance().getProject(cUri, creds, prjopts);



        server.connect();

        

        server.selectWith(selector, EnumSet.allOf(ServerEvent.Type.class));

        

        // Your logic to exit the loop goes here ...

        while (true) {

            

            List<sdkevent> events = selector.select();

            

            for (SdkEvent event : events) {

                switch (event.getCategory()) {

                // Server events

                case SERVER:

                    ServerEvent srvevent = (ServerEvent) event;

                    switch (srvevent.getType()) {

                    

                    // Server has connected - can now perform operations, such as adding removing 

                    // applications.

                    case CONNECTED:

                    case MANAGER_LIST_CHANGE:

                        Manager[] managers = srvevent.getServer().getManagers();

                        for (Manager m : managers)

                            System.out.println("Manager:" + m);

                        break;

                    case CONTROLLER_LIST_CHANGE:

                        Controller[] controllers = srvevent.getServer().getControllers();

                        for (Controller cn : controllers)

                            System.out.println("Controller:" + cn);

                        break;

                    case WORKSPACE_LIST_CHANGE:

                        break;

                    

                    // This indicates that the Server has updated its state with the latest running application

                    // information.
 Project objects can now be retrieved 

                    case APPLICATION_LIST_CHANGE:

                    case DISCONNECTED:

                    case CLOSED:

                    case ERROR:

                        break;

                    }

                    break;

                        

                // Project events

                case ESP_PROJECT:

                    ProjectEvent prjevent = (ProjectEvent) event;

                    switch (prjevent.getType()) {

                    case CONNECTED:

                    case DISCONNECTED:

                    case CLOSED:

                    case ERROR:

                    case WARNING:

                        break;

                    }

                    break;

                        

                // Publisher events

                case PUBLISHER:

                    PublisherEvent pubevent = (PublisherEvent) event;

                    switch (pubevent.getType()) {

                    case CONNECTED:

                    // The publisher is read.
 This event is to be used to publish data in callback mode

                    case READY:

                    case DISCONNECTED:

                    case CLOSED:

                        break;

                    }

                    break;

                    

                // Subscriber events

                case SUBSCRIBER:

                    SubscriberEvent subevent = (SubscriberEvent) event;

                    switch (subevent.getType()) {

                    case CONNECTED:

                    case SUBSCRIBED:

                    case SYNC_START:

                    // There is data.
 This event is to be used to retrieve the subscribed data.

                    case DATA:

                    case SYNC_END:

                    case DISCONNECTED:

                    case CLOSED:

                    case UNSUBSCRIBED:

                    case DATA_INVALID:

                    case ERROR:

                    case STREAM_EXIT:

                    case DATA_LOST:

                        break;

                    }

                    break;

                }

            }

        }