Procedures written in SPLASH are integrated into Projects using the CCL Flex operator.
SPLASH procedures are not standalone programs. They run inside Sybase® Event Stream Processor projects that are written primarily in CCL. The following examples show two complete projects that incorporate SPLASH code through the CCL Flex operator.
This project keeps, for each stock symbol, the trades with the top three distinct prices.
CREATE SCHEMA TradesSchema (
    Id integer,
    TradeTime date,
    Venue string,
    Symbol string,
    Price float,
    Shares integer
);

/* ******************************************************
 * Create a Nasdaq Trades Input Window
 */
CREATE INPUT WINDOW QTrades
    SCHEMA TradesSchema
    PRIMARY KEY (Id);

/* ******************************************************
 * Use Case a:
 * Keep records corresponding to only the top three
 * distinct values. Delete records that fall out of the
 * top three values.
 *
 * Here the trades corresponding to the top three prices
 * per Symbol are maintained. It uses
 * - eventcaches
 * - local UDF
 */
CREATE FLEX Top3TradesFlex
    IN QTrades
    OUT OUTPUT WINDOW Top3Trades SCHEMA TradesSchema PRIMARY KEY(Symbol,Price)
BEGIN
    DECLARE
        eventCache(QTrades[Symbol], manual, Price asc) tradesCache;

        /*
         * Inserts record into cache if in top 3 prices and returns
         * the record to delete or just the current record if it was
         * inserted into cache with no corresponding delete.
         */
        typeof(QTrades) insertIntoCache( typeof(QTrades) qTrades )
        {
            // keep only the top 3 distinct prices per symbol in the
            // event cache
            integer counter := 0;
            typeof (QTrades) rec;
            long cacheSz := cacheSize(tradesCache);
            while (counter < cacheSz) {
                rec := getCache( tradesCache, counter );
                if( round(rec.Price,2) = round(qTrades.Price,2) ) {
                    // if the price is the same update
                    // the record.
                    deleteCache(tradesCache, counter);
                    insertCache( tradesCache, qTrades );
                    return rec;
                    break;
                } else if( qTrades.Price < rec.Price) {
                    break;
                }
                counter++;
            }
            // Less than 3 distinct prices
            if(cacheSz < 3) {
                insertCache(tradesCache, qTrades);
                return qTrades;
            } else {
                // Current price is > lowest price
                // delete lowest price record.
                rec := getCache(tradesCache, 0);
                deleteCache(tradesCache, 0);
                insertCache(tradesCache, qTrades);
                return rec;
            }
            return null;
        }
    END;

    ON QTrades {
        keyCache( tradesCache, [Symbol=QTrades.Symbol;|] );
        typeof(QTrades) rec := insertIntoCache( QTrades );
        if(rec.Id) {
            // When id does not match current id it is a
            // record to delete
            if(rec.Id <> QTrades.Id) {
                output setOpcode(rec, delete);
            }
            output setOpcode(QTrades, upsert);
        }
    };
END;
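The behavior described in the comments of the Flex operator above can be modeled outside Event Stream Processor. The following is a minimal Python sketch, not ESP code: it assumes trades are plain dictionaries with `Id`, `Symbol`, and `Price` keys, and the names `make_top_trades` and `on_trade` are hypothetical. It follows the stated intent (keep one trade for each of the three highest distinct prices per symbol) rather than reproducing the SPLASH control flow line by line.

```python
from collections import defaultdict

TOP_N = 3  # the project keeps three distinct prices per symbol


def make_top_trades():
    """Return a handler that tracks the top-N distinct prices per symbol."""
    # symbol -> {rounded price: trade}; stands in for the keyed event cache
    cache = defaultdict(dict)

    def on_trade(trade):
        """Process one trade and return the (opcode, trade) events to emit."""
        held = cache[trade["Symbol"]]
        price = round(trade["Price"], 2)
        events = []
        if price in held:
            # Same price already held: the newer trade replaces the older one.
            old = held[price]
            if old["Id"] != trade["Id"]:
                events.append(("delete", old))
        elif len(held) >= TOP_N:
            lowest = min(held)
            if price < lowest:
                return events  # below the current top N, nothing to emit
            # The new price displaces the lowest held price.
            events.append(("delete", held.pop(lowest)))
        held[price] = trade
        events.append(("upsert", trade))
        return events

    return on_trade
```

Calling the handler once per incoming trade yields the same kind of event stream the Flex operator writes to Top3Trades: a delete for any displaced record followed by an upsert for the new one.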
This project collects data for thirty seconds and then computes the desired output values.
CREATE SCHEMA TradesSchema (
    Id integer,
    TradeTime date,
    Venue string,
    Symbol string,
    Price float,
    Shares integer
);

/* ******************************************************
 * Create a Nasdaq Trades Input Window
 */
CREATE INPUT WINDOW QTrades
    SCHEMA TradesSchema
    PRIMARY KEY (Id);

/* ******************************************************
 * Use Case b:
 * Perform a computation every N seconds for records
 * that arrived in the last N seconds.
 *
 * Here the Nasdaq trades data is collected for 30 seconds
 * before being released for further computation.
 */
CREATE FLEX PeriodicOutputFlex
    IN QTrades
    OUT OUTPUT WINDOW QTradesPeriodicOutput SCHEMA TradesSchema PRIMARY KEY(Symbol,Price)
BEGIN
    DECLARE
        dictionary(typeof(QTrades), integer) cache;
    END;

    ON QTrades {
        // Whenever a record arrives just insert into dictionary.
        // The key of the dictionary is the key to the record.
        cache[QTrades] := 0;
    };

    EVERY 30 SECONDS {
        // Cycle through the dictionary, output all the rows,
        // and then delete the rows.
        for (rec in cache) {
            output setOpcode(rec, upsert);
        }
        clear(cache);
    };
END;

/**
 * Perform a computation from the periodic output.
 */
CREATE OUTPUT WINDOW QTradesSymbolStats
    PRIMARY KEY DEDUCED
AS SELECT
    q.Symbol,
    MIN(q.Price) Minprice,
    MAX(q.Price) MaxPrice,
    sum(q.Shares * q.Price)/sum(q.Shares) Vwap,
    count(*) TotalTrades,
    sum(q.Shares) TotalVolume
FROM QTradesPeriodicOutput q
GROUP BY q.Symbol;
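To make the aggregation in QTradesSymbolStats concrete, here is a small Python sketch of the same per-symbol arithmetic applied to a given set of rows. It is an illustration only, not ESP code: the function name `symbol_stats` and the dictionary layout are assumptions, it assumes every symbol has a positive total share count, and it does not model the 30-second timer or how the window retains rows.

```python
from collections import defaultdict


def symbol_stats(trades):
    """Aggregate a set of trade rows into per-symbol statistics."""
    # Group the rows by symbol, as GROUP BY q.Symbol does.
    by_symbol = defaultdict(list)
    for trade in trades:
        by_symbol[trade["Symbol"]].append(trade)

    stats = {}
    for symbol, rows in by_symbol.items():
        volume = sum(r["Shares"] for r in rows)
        notional = sum(r["Shares"] * r["Price"] for r in rows)
        stats[symbol] = {
            "min_price": min(r["Price"] for r in rows),  # MIN(q.Price)
            "max_price": max(r["Price"] for r in rows),  # MAX(q.Price)
            "vwap": notional / volume,  # sum(Shares * Price) / sum(Shares)
            "total_trades": len(rows),  # count(*)
            "total_volume": volume,     # sum(q.Shares)
        }
    return stats
```

For example, two IBM rows of 100 shares at 10.0 and 300 shares at 20.0 give a volume-weighted average price of (100 × 10.0 + 300 × 20.0) / 400 = 17.5, which sits closer to the price that traded more shares.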