Using SPLASH in Flex Operators

Procedures written in SPLASH are integrated into Projects using the CCL Flex operator.

Procedures written in SPLASH are not meant to be standalone programs. They are meant to be used in SAP Sybase Event Stream Processor projects that are primarily written in CCL. The Flex Operator is the CCL statement that incorporates a SPLASH routine into a CCL project.

Operations on Windows that are inputs to the Flex Operator

You can also iterate through all the records in a window using a "for" loop.

Examples

The following examples show complete projects that incorporate SPLASH code using the CCL Flex operator.

This project displays the top three prices for each stock symbol.

CREATE SCHEMA TradesSchema ( 
    Id integer, 
    TradeTime date, 
    Venue string,   
    Symbol string, 
    Price float, 
    Shares integer 
) 
; 

/* ****************************************************** 
 * Create a Nasdaq Trades Input Window 
 */ 
CREATE INPUT WINDOW QTrades SCHEMA 
TradesSchema PRIMARY KEY (Id) 
; 

/* ****************************************************** 
 * Use Case a: 
 *         Keep records corresponding to only the top three 
 * distinct values. Delete records that falls of the top 
 * three values. 
 * 
 * Here the trades corresponding to the top three prices 
 * per Symbol is maintained. It uses 
 * - eventcaches 
 * - local UDF 
 */ 
CREATE FLEX Top3TradesFlex 
    IN QTrades 
    OUT OUTPUT WINDOW Top3Trades SCHEMA TradesSchema PRIMARY KEY(Symbol,Price) 
    BEGIN 
        DECLARE 
            eventCache(QTrades[Symbol], manual, Price asc) tradesCache; 
            /* 
             * Inserts record into cache if in top 3 prices and returns 
             * the record to delete or just the current record if it was 
             * inserted into cache with no corresponding delete. 
             */ 
            typeof(QTrades) insertIntoCache( typeof(QTrades) qTrades ) 
            { 
                // keep only the top 3 distinct prices per symbol in the 
                // event cache 
                integer counter := 0; 
                typeof (QTrades) rec; 
                long cacheSz := cacheSize(tradesCache); 
                while (counter < cacheSz) { 
                    rec := getCache( tradesCache, counter ); 
                    if( round(rec.Price,2) = round(qTrades.Price,2) ) { 
                        // if the price is the same update 
                        // the record. 
                        deleteCache(tradesCache, counter); 
                        insertCache( tradesCache, qTrades ); 
                        return rec; 
                        break; 
                    } else if( qTrades.Price < rec.Price) { 
                        break; 
                    } 
                    counter++; 
                } 
                
                //Less than 3 distinct prices 
                if(cacheSz < 3) {       
                    insertCache(tradesCache, qTrades); 
                    return qTrades; 
                } else {   //Current price is > lowest price 
                    //delete lowest price record. 
                    rec := getCache(tradesCache, 0); 
                    deleteCache(tradesCache, 0); 
                    insertCache(tradesCache, qTrades); 
                    return rec; 
                } 

                return null; 
            } 
        END; 
        
        ON QTrades { 
            keyCache( tradesCache, [Symbol=QTrades.Symbol;|] ); 
            typeof(QTrades) rec := insertIntoCache( QTrades );   
            if(rec.Id) { 
                //When id does not match current id it is a 
                //record to delete 
                if(rec.Id <> QTrades.Id) { 
                    output setOpcode(rec, delete); 
                } 
                output setOpcode(QTrades, upsert); 
            } 
        }; 
    END; 

This project collects data for thirty seconds and then computes the desired output values.

CREATE SCHEMA TradesSchema ( 
    Id integer, 
    TradeTime date, 
    Venue string,   
    Symbol string, 
    Price float, 
    Shares integer 
) 
; 

/* ****************************************************** 
 * Create a Nasdaq Trades Input Window 
 */ 
CREATE INPUT WINDOW QTrades SCHEMA 
TradesSchema PRIMARY KEY (Id) 
; 

/* ****************************************************** 
 * Use Case b: 
 * Perform a computation every N seconds for records 
 * arrived in the last N seconds. 
 * 
 * Here the Nasdaq trades data is collected for 30 seconds 
 * before being released for further computation. 
 */ 
CREATE FLEX PeriodicOutputFlex 
    IN QTrades 
    OUT OUTPUT WINDOW QTradesPeriodicOutput SCHEMA TradesSchema PRIMARY KEY(Symbol,Price) 
    BEGIN 
        DECLARE 
            dictionary(typeof(QTrades), integer) cache; 
                END;       
        ON QTrades { 
                //Whenever a record arrives just insert into dictionary. 
                //The key of the dictionary is the key to the record. 
            cache[QTrades] := 0;   
        }; 
        EVERY 30 SECONDS { 
                //Cycle through event cache and output all the rows 
                //and delete the rows. 
                for (rec in cache) { 
                        output setOpcode(rec, upsert); 
                } 
                clear(cache);         
        }; 
    END; 

/** 
 * Perform a computation from the periodic output. 
 */     
CREATE OUTPUT WINDOW QTradesSymbolStats 
PRIMARY KEY DEDUCED 
AS SELECT 
    q.Symbol, 
    MIN(q.Price)        Minprice, 
    MAX(q.Price)        MaxPrice, 
    sum(q.Shares * q.Price)/sum(q.Shares) Vwap, 
    count(*) TotalTrades, 
    sum(q.Shares) TotalVolume 
FROM 
    QTradesPeriodicOutput q 
GROUP BY 
    q.Symbol 
;