Using SPLASH in Projects

Two projects demonstrate how SPLASH is used.

The first project keeps, for each stock symbol, the trades that carry the top three distinct prices.

/*
 * Column layout of a single trade record. The windows in this
 * project all declare their rows with this schema.
 */
CREATE SCHEMA TradesSchema (
    Id        integer,
    TradeTime date,
    Venue     string,
    Symbol    string,
    Price     float,
    Shares    integer
);

/* ******************************************************
 * QTrades: input window for Nasdaq trades.
 * Rows follow TradesSchema and are keyed by Id.
 */
CREATE INPUT WINDOW QTrades
    SCHEMA TradesSchema
    PRIMARY KEY (Id);

/* ******************************************************
 * Use Case a:
 *   Keep only the records that carry the top three distinct
 *   prices, and delete records that fall out of the top
 *   three.
 *
 * Here the trades corresponding to the top three prices
 * per Symbol are maintained. It uses
 * - an event cache (bucketed by Symbol, ordered by ascending Price)
 * - a local SPLASH function
 */
CREATE FLEX Top3TradesFlex
    IN QTrades
    OUT OUTPUT WINDOW Top3Trades SCHEMA TradesSchema PRIMARY KEY(Symbol,Price)
    BEGIN
        DECLARE
            eventCache(QTrades[Symbol], manual, Price asc) tradesCache;
            /*
             * Tries to place qTrades among the top 3 prices held in
             * tradesCache (the caller selects the Symbol bucket with
             * keyCache before calling). Returns:
             *  - the displaced record, when qTrades replaced a cached
             *    record (same price to 2 decimals, or the previous
             *    lowest price);
             *  - qTrades itself, when it was added without displacing
             *    anything;
             *  - null, when three prices are already cached and qTrades
             *    is priced below all of them; the cache is left untouched.
             */
            typeof(QTrades) insertIntoCache( typeof(QTrades) qTrades )
            {
                integer counter := 0;
                typeof (QTrades) rec;
                long cacheSz := cacheSize(tradesCache);

                // The cache is ordered by ascending price, so the scan
                // stops at the first cached price above the new one.
                // After the loop, counter is the number of cached
                // records priced below qTrades.
                while (counter < cacheSz) {
                    rec := getCache( tradesCache, counter );
                    if( round(rec.Price,2) = round(qTrades.Price,2) ) {
                        // Same price: the new trade takes the place of
                        // the cached one.
                        deleteCache(tradesCache, counter);
                        insertCache( tradesCache, qTrades );
                        return rec;
                    } else if( qTrades.Price < rec.Price) {
                        break;
                    }
                    counter++;
                }

                // Fewer than 3 distinct prices: there is room for this one.
                if(cacheSz < 3) {
                    insertCache(tradesCache, qTrades);
                    return qTrades;
                }

                // Cache is full and nothing cached is cheaper than the new
                // trade, so the new trade is not in the top three. Evicting
                // the current lowest here would replace it with an even
                // lower price.
                if(counter = 0) {
                    return null;
                }

                // The new price beats the lowest cached price: evict the
                // lowest record and keep the new one.
                rec := getCache(tradesCache, 0);
                deleteCache(tradesCache, 0);
                insertCache(tradesCache, qTrades);
                return rec;
            }
        END;

        ON QTrades {
            // Point the event cache at the bucket for this trade's Symbol.
            keyCache( tradesCache, [Symbol=QTrades.Symbol;|] );
            typeof(QTrades) rec := insertIntoCache( QTrades );
            // Nothing is emitted when the trade was rejected (null result).
            // NOTE(review): this test is also false for a record whose Id
            // is 0 -- confirm that Id 0 never occurs in the feed.
            if(rec.Id) {
                // A returned record with a different Id is the one that
                // was displaced, so remove it from the output window.
                if(rec.Id <> QTrades.Id) {
                    output setOpcode(rec, delete);
                }
                output setOpcode(QTrades, upsert);
            }
        };
    END;

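The second project collects trades for thirty seconds at a time, then releases them and computes per-symbol statistics (minimum and maximum price, volume-weighted average price, trade count, and total volume).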

/*
 * Column layout of a single trade record. The windows in this
 * project declare their rows with this schema.
 */
CREATE SCHEMA TradesSchema (
    Id        integer,
    TradeTime date,
    Venue     string,
    Symbol    string,
    Price     float,
    Shares    integer
);

/* ******************************************************
 * QTrades: input window for Nasdaq trades.
 * Rows follow TradesSchema and are keyed by Id.
 */
CREATE INPUT WINDOW QTrades
    SCHEMA TradesSchema
    PRIMARY KEY (Id);

/* ******************************************************
 * Use Case b:
 *   Every N seconds, release the records that arrived
 *   during the preceding N seconds for further computation.
 *
 * Here N is 30: Nasdaq trades are buffered as they arrive
 * and are flushed to the output window on a 30-second timer.
 *
 * NOTE(review): the output window is keyed by (Symbol, Price)
 * while the input is keyed by Id, so buffered trades that share
 * a symbol and price upsert onto the same output row -- confirm
 * that this is intended.
 */
CREATE FLEX PeriodicOutputFlex
    IN QTrades
    OUT OUTPUT WINDOW QTradesPeriodicOutput SCHEMA TradesSchema PRIMARY KEY(Symbol,Price)
    BEGIN
        DECLARE
            // Buffer of trades awaiting release. The record itself is
            // the dictionary key; the integer value is a placeholder
            // and is always 0.
            dictionary(typeof(QTrades), integer) pending;
        END;

        ON QTrades {
            // Remember every arriving record until the next timer tick.
            pending[QTrades] := 0;
        };

        EVERY 30 SECONDS {
            // Emit each buffered record as an upsert, then empty the
            // buffer so the next period starts clean.
            for (heldRec in pending) {
                output setOpcode(heldRec, upsert);
            }
            clear(pending);
        };
    END;

/**
 * Per-symbol statistics over the rows of QTradesPeriodicOutput:
 * lowest and highest price, share-weighted average price (Vwap),
 * number of rows (TotalTrades) and summed shares (TotalVolume).
 * The primary key is deduced from the GROUP BY column.
 */
CREATE OUTPUT WINDOW QTradesSymbolStats
    PRIMARY KEY DEDUCED
AS
    SELECT
        po.Symbol,
        MIN(po.Price) Minprice,
        MAX(po.Price) MaxPrice,
        sum(po.Shares * po.Price) / sum(po.Shares) Vwap,
        count(*) TotalTrades,
        sum(po.Shares) TotalVolume
    FROM QTradesPeriodicOutput po
    GROUP BY po.Symbol;