In-process Input Adapters

This section describes the initialization, execution, and shutdown subroutines for an in-process input adapter, and provides an outline of the user code for an input adapter.

An in-process input adapter gets data from a source and sends the data to the Sybase CEP Engine. The sample code performs the outlined operations along with other demonstration routines. Its purpose is to create a random input adapter that works for any schema. It also provides routines that access parameters of various data types, print schemas, and otherwise demonstrate the API calls.

As described earlier, an in-process input adapter must have at least three subroutines:

Initialization: This may include activities such as opening sockets, verifying the existence of files or directories, and pinging network connections. Both persistent and session data should be created. To make restarts easier to handle, we recommend putting the code that creates and stores session data in a separate subroutine.

Execution: This involves a query for any input to send to the Sybase CEP Engine. If input is available, messages are created and sent to the input stream. Whether one or multiple messages are sent per invocation depends on the expected behavior of the adapter. If no input is available, the routine should call C8AdapterSleep() and specify a short time period, for example, one second. Alternatively, the function can return and wait until it is called again.

Shutdown: This occurs when the Sybase CEP Engine receives information that the adapter should shut down. The request may come from someone clicking the "Stop" button in the Studio, or from another source such as reaching the end of a data file. The shutdown routine should perform the shutdown operations pertinent to the adapter: closing sockets, closing database connections, closing files, and so on. To complete the shutdown, release any resources obtained from the Sybase CEP Engine. In particular, call C8AdapterSetSessionState(adapter_ptr, NULL) to allow the Sybase CEP Engine to release internal resources.
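
The execution behavior described above (send available input, otherwise back off briefly) can be sketched in plain C. This is a minimal illustration that does not use the SDK: MockSource, MockStats, poll_source, poll_sleep_ms, and poll_execute are hypothetical names invented for this sketch. In a real adapter the back-off would be a call to C8AdapterSleep() and each value would be published to the input stream.

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical stand-in for a data source: a fixed array of readings.
typedef struct {
    const int *values;
    size_t count;
    size_t next;
} MockSource;

// Hypothetical counters that record what the execute step did.
typedef struct {
    int sent;        // messages "published" so far
    int sleep_calls; // times the step backed off because no input was ready
    int last_value;  // most recently "published" value
} MockStats;

// Returns 1 and stores a value if input is available, otherwise returns 0.
static int poll_source(MockSource *src, int *out)
{
    assert(src != NULL && out != NULL);
    if (src->next >= src->count) {
        return 0;
    }
    *out = src->values[src->next++];
    return 1;
}

// Stand-in for a short sleep. It only records the call; a real adapter
// would call C8AdapterSleep() at this point instead.
static void poll_sleep_ms(MockStats *stats, unsigned ms)
{
    (void)ms;
    stats->sleep_calls++;
}

// One invocation of the execute step: send up to max_per_call messages,
// and back off for about one second if nothing was available.
static int poll_execute(MockSource *src, MockStats *stats, int max_per_call)
{
    int sent_now = 0;
    int value;
    while (sent_now < max_per_call && poll_source(src, &value)) {
        stats->last_value = value;
        stats->sent++;
        sent_now++;
    }
    if (sent_now == 0) {
        poll_sleep_ms(stats, 1000);
    }
    return 1;
}
```

The max_per_call argument is one way to express the choice between sending one message or several per invocation; an adapter that simply returns when idle would omit the sleep.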

An outline of the user code for an input adapter:

// Perform session initialization 
C8Bool my_input_c8adapter_session(C8Adapter *i_adapter_ptr) 
{ 
  struct MySessionState *l_session_ptr = 
        (struct MySessionState*)C8Malloc( 
         (C8UInt) sizeof(struct MySessionState)); 
  // Fail early if the session state cannot be allocated 
  if ( ! l_session_ptr ) { 
      return C8_FALSE; 
  } 
  // Open files, sockets, db connections, ... 
  // Specific information in the session 
  l_session_ptr->m_file = fopen(...); 
  ... 
  C8AdapterSetSessionState(i_adapter_ptr, (void*)l_session_ptr); 
  return C8_TRUE; 
} 
// Perform one-time initialization: session data, persistent data, parameters 
C8Bool my_input_c8adapter_initialize(C8Adapter *i_adapter_ptr) 
{ 
  struct MyPersistentData *l_persistent_ptr = 
        (struct MyPersistentData*)C8Malloc( 
         (C8UInt) sizeof(struct MyPersistentData)); 
  // Perform session initialization 
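  if ( ! my_input_c8adapter_session(i_adapter_ptr) ) { 
      C8ErrorSet(MY_ERROR_CODE, "Cannot initialize ..."); 
      // Do not leak the persistent data allocated above 
      if ( l_persistent_ptr ) { 
          C8Free((void *)l_persistent_ptr); 
      } 
      return C8_FALSE; 
  } 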
  
  // Perform persistent data initialization 
  if ( ! l_persistent_ptr ) { 
      C8ErrorSet(MY_ERROR_CODE, "Cannot obtain memory..."); 
      return C8_FALSE; 
  } 
  // Get user parameters from Studio. Notice these may be placed 
  // in persistent data or re-read as required. 
  l_persistent_ptr->m_ms_delay = 
    C8AdapterGetParamInt(i_adapter_ptr, (const C8Char*)"MsDelay", 
              (C8Int)10); 
  ... other parameters setup ... 
  // Save the persistent state 
  C8AdapterSetPersistentState(i_adapter_ptr, l_persistent_ptr, 
      sizeof(struct MyPersistentData)); 
  return C8_TRUE; 
} 
// Query the data source and send any available input to the engine 
C8Bool my_input_c8adapter_execute(C8Adapter * i_adapter_ptr) 
{ 
   // Get the session state 
   struct MySessionState *l_session_ptr = (struct MySessionState*) 
                       C8AdapterGetSessionState(i_adapter_ptr); 
   if ( ! l_session_ptr) { 
      // The Sybase CEP Engine has been restarted for some reason. 
      // Need to recreate the session data. 
      if (! my_input_c8adapter_session(i_adapter_ptr)) { 
          // Cannot create session! 
          C8ErrorSet(...); 
          return C8_FALSE; 
      } 
     l_session_ptr = (struct MySessionState*) 
                       C8AdapterGetSessionState(i_adapter_ptr); 
     if ( ! l_session_ptr ) { 
          // Cannot retrieve newly created session state! 
          C8ErrorSet(...); 
          return C8_FALSE; 
      } 
   } 
   // Get the persistent data for operations. 
   ... 
   // Get data from the data source and publish the data to Sybase CEP
   ... 
   return C8_TRUE; 
} 
// Close any input files, socket connections, database 
// connections, etc. 
void my_input_c8adapter_shutdown(C8Adapter *i_adapter_ptr) 
{ 
    C8UInt l_data_size = 0; 
    struct MySessionState *l_session_ptr = (struct MySessionState*) 
        C8AdapterGetSessionState(i_adapter_ptr); 
    struct MyPersistentData *l_persistent_ptr = 
        (struct MyPersistentData*)C8AdapterGetPersistentState(i_adapter_ptr, 
         &l_data_size); 
    if(l_persistent_ptr) { 
        printf("Closing data source. Lines processed: %d\n", 
            l_persistent_ptr->m_number_lines_read); 
        C8AdapterSetPersistentState(i_adapter_ptr, (void *)NULL, 0); 
    } else { 
        printf("Cannot get persistent data in shutdown!\n"); 
    } 
    C8AdapterSetSessionState(i_adapter_ptr, NULL); 
    if(l_session_ptr) { 
        C8Free((void *)l_session_ptr); 
    } else { 
        printf("Cannot get session data in shutdown!\n"); 
    } 
    return; 
}
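
The restart handling in the outline can be exercised without the SDK. The following is a minimal sketch that assumes, as the comments in the outline suggest, that after a restart the session state comes back as NULL while the persistent data survives. MockAdapter, MockSession, MockPersistent, and the demo_* functions are hypothetical stand-ins and do not reflect the real C8Adapter type or API.

```c
#include <assert.h>
#include <stdlib.h>

// Hypothetical session data: resources that must be reopened after a restart.
typedef struct { int handle_open; } MockSession;

// Hypothetical persistent data: survives a restart.
typedef struct { int lines_read; } MockPersistent;

// Hypothetical adapter handle holding both kinds of state.
typedef struct {
    MockSession *session;
    MockPersistent persistent;
    int sessions_created;
} MockAdapter;

// Session creation lives in its own subroutine so that both the
// initialization and the execution step can call it.
static int demo_session_create(MockAdapter *a)
{
    MockSession *s;
    assert(a != NULL);
    s = (MockSession *)malloc(sizeof *s);
    if (s == NULL) {
        return 0;
    }
    s->handle_open = 1;
    a->session = s;
    a->sessions_created++;
    return 1;
}

static int demo_initialize(MockAdapter *a)
{
    a->session = NULL;
    a->persistent.lines_read = 0;
    a->sessions_created = 0;
    return demo_session_create(a);
}

// Recreates the session if it is missing, then does one unit of work.
static int demo_execute(MockAdapter *a)
{
    if (a->session == NULL && !demo_session_create(a)) {
        return 0;
    }
    a->persistent.lines_read++;
    return 1;
}

// Simulates a restart: session state is lost, persistent state is kept.
static void demo_simulate_restart(MockAdapter *a)
{
    free(a->session);
    a->session = NULL;
}

// Clears the session pointer first, then frees the memory it referred to.
static void demo_shutdown(MockAdapter *a)
{
    MockSession *s = a->session;
    a->session = NULL;
    free(s);
}
```

Calling demo_execute() after demo_simulate_restart() creates a second session while the line count keeps increasing, which is the reason the outline keeps session creation in its own subroutine.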