Entity Lifecycles and Access Modes

In the Sybase® Event Stream Processor C SDK, all entities exposed by the SDK share a common life cycle and support multiple access modes.

User interaction in the Event Stream Processor (ESP) SDK is handled through entities the SDK exposes. The main entities are Server, Project, Publisher, and Subscriber. These entities correspond to the functional areas of the SDK. For example, the Server object represents a running instance of a cluster, the Project corresponds to a single project deployed to the cluster, the Publisher object deals with publishing data to a running project, and so on.

On initial retrieval, an entity is considered to be open. When an entity is open, you can retrieve certain static information about it. To accomplish its assigned tasks, an entity has to connect to the corresponding component in the cluster. An EspServer connects to a running instance of a cluster, and EspProject, EspPublisher, and EspSubscriber all connect to running instances of a project in a cluster.

In the connected state, an entity can interact with the cluster components. Once an entity is disconnected, it can no longer interact with the cluster but is still an active object in the SDK, and can be reconnected to the cluster. Once an entity is closed, it is no longer available for interaction and is reclaimed by the SDK. To reuse an entity that has closed, retrieve a fresh copy of the entity.

For example, you can retrieve a Project object and connect it to a project in the cluster. If the back-end project dies, the SDK Project receives a disconnected event. You can attempt to reconnect manually, or, if you are using callback mode and your configuration supports it, the SDK tries to reconnect automatically. Upon successful reconnection, the SDK generates a connected event. If you actively close the entity, it disconnects from the back-end project and the SDK reclaims the Project object. To reconnect, you first need to retrieve a new Project object.
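The life cycle described above can be modeled as a small state machine. The following sketch is not SDK code: EntityState, EntityAction, and entity_apply are hypothetical names introduced only for illustration, and the rules that an open entity can be closed directly and that connecting an already connected entity is rejected are assumptions of the model, not documented SDK behavior.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the entity life cycle; these names are not part of the SDK. */
typedef enum {
    ENTITY_OPEN,          /* freshly retrieved, static information available */
    ENTITY_CONNECTED,     /* can interact with the cluster component */
    ENTITY_DISCONNECTED,  /* still a live object, can be reconnected */
    ENTITY_CLOSED         /* reclaimed, must be retrieved again */
} EntityState;

typedef enum { ACTION_CONNECT, ACTION_DISCONNECT, ACTION_CLOSE } EntityAction;

/* Applies an action to the state. Returns true and updates *state when the
 * transition is allowed; returns false and leaves *state unchanged otherwise. */
bool entity_apply(EntityState *state, EntityAction action)
{
    assert(state != NULL);

    /* Assumption: any entity that is not yet closed can be closed. */
    if (action == ACTION_CLOSE && *state != ENTITY_CLOSED) {
        *state = ENTITY_CLOSED;
        return true;
    }

    switch (*state) {
    case ENTITY_OPEN:
    case ENTITY_DISCONNECTED:
        if (action == ACTION_CONNECT) {
            *state = ENTITY_CONNECTED;
            return true;
        }
        return false;
    case ENTITY_CONNECTED:
        if (action == ACTION_DISCONNECT) {
            *state = ENTITY_DISCONNECTED;
            return true;
        }
        return false;
    case ENTITY_CLOSED:
        /* A closed entity accepts nothing; retrieve a fresh copy instead. */
        return false;
    }
    return false;
}
```

Replaying the Project example in this model, connect, disconnect, connect, and close are all accepted in that order, and a further connect on the closed entity is rejected, which mirrors the need to retrieve a new Project object.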

The SDK lets you choose how to structure access to the entities exposed by the API. There are three access modes: direct, callback, and select.

Direct access is the default mode when retrieving an entity. In this mode, each operation on an entity blocks and returns only when it completes successfully or an error occurs. No events are generated afterward, so no event handler is needed.

In callback access, an event handler must be associated with the request. Most calls to the entity return immediately, and completion of the request is indicated by the generation of the corresponding event. The SDK uses two internal threads to implement the callback mechanism. The update thread monitors all entities currently registered for callbacks for applicable updates. If an update is found, an appropriate event is created and queued to the dispatch thread. The dispatch thread then calls the registered handlers so that user code can process the events.
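The hand-off between the update step and the dispatch step can be pictured as a queue of events with a handler registration on the consuming side. The following is a minimal single-threaded sketch under stated assumptions: every name in it (Registration, EventQueue, update_step, dispatch_step, count_events, and the EV_ constants) is hypothetical, and the locking and the two threads that the SDK uses internally are deliberately left out. It is not how the SDK is implemented.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical event type bits; not SDK constants. */
enum { EV_CONNECTED = 1u << 0, EV_DISCONNECTED = 1u << 1, EV_ERROR = 1u << 2 };

typedef void (*Handler)(uint32_t type, void *user_data);

typedef struct {
    uint32_t mask;       /* event types the handler is registered for */
    Handler  handler;
    void    *user_data;
} Registration;

typedef struct {
    const Registration *target;
    uint32_t type;
} QueuedEvent;

#define QUEUE_CAPACITY 16

typedef struct {
    QueuedEvent items[QUEUE_CAPACITY];  /* ring buffer */
    size_t head;
    size_t count;
} EventQueue;

/* Update step: queues an event if the registration asked for this type.
 * Returns 1 if queued, 0 if filtered out or the queue is full. */
int update_step(EventQueue *q, const Registration *reg, uint32_t type)
{
    assert(q != NULL && reg != NULL);
    if ((reg->mask & type) == 0 || q->count == QUEUE_CAPACITY)
        return 0;
    size_t tail = (q->head + q->count) % QUEUE_CAPACITY;
    q->items[tail].target = reg;
    q->items[tail].type = type;
    q->count++;
    return 1;
}

/* Dispatch step: drains the queue in FIFO order and calls each handler.
 * Returns the number of handler calls made. */
int dispatch_step(EventQueue *q)
{
    assert(q != NULL);
    int calls = 0;
    while (q->count > 0) {
        QueuedEvent ev = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAPACITY;
        q->count--;
        ev.target->handler(ev.type, ev.target->user_data);
        calls++;
    }
    return calls;
}

/* Example handler: counts the events it receives through user_data. */
void count_events(uint32_t type, void *user_data)
{
    (void)type;
    ++*(int *)user_data;
}
```

In this model a handler registered for EV_CONNECTED and EV_ERROR never sees EV_DISCONNECTED, and no handler runs until the dispatch step drains the queue. That is the reason a call can return immediately while its completion is reported later.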

The following example shows how an EspProject could be accessed in callback mode. If you are working in callback mode and want to receive the callback events, register your callback handlers before you call connect on the entity you are interested in:

    EspProjectOptions * options = esp_project_options_create(error);

    int rc = esp_project_options_set_access_mode(options, CALLBACK_ACCESS, error);

    const char * temp = "esp://host.domain.com/workspace/project";
    EspUri * uri = esp_uri_create_string(temp, error);

    // Create credentials to authenticate with the project. Assume the cluster is set up to use user/password authentication
    EspCredentials * creds = esp_credentials_create(ESP_CREDENTIALS_USER_PASSWORD, error);
    esp_credentials_set_user(creds, "user", error);
    esp_credentials_set_password(creds, "password", error);

    EspProject * project = esp_project_get(uri, creds, options, error);

    // If you are not going to reuse the credentials, free them
    esp_credentials_free(creds, error);

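    // Register the handler before connecting. project_callback is defined below
    // and must be declared before this call in a real program
    rc = esp_project_set_callback(project, ESP_PROJECT_EVENT_ALL, project_callback, NULL, error);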

    rc = esp_project_connect(project, error);

    //
    // The callback handler
    //
    void project_callback(const EspProjectEvent * event, void * data)
    {
        EspProject * project = NULL;
        const EspError * error = NULL;
        int rc;
        uint32_t type;
        rc = esp_project_event_get_type(event, &type, NULL);

        switch (type) {
            case ESP_PROJECT_EVENT_CONNECTED:
                project = esp_project_event_get_project(event, NULL);
                break;
            case ESP_PROJECT_EVENT_DISCONNECTED:
                project = esp_project_event_get_project(event, NULL);
                esp_project_close(project, NULL);           // you can call close inside a callback
                break;
            case ESP_PROJECT_EVENT_CLOSED:
            case ESP_PROJECT_EVENT_STALE:
            case ESP_PROJECT_EVENT_UPTODATE:
                break;
            case ESP_PROJECT_EVENT_ERROR:
                error = esp_project_event_get_error(event, NULL);
                break;
        }
    }

The select access mode lets you multiplex several entities in a single thread, much as the select and poll mechanisms available on many systems let you monitor multiple file descriptors. Register an entity with an EspSelector, together with the events to monitor. Then call esp_selector_select(...), which blocks until a monitored update occurs in the background. The function returns a list of EspEvent objects. For each event, first determine its category (server, project, publisher, or subscriber), then handle the specific event type. In select mode, the SDK uses one background update thread to monitor for updates. When it detects an update, it creates the appropriate event and pushes it to the EspSelector. You then handle the event in your own thread.
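For readers who have not used the operating system mechanism that select mode is compared to, the following sketch shows POSIX poll() waiting on several file descriptors in one thread. It uses no SDK calls. wait_first_readable and demo_two_pipes are hypothetical helper names, and the limit of eight descriptors is an arbitrary choice for the example.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

#define MAX_FDS 8  /* arbitrary limit for this sketch */

/* Blocks for up to timeout_ms until one of the descriptors is readable.
 * Returns the index of the first readable descriptor, or -1 on timeout or error. */
int wait_first_readable(const int *fds, int nfds, int timeout_ms)
{
    struct pollfd pfds[MAX_FDS];

    assert(fds != NULL);
    if (nfds <= 0 || nfds > MAX_FDS)
        return -1;

    for (int i = 0; i < nfds; i++) {
        pfds[i].fd = fds[i];
        pfds[i].events = POLLIN;   /* the "events to monitor" */
        pfds[i].revents = 0;
    }

    if (poll(pfds, (nfds_t)nfds, timeout_ms) <= 0)
        return -1;

    for (int i = 0; i < nfds; i++) {
        if (pfds[i].revents & POLLIN)
            return i;
    }
    return -1;
}

/* Monitors two pipes and makes only the second one readable. */
int demo_two_pipes(void)
{
    int a[2], b[2];
    if (pipe(a) != 0)
        return -1;
    if (pipe(b) != 0) {
        close(a[0]);
        close(a[1]);
        return -1;
    }

    int fds[2] = { a[0], b[0] };
    int index = -1;
    if (write(b[1], "x", 1) == 1)
        index = wait_first_readable(fds, 2, 1000);

    close(a[0]);
    close(a[1]);
    close(b[0]);
    close(b[1]);
    return index;
}
```

The shape is the same as in select mode: register what to watch, block in a single call, then inspect which source produced something. The difference is that an EspSelector delivers typed SDK events rather than readiness flags on descriptors.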

The following example uses a single selector to multiplex different entities.

    // Assume the EspServer, EspProject, EspPublisher, and EspSubscriber have been created with the correct options.
    // Error checking is omitted for clarity.

    EspSelector * selector = esp_selector_create("server-select", error);
    rc = esp_server_select_with(server, selector, ESP_SERVER_EVENT_ALL, error);
    EspList * list = esp_list_create(ESP_LIST_EVENT_T, error);

    rc = esp_server_connect(server, error);

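    uint32_t type;
    const void * ev;
    const EspServerEvent * srvevent;
    const EspProjectEvent * prjevent;
    const EspPublisherEvent * pubevent;
    const EspSubscriberEvent * subevent;
    int c;
    int done = 0;   // set to 1 from an event branch to leave the loop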

    while (!done)
    {
        esp_list_clear(list, error);
        rc = esp_selector_select(selector, list, error);

        c = esp_list_get_count(list, error);
        
        for (int i = 0; i < c; i++)
        {
            ev = esp_list_get_event(list, i, error);

            int cat = esp_event_get_category(ev, error); 

            switch ( cat ) {
                case ESP_EVENT_SERVER:
                    srvevent = (const EspServerEvent *) ev;
                    esp_server_event_get_type(srvevent, &type, error);
                    switch (type) {
                        // process server events
                        case ESP_SERVER_EVENT_CONNECTED:
                            break;
                        // .....
                    }
                    break;

                case ESP_EVENT_PROJECT:
                    prjevent = (const EspProjectEvent *) ev;
                    esp_project_event_get_type(prjevent, &type, error);
                    switch (type) {
                        // process project events
                        case ESP_PROJECT_EVENT_CONNECTED:
                            break;
                    }
                    break;      // without this break, control falls through to the publisher case

                case ESP_EVENT_PUBLISHER:
                    pubevent = (const EspPublisherEvent *) ev;
                    esp_publisher_event_get_type(pubevent, &type, error);
                    switch (type) {
                        // process publisher events
                        case ESP_PUBLISHER_EVENT_CONNECTED:
                            break;
                    }
                    break;

                case ESP_EVENT_SUBSCRIBER:
                    subevent = (const EspSubscriberEvent *) ev;
                    esp_subscriber_event_get_type(subevent, &type, error);
                    switch (type) {
                        // process subscriber events
                        case ESP_SUBSCRIBER_EVENT_CONNECTED:
                            break;
                    }
                    break;

                default:
                    // unknown category, ignore
                    break;
            }
        }
    }