Procedures written in SPLASH are integrated into projects using the CCL Flex operator.
Procedures written in SPLASH are not standalone programs. They are used within Sybase® Event Stream Processor projects that are written primarily in CCL. The Flex operator is the CCL statement that incorporates a SPLASH routine into a CCL project.
Syntax: windowValue[ recordValue ]
Type: The recordValue must have the record type of the window. The operation returns a value of the record type of the window.
Example: input_window[ [k = 3; | ] ]
If a key field is missing from the argument, or a key field is null, this operation always returns null. Comparing key fields in the stream to null is not meaningful, because null is never equal to any value (including null).
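As a minimal sketch of how a key lookup might be used inside a Flex method: the fragment below assumes a window named input_window whose only key field is an integer k and which has an integer non-key field d. The schema and the variable names rec and dValue are assumptions made for illustration.

```
// Assumed schema: key field k (integer), non-key field d (integer).
// Look up the row whose key field k equals 3.
typeof(input_window) rec := input_window[ [k = 3; | ] ];
integer dValue := 0;
if (isnull(rec)) {
    // No row with k = 3 exists (or the key was incomplete or null).
    dValue := -1;
} else {
    dValue := rec.d;
}
```

Testing the result with isnull before reading a field avoids relying on a comparison with null, which never succeeds.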
Syntax: windowName{ recordValue }
Type: The record must be consistent with the record type of the window. The operation returns a value of the record type of the window.
Example: input_window{ [ | d = 5 ] }
You can use key and non-key fields in the record.
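The two forms of matching can be sketched as follows, again assuming a window named input_window with an integer key field k and an integer non-key field d. The variable names byValue and byBoth are hypothetical.

```
// Assumed schema: key field k (integer), non-key field d (integer).
// Match on a non-key field only.
typeof(input_window) byValue := input_window{ [ | d = 5 ] };
// Match on a key field and a non-key field together.
typeof(input_window) byBoth := input_window{ [k = 3; | d = 5 ] };
```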
You can also iterate through all the records in a window using a "for" loop.
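A short sketch of such a loop, assuming a window named input_window with an integer non-key field d (the schema and the variable name total are assumptions for illustration):

```
// Assumed schema: non-key field d (integer).
// Sum the d field over every record currently in the window.
integer total := 0;
for (rec in input_window) {
    total := total + rec.d;
}
```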
The following examples show complete projects that incorporate SPLASH code using the CCL Flex operator.
This project displays the top three prices for each stock symbol.
CREATE SCHEMA TradesSchema (
    Id integer,
    TradeTime date,
    Venue string,
    Symbol string,
    Price float,
    Shares integer
);

/* ******************************************************
 * Create a Nasdaq Trades Input Window
 */
CREATE INPUT WINDOW QTrades
    SCHEMA TradesSchema
    PRIMARY KEY (Id);

/* ******************************************************
 * Use Case a:
 * Keep records corresponding to only the top three
 * distinct values. Delete records that fall out of the
 * top three values.
 *
 * Here the trades corresponding to the top three prices
 * per Symbol are maintained. It uses
 * - eventcaches
 * - local UDF
 */
CREATE FLEX Top3TradesFlex
    IN QTrades
    OUT OUTPUT WINDOW Top3Trades
        SCHEMA TradesSchema
        PRIMARY KEY (Symbol, Price)
BEGIN
    DECLARE
        eventCache(QTrades[Symbol], manual, Price asc) tradesCache;

        /*
         * Inserts record into cache if in top 3 prices and returns
         * the record to delete or just the current record if it was
         * inserted into cache with no corresponding delete.
         */
        typeof(QTrades) insertIntoCache( typeof(QTrades) qTrades )
        {
            // Keep only the top 3 distinct prices per symbol in the
            // event cache.
            integer counter := 0;
            typeof(QTrades) rec;
            long cacheSz := cacheSize(tradesCache);
            while (counter < cacheSz) {
                rec := getCache( tradesCache, counter );
                if( round(rec.Price,2) = round(qTrades.Price,2) ) {
                    // If the price is the same, update the record.
                    deleteCache(tradesCache, counter);
                    insertCache( tradesCache, qTrades );
                    return rec;
                } else if( qTrades.Price < rec.Price) {
                    break;
                }
                counter++;
            }

            // Fewer than 3 distinct prices
            if(cacheSz < 3) {
                insertCache(tradesCache, qTrades);
                return qTrades;
            } else {
                // Current price is > lowest price;
                // delete lowest price record.
                rec := getCache(tradesCache, 0);
                deleteCache(tradesCache, 0);
                insertCache(tradesCache, qTrades);
                return rec;
            }
            return null;
        }
    END;

    ON QTrades {
        keyCache( tradesCache, [Symbol=QTrades.Symbol;|] );
        typeof(QTrades) rec := insertIntoCache( QTrades );
        if(rec.Id) {
            // When the id does not match the current id, it is a
            // record to delete.
            if(rec.Id <> QTrades.Id) {
                output setOpcode(rec, delete);
            }
            output setOpcode(QTrades, upsert);
        }
    };
END;
This project collects data for thirty seconds and then computes the desired output values.
CREATE SCHEMA TradesSchema (
    Id integer,
    TradeTime date,
    Venue string,
    Symbol string,
    Price float,
    Shares integer
);

/* ******************************************************
 * Create a Nasdaq Trades Input Window
 */
CREATE INPUT WINDOW QTrades
    SCHEMA TradesSchema
    PRIMARY KEY (Id);

/* ******************************************************
 * Use Case b:
 * Perform a computation every N seconds for records
 * that arrived in the last N seconds.
 *
 * Here the Nasdaq trades data is collected for 30 seconds
 * before being released for further computation.
 */
CREATE FLEX PeriodicOutputFlex
    IN QTrades
    OUT OUTPUT WINDOW QTradesPeriodicOutput
        SCHEMA TradesSchema
        PRIMARY KEY (Symbol, Price)
BEGIN
    DECLARE
        dictionary(typeof(QTrades), integer) cache;
    END;

    ON QTrades {
        // Whenever a record arrives, just insert it into the dictionary.
        // The key of the dictionary is the key to the record.
        cache[QTrades] := 0;
    };

    EVERY 30 SECONDS {
        // Cycle through the dictionary, output all the rows,
        // and then delete the rows.
        for (rec in cache) {
            output setOpcode(rec, upsert);
        }
        clear(cache);
    };
END;

/**
 * Perform a computation from the periodic output.
 */
CREATE OUTPUT WINDOW QTradesSymbolStats
    PRIMARY KEY DEDUCED
AS
    SELECT
        q.Symbol,
        MIN(q.Price) Minprice,
        MAX(q.Price) MaxPrice,
        sum(q.Shares * q.Price)/sum(q.Shares) Vwap,
        count(*) TotalTrades,
        sum(q.Shares) TotalVolume
    FROM QTradesPeriodicOutput q
    GROUP BY q.Symbol;