Registering a query: Example

This section contains an example program that registers a query.

The outline of the program is:

  1. Initialize the SDK. This step is required before calling other functions in the SDK.
  2. Compile a project named "StreamKeeper", which contains the streams that our registered query binds to.
  3. Start executing the streamkeeper project.
  4. Create C8StreamInfo objects that contain the information required to bind the registered query's streams to the StreamKeeper project's streams.
  5. Create a C8ParameterInfo object to store information about the parameter ($VolumeThreshold) that is referenced in the registered query's CCL statement(s).
  6. Register the query.
  7. Publish some messages/rows.
  8. Read those messages/rows.
  9. Stop the registered query.
  10. Stop the project whose streams you are bound to.
  11. Clean up.

    The example code below is a modified subset of the example provided with the SDK.

    /** 
     *  Example for registering a query in C 
     * Copyright (C) 2006, Sybase, Inc. All rights reserved. 
     */
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <math.h>
    #include "nspr.h" /* for PR_snprintf */
    #include "c8adapter.h"
    #include "c8compiler.h"
    #include "c8status.h"
    #include "c8regqry.h"
    /**
     * Demonstrates registering a query with the Sybase CEP C API, end to end.
     *
     * Steps: initialize the SDK, compile and start the StreamKeeper project,
     * describe the query's streams and its $VolumeThreshold parameter,
     * register the query, publish one row, read it back from the output
     * stream, then stop the query and the project and release everything.
     *
     * Before this example can be run, Sybase CEP Server must be running,
     * and a workspace named "Default" must exist on it (create the
     * workspace with Sybase CEP Studio or the c8_client application).
     *
     * argc is not used.  argv[0] is passed to C8ClientSDKInitialize() and
     * prefixes most diagnostics written to stderr.
     *
     * Returns 0 when every step (including SDK shutdown) succeeds,
     * -1 otherwise.
     */
    int
    example_register_query_main(int argc, char **argv)
    {
        /* Sizes of the descriptor arrays handed to C8RegisterQuery(). */
        enum { NUM_STREAMS = 3, NUM_PARAMETERS = 1 };

        /* declare necessary variables */
        /* ret stays -1 on every error path; only the success path at the
         * end of the function sets it to 0.
         */
        int ret = -1;
        /* Track what this run actually started, so that the cleanup code
         * only stops programs that are running because of us.
         */
        int keeper_started = 0;
        int query_registered = 0;
        size_t i = 0;
        const C8Char * workspace_name = "Default";
        // This is the name of a 'project' that will have
        // streams that our registered query can bind to.
        const C8Char * stream_keeper_name = "StreamKeeper";
        const C8Char * in_stream_name = "InTrades";
        const C8Char * out_stream_name = "OutTrades";
        const C8Char * server_hostname = "localhost";
        const C8Char * server_port = "6789";
        C8Char server_uri [1024];
        C8Char ccl_uri_of_input_stream [1024];
        C8Char ccl_uri_of_output_stream [1024];
        const C8Char * query_name = 0;
        const C8Char * query_text = 0;
        // For each stream that the registered query uses,
        // we must provide a description in the form of a
        // C8StreamInfo object.  In this example, our
        // registered query will use 3 streams: an Input
        // stream, an Output stream, and a local stream.
        C8StreamInfo *streams[NUM_STREAMS] = { 0,0,0 };
        // In this example, we will use a CCL parameter
        // named $VolumeThreshold, e.g. in the CCL statement:
        //    INSERT INTO ...
        //    SELECT ...
        //    FROM
        //    WHERE Volume > $VolumeThreshold;
        // We will define a C8ParameterInfo object that
        // contains information about that $VolumeThreshold
        // parameter.
        C8ParameterInfo *parameters[NUM_PARAMETERS] = {0};
        C8Subscriber * sub = 0;
        C8Publisher * pub = 0;
        C8Schema * schema = 0;
        // The message/row that we will send.
        C8Message * msg_1 = 0;
        // The message/row that we expect to receive.
        C8Message * rcv_msg_1 = 0;
        // This will point to an array of strings that
        // contain the names of the temporary files that
        // will be used during the compile-and-register
        // process.
        C8Char ** tmp_files = 0;
        C8Timestamp now = 0;
        const C8Char * str = 0;

        /* Initialize the SDK; must do this exactly once per
         * process.  Note that NSPR and other libraries are
         * initialized within this call as well.
         */
        if (C8_OK != C8ClientSDKInitialize(argv[0], 0)) {
            return -1;
        }

        // Compose the URI of the server.
        PR_snprintf(server_uri, sizeof server_uri, "http://%s:%s",
            server_hostname,
            server_port);

        /* First, we need a module that defines our streams.
         * The StreamKeeper.ccl is an example of such a
         * module.  For the purposes of this example, we will
         * compile and start it here; however, in a typical
         * application this will be done elsewhere.
         */
        if (C8_OK != C8Compile("stream-keeper.ccl", "stream-keeper.ccx",
            workspace_name, stream_keeper_name, 0)) {
            fprintf(stderr, "%s: Could not compile StreamKeeper\n",
                    argv[0]);
            goto cleanup;
        }
        if (C8_OK != C8StartProgram(server_uri, workspace_name,
                "stream-keeper.ccx")) {
            fprintf(stderr, "%s: Could not start StreamKeeper\n",
                    argv[0]);
            goto cleanup;
        }
        keeper_started = 1;
        C8Sleep(5*C8PerSecond); /* give it some time to settle */

        /* Determine the URIs for the StreamKeeper's streams.
         * Typically, these would be hard-coded or passed as
         * parameters.
         */
        PR_snprintf(ccl_uri_of_input_stream,
            sizeof ccl_uri_of_input_stream,
            "ccl://%s:%s/Stream/%s/%s/%s",
            server_hostname, server_port,
            workspace_name, stream_keeper_name, in_stream_name );
        PR_snprintf(ccl_uri_of_output_stream,
            sizeof ccl_uri_of_output_stream,
            "ccl://%s:%s/Stream/%s/%s/%s",
            server_hostname, server_port,
            workspace_name, stream_keeper_name, out_stream_name );

        /*
         * Now that the environment is set up, on to the core
         * of the example!
         *
         * The StreamKeeper contains two streams, InTrades and
         * OutTrades, but no queries. The query we will be
         * registering in this example filters the data on its
         * input stream and forwards the query results into its
         * output stream.
         *
         * The query uses three streams:
         *      MyStrIn (input),
         *      MyStrOut (output), and
         *      MyStrLocal (a local stream for the query's own use).
         * We need to define the array of StreamInfo
         * in order to bind these streams appropriately.
         */
        streams [0] = C8StreamInfoCreateInputBound(
                         "MyStrIn", ccl_uri_of_input_stream,
                         2*C8PerSecond, C8_FALSE, 0, C8_FALSE);
        streams [1] = C8StreamInfoCreateLocal("MyStrLocal");
        streams [2] = C8StreamInfoCreateOutputBound(
                         "MyStrOut", ccl_uri_of_output_stream);
        if (0 == streams[0] ||
            0 == streams[1] ||
            0 == streams[2]) {
            fprintf(stderr,
                    "%s: Could not create stream info objects\n",
                    argv[0]);
            goto cleanup;
        }
        parameters[0] = C8ParameterInfoCreateFloat("VolumeThreshold",
                100.0);
        if (0 == parameters[0]) {
            fprintf(stderr,
                    "%s: Could not create parameter info objects\n",
                    argv[0]);
            goto cleanup;
        }

        /*
         * Note that the registerQuery() call determines the schema
         * for the input and output streams automatically by querying
         * the server (that's why the module containing the streams,
         * in our case StreamKeeper, must be running).
         * There can be more than one input stream and more than
         * one output stream defined for the query.
         *
         * A note about "local" streams. There can be 0 or more local
         * streams defined for the query (depending on the query).
         * The query does not define the schema for the MyStrLocal
         * explicitly: the query must be written in such a way that
         * the compiler can determine the local streams' schemas
         * automatically. (See the Sybase CEP Programmer's Guide and
         * the Sybase CEP CCL Reference Guide for details on how to
         * write CCL in such a way that the compiler can deduce the
         * schemas for the local streams.)
         */
        query_name = "MyFilter";
        query_text =
                    "INSERT INTO "
                    "    MyStrLocal "
                    "SELECT * "
                    "FROM "
                    "     MyStrIn "
                    "WHERE "
                    "     Volume > $VolumeThreshold;\n"
                    "INSERT INTO "
                    "     MyStrOut "
                    "SELECT * "
                    "FROM MyStrLocal;\n"
                    ;
        if (C8_OK != C8RegisterQuery(server_uri, workspace_name,
            query_name, query_text, (C8SizeType) NUM_STREAMS,
            (const C8StreamInfo **) streams, (C8SizeType) NUM_PARAMETERS,
            (const C8ParameterInfo **) parameters,
            0, C8_FALSE, & tmp_files)) {
            fprintf(stderr, "%s: Could not register query\n",
                    argv[0]);
            goto cleanup;
        }
        query_registered = 1;
        if (tmp_files) {
            /* tmp_files contains null-terminated array of strings;
             * let's print it out! later, we'll have to free all the
             * strings as well as tmp_files.
             */
            C8Char ** pp = tmp_files;
            for (; *pp; ++pp) {
                fprintf(stderr, "Created temporary file: %s\n", *pp);
            }
        }

        /* The official example is now over! Well, almost over (we
         * still need to be able to stop the module we just started).
         * However, let's make sure it works.  There are no
         * adapters connected to the StreamKeeper module, so we
         * must (a) generate some data, and (b) receive the output.
         */
        C8Sleep(5*C8PerSecond); /* let things settle a bit */

        /* Create a subscription first (note that the stream URI
         * is the URI from the StreamKeeper module).
         */
        sub = C8SubscriberCreate(ccl_uri_of_output_stream);
        if (0 == sub) {
            fprintf(stderr, "%s: Could not subscribe to stream\n",
                    argv[0]);
            goto cleanup;
        }

        /* Create a publisher (note that the stream URI is the
         * URI from the StreamKeeper module).
         */
        pub = C8PublisherCreate(ccl_uri_of_input_stream);
        if (0 == pub) {
            fprintf(stderr,
                    "%s: Could not create a publisher to stream\n",
                    argv[0]);
            goto cleanup;
        }
        C8Sleep(3*C8PerSecond); /* wait for things to settle */

        schema = C8GetStreamSchema(ccl_uri_of_input_stream);
        if (0 == schema) {
            fprintf(stderr, "%s: Could not determine stream schema\n",
                    argv[0]);
            goto cleanup;
        }

        /* Now, publish some data.  Volume (120) is above the
         * VolumeThreshold parameter value (100.0) set earlier.
         */
        msg_1 = C8MessageCreate(C8_MESSAGE_POSITIVE, schema);
        if (0 == msg_1) {
            fprintf(stderr, "%s: Could not create message\n",
                    argv[0]);
            goto cleanup;
        }
        now = C8Now();
        if (C8_OK !=
             C8MessageColumnSetStringByName(msg_1,"Symbol","IBM") ||
            C8_OK !=
             C8MessageColumnSetFloatByName(msg_1,"Price",50.11) ||
            C8_OK !=
             C8MessageColumnSetIntByName(msg_1,"Volume",120) ) {
            fprintf(stderr, "%s: Could not set message data\n",
                    argv[0]);
            goto cleanup;
        }
        C8MessageSetMessageTimestamp(msg_1, now);
        if (C8_OK != C8PublisherSendMessage(pub, msg_1)) {
            fprintf(stderr, "%s: Could not publish message\n",
                    argv[0]);
            goto cleanup;
        }

        /* let's now receive the message */
        rcv_msg_1 = C8SubscriberGetNextMessage(sub, 20*C8PerSecond);
        if (0 == rcv_msg_1) {
            fprintf(stderr, "%s: didn't receive first message\n",
                    argv[0]);
            goto cleanup;
        }

        /* check that the expected messages came through: */
        if (C8_OK != C8MessageColumnGetStringByName(rcv_msg_1,
                "Symbol", &str)) {
            fprintf(stderr, "%s: could not read message value\n",
                    argv[0]);
            goto cleanup;
        }
        /* Guard against a null column value before comparing:
         * strcmp() must not be handed a null pointer.
         */
        if (0 == str || 0 != strcmp(str, "IBM")) {
            fprintf(stderr, "%s: unexpected message value received\n",
                    argv[0]);
            goto cleanup;
        }

        /* disconnect publisher and subscriber */
        C8SubscriberDestroy(sub);
        sub = 0;
        C8PublisherDestroy(pub);
        pub = 0;

        /* unregister query by stopping it */
        if (C8_OK != C8StopProgram(server_uri, workspace_name,
                query_name)) {
            fprintf(stderr, "%s: could not stop program %s\n",
                    argv[0], query_name);
            goto cleanup;
        }
        query_registered = 0;

        /* stop the StreamKeeper project */
        if (C8_OK != C8StopProgram(server_uri, workspace_name,
                stream_keeper_name)) {
            fprintf(stderr, "%s: could not stop program %s\n",
                    argv[0], stream_keeper_name);
            goto cleanup;
        }
        keeper_started = 0;

        ret = 0; /* completed successfully! */

    cleanup:
        if (0 != ret) {
            /* Print the SDK's error text, if there is any. */
            C8SizeType err_len = C8ErrorGetMessageLength();
            if (err_len > 0) {
                C8Char * err_buf = (C8Char *) C8Malloc(err_len);
                /* Skip the report if the buffer could not be
                 * allocated, rather than writing through a null
                 * pointer.
                 */
                if (err_buf) {
                    *err_buf = 0;
                    C8ErrorGetMessageText(err_buf, err_len);
                    fprintf(stderr, "Error message: %s\n", err_buf);
                    C8Free(err_buf);
                    err_buf = 0;
                }
            }
        }

        /* release all resources */
        if (sub) { C8SubscriberDestroy(sub); sub = 0; }
        if (pub) { C8PublisherDestroy(pub); pub = 0; }

        /* Best-effort stop of whatever this run left running.  Both
         * flags are already cleared on the success path, so these calls
         * only happen after a failure, and only for programs that this
         * function started itself (query_name is never passed while it
         * is still null).
         */
        if (query_registered) {
            (void) C8StopProgram(server_uri, workspace_name, query_name);
            query_registered = 0;
        }
        if (keeper_started) {
            (void) C8StopProgram(server_uri, workspace_name,
                                 stream_keeper_name);
            keeper_started = 0;
        }

        for (i = 0; i < sizeof streams / sizeof streams[0]; ++i) {
            if (streams[i]) {
                C8StreamInfoDestroy(streams[i]);
                streams[i] = 0;
            }
        }
        for (i = 0; i < sizeof parameters / sizeof parameters[0]; ++i) {
            if (parameters[i]) {
                C8ParameterInfoDestroy(parameters[i]);
                parameters[i] = 0;
            }
        }
        if (msg_1) {
            C8MessageDestroy(msg_1);
            msg_1 = 0;
        }
        if (rcv_msg_1) {
            C8MessageDestroy(rcv_msg_1);
            rcv_msg_1 = 0;
        }
        if (schema) {
            C8SchemaDestroy(schema);
            schema = 0;
        }
        if (tmp_files) {
            /* Free each file-name string, then the array itself. */
            C8Char ** pp = tmp_files;
            for (; *pp; ++pp) {
                C8Free(*pp);
                *pp = 0;
            }
            C8Free(tmp_files);
            tmp_files = 0;
        }

        /* and shut down the library */
        if (C8ClientSDKShutdown() != C8_OK) {
            /* could not shutdown the library */
            ret = -1;
        }
        return ret;
    }
    

    The complete source code is in the example_register_query.c program.

    On Microsoft Windows, if you installed to the default directory, the file(s) will be in:

    C:\Program Files\sybasec8\server\sdk\c\examples

    On UNIX-like operating systems, if you installed to the default directory, the file(s) will be in:

    /home/<userid>/sybasec8/server/sdk/c/examples

    The example uses two additional files, named stream-keeper.ccl and stock-trades.ccs, which are in the same directory.

    Instructions for compiling and linking this code are very similar to the instructions for compiling and linking an out-of-process adapter. Because this example uses some code in the nspr library, you must also perform the following actions: