Execution function of input adapters

This section describes the execution function for input adapters, which retrieves data from external sources, creates messages, and sends them to the Sybase CEP Engine.

For an in-process input adapter, Sybase CEP Server calls the execute function repeatedly. The interval between calls cannot be predicted reliably because it depends heavily on system load. Each adapter runs in its own thread and should limit the amount of time it spends processing input data on each call. An input adapter should not block indefinitely while waiting for input.
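One way to avoid blocking is to wait for input with a bounded timeout and return to the server if nothing arrives. The following is a minimal sketch, assuming the data source is a POSIX file descriptor (a socket or pipe). The helper read_if_ready() is a hypothetical name and is not part of the Sybase CEP SDK; it relies only on the standard poll() and read() calls.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper, not part of the Sybase CEP SDK.
 * Waits at most timeout_ms milliseconds for data on fd, then reads it.
 * Returns the number of bytes read (> 0), 0 if no data arrived in time,
 * or -1 on error, hang-up, or end-of-file. */
static long read_if_ready(int fd, void *buf, size_t len, int timeout_ms)
{
    struct pollfd pfd;
    int rc;
    ssize_t n;

    assert(timeout_ms >= 0);    /* a negative timeout would block forever */

    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    rc = poll(&pfd, 1, timeout_ms);
    if (rc < 0) {
        return -1;              /* poll itself failed */
    }
    if (rc == 0) {
        return 0;               /* timed out: try again on the next call */
    }
    if (!(pfd.revents & POLLIN)) {
        return -1;              /* hang-up or error with no data to read */
    }

    n = read(fd, buf, len);
    return (n > 0) ? (long)n : -1;   /* end-of-file is treated as an error */
}
```

An execute function built this way would return C8_TRUE when the helper reports 0, so that a quiet data source is not mistaken for a fatal condition.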

The execution function of an input adapter retrieves data from an external source, creates a Sybase CEP message, and sends that message to the Sybase CEP Engine. An input adapter may send more than one message per invocation, but should not send an excessive number. As a general rule, avoid sending more than 100 messages per execute call. If the number of messages per invocation of the execute function matters for your application, add a "number of messages" parameter to the ADL so that the value can be tuned easily from Studio.
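The per-call limit can be enforced with a simple bounded loop. The following sketch uses hypothetical stand-ins (MySource, MySendRowFn, my_execute_batch, my_count_row) rather than SDK types; in a real adapter, the callback is where the message would be created, sent, and destroyed, and max_messages would come from the "number of messages" parameter.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for an adapter's data source: rows waiting to be sent. */
typedef struct {
    const int *rows;    /* pending rows */
    size_t     count;   /* total number of rows */
    size_t     next;    /* index of the next unsent row */
} MySource;

/* Hypothetical callback that converts one row to a message and sends it.
 * Returns nonzero on success. */
typedef int (*MySendRowFn)(int row, void *ctx);

/* Sends at most max_messages rows, then returns so the server regains control.
 * Returns the number of rows sent, or -1 if a send failed. */
static long my_execute_batch(MySource *src, size_t max_messages,
                             MySendRowFn send_row, void *ctx)
{
    size_t sent = 0;

    assert(src != NULL && send_row != NULL);

    while (sent < max_messages && src->next < src->count) {
        if (!send_row(src->rows[src->next], ctx)) {
            return -1;          /* caller should treat this as fatal */
        }
        ++src->next;
        ++sent;
    }
    return (long)sent;
}

/* Example callback for illustration: counts rows instead of sending them. */
static int my_count_row(int row, void *ctx)
{
    (void)row;
    ++*(size_t *)ctx;
    return 1;
}
```

With 250 pending rows and a limit of 100, three successive calls send 100, 100, and 50 rows; rows that do not fit in one call are picked up by the next.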

Message creation is an important part of execution. The following excerpt creates an empty message, populates it, and sends it:

// Wait a limited time for data to show up in the data source 
if (MyDataSourceSelect(i_adapter_ptr, l_ds, l_ds->m_ms_delay)) { 
    /* Create a message to send to the Sybase CEP Engine */ 
    l_message_ptr = C8MessageCreate(C8_MESSAGE_POSITIVE, 
                                        i_schema_ptr); 
    if(l_message_ptr == 0) { 
        printf("ERROR: Cannot create positive message!!\n"); 
        return C8_FALSE; 
    } 
    // Arbitrarily create a Null record every 50th line. 
    if( (l_ds->m_number_lines_read % 50) == 0) { 
        l_is_null = C8_TRUE; 
    } 
    MyCreateRandomMessage(l_ds, l_message_ptr, i_schema_ptr, 
        l_is_null); 
    C8AdapterSendMessage(i_adapter_ptr, l_message_ptr); 
    ++l_ds->m_number_lines_read; 
    C8MessageDestroy(l_message_ptr); 
}

The function MyDataSourceSelect() returns true when data is available. A message must then be created to send to the Sybase CEP Engine; C8MessageCreate() constructs an empty message. MyCreateRandomMessage() populates the message, filling each column with random data according to its datatype, and C8AdapterSendMessage() sends it. To illustrate setting message fields to null, every fiftieth message has all of its fields set to null. This can be observed in the Studio stream viewer for the input stream.

After the Sybase CEP Engine receives the message, it processes the message. Your adapter must still destroy its own copy of the message by calling C8MessageDestroy(). Failing to do so results in a memory leak.

Because the persistent state has changed, it must be saved. The function C8AdapterSetPersistentState() saves the state.

The execution function finishes by returning C8_TRUE, which indicates to Sybase CEP Server that no fatal condition has occurred. Fatal conditions include socket timeouts, file permission errors, broken connections, and any other problem that is not recoverable. When a fatal condition occurs, the execution function returns C8_FALSE.

If the execution function returns C8_FALSE, the following sequence is performed once in an attempt to recover:

  1. The shutdown function is called. This should properly close/terminate the adapter.

  2. The initialization function is called. This is an attempt to re-initialize the adapter.

  3. The execute function is called once more. If this call fails, no further recovery attempts are made.
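The recovery sequence above can be sketched as a small driver. This is only an illustration of the described behavior, not the server's actual implementation: MyAdapterOps, my_execute_with_recovery, and the demo_* callbacks are hypothetical names, and giving up when re-initialization fails is an assumption that the text does not state.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical lifecycle callbacks; initialize and execute return nonzero
 * on success. These are not the Sybase CEP SDK signatures. */
typedef struct {
    int  (*initialize)(void *state);
    int  (*execute)(void *state);
    void (*shutdown)(void *state);
} MyAdapterOps;

/* Runs one execute call. On failure, performs the one-time recovery:
 * shutdown, initialize, then execute once more.
 * Returns 1 if the adapter is still usable, 0 if it is abandoned. */
static int my_execute_with_recovery(const MyAdapterOps *ops, void *state)
{
    assert(ops != NULL);

    if (ops->execute(state)) {
        return 1;                       /* normal case: nothing to recover */
    }
    ops->shutdown(state);               /* step 1: close the adapter cleanly */
    if (!ops->initialize(state)) {      /* step 2: try to re-initialize */
        return 0;                       /* assumption: give up if this fails */
    }
    return ops->execute(state) ? 1 : 0; /* step 3: one final attempt */
}

/* Demo adapter for illustration: execute fails a configurable number of times. */
typedef struct {
    int failures_left;
    int init_calls;
    int execute_calls;
    int shutdown_calls;
} MyDemoState;

static int demo_init(void *p)
{
    MyDemoState *s = p;
    s->init_calls++;
    return 1;
}

static int demo_execute(void *p)
{
    MyDemoState *s = p;
    s->execute_calls++;
    if (s->failures_left > 0) {
        s->failures_left--;
        return 0;
    }
    return 1;
}

static void demo_shutdown(void *p)
{
    MyDemoState *s = p;
    s->shutdown_calls++;
}
```

A demo adapter that fails once recovers after a single shutdown and re-initialization; one that fails twice in a row is abandoned after the second execute call.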