Two projects demonstrate how SPLASH is used.
This project displays the top three prices for each stock symbol.
/*
 * Record layout shared by the trades input window and the flex output.
 */
CREATE SCHEMA TradesSchema (
    Id        integer,
    TradeTime date,
    Venue     string,
    Symbol    string,
    Price     float,
    Shares    integer
)
;

/* ******************************************************
 * Create a Nasdaq Trades Input Window
 */
CREATE INPUT WINDOW QTrades
SCHEMA TradesSchema PRIMARY KEY (Id)
;

/* ******************************************************
 * Use Case a:
 * Keep records corresponding to only the top three
 * distinct values. Delete records that fall out of the
 * top three values.
 *
 * Here the trades corresponding to the top three prices
 * per Symbol are maintained. It uses
 *   - eventcaches
 *   - local UDF
 */
CREATE FLEX Top3TradesFlex
IN QTrades
OUT OUTPUT WINDOW Top3Trades SCHEMA TradesSchema PRIMARY KEY(Symbol,Price)
BEGIN
    DECLARE
        /*
         * Manually maintained cache of QTrades records, bucketed by
         * Symbol and ordered by ascending Price, so index 0 of a
         * bucket is the lowest cached price.
         */
        eventCache(QTrades[Symbol], manual, Price asc) tradesCache;

        /*
         * Offers a trade to the current cache bucket, which holds at
         * most three distinct prices (compared at two decimals).
         *
         * Returns:
         *   - the replaced record, when a record with the same price
         *     was already cached;
         *   - qTrades itself, when it was added without displacing
         *     anything (fewer than three prices were cached);
         *   - the evicted lowest-price record, when qTrades displaced
         *     it from a full bucket;
         *   - null, when the bucket is full and qTrades is priced below
         *     every cached record, so nothing changes.
         */
        typeof(QTrades) insertIntoCache( typeof(QTrades) qTrades )
        {
            integer counter := 0;
            typeof(QTrades) rec;
            long cacheSz := cacheSize(tradesCache);

            /*
             * Walk the bucket from the lowest price upwards. Stop at
             * the first cached price that is higher than the new one;
             * counter is then the number of cached prices below it.
             */
            while (counter < cacheSz) {
                rec := getCache(tradesCache, counter);
                if (round(rec.Price, 2) = round(qTrades.Price, 2)) {
                    /* Same price: replace the cached record in place. */
                    deleteCache(tradesCache, counter);
                    insertCache(tradesCache, qTrades);
                    return rec;
                } else if (qTrades.Price < rec.Price) {
                    break;
                }
                counter++;
            }

            /* Fewer than 3 distinct prices: there is still room. */
            if (cacheSz < 3) {
                insertCache(tradesCache, qTrades);
                return qTrades;
            }

            /*
             * Bucket is full and the loop stopped on its very first
             * record: the new price is below the lowest cached price,
             * so it is not a top-three price and must not evict anything.
             */
            if (counter = 0) {
                return null;
            }

            /*
             * New price is above the lowest cached price: evict the
             * lowest record and cache the new one.
             */
            rec := getCache(tradesCache, 0);
            deleteCache(tradesCache, 0);
            insertCache(tradesCache, qTrades);
            return rec;
        }
    END;

    ON QTrades {
        /* Select the cache bucket for the incoming trade's symbol. */
        keyCache( tradesCache, [Symbol=QTrades.Symbol;|] );
        typeof(QTrades) rec := insertIntoCache( QTrades );

        /*
         * Nothing is output when insertIntoCache reported no change.
         * NOTE(review): this guard tests rec.Id, so a trade whose Id
         * is 0 is presumably skipped as well -- confirm.
         */
        if (rec.Id) {
            /*
             * A returned Id that differs from the current Id marks a
             * record that was displaced and must be deleted downstream.
             */
            if (rec.Id <> QTrades.Id) {
                output setOpcode(rec, delete);
            }
            output setOpcode(QTrades, upsert);
        }
    };
END;
This project collects data for thirty seconds and then computes the desired output values.
/*
 * Record layout shared by the trades input window and the flex output.
 */
CREATE SCHEMA TradesSchema (
    Id        integer,
    TradeTime date,
    Venue     string,
    Symbol    string,
    Price     float,
    Shares    integer
)
;

/* ******************************************************
 * Create a Nasdaq Trades Input Window
 */
CREATE INPUT WINDOW QTrades
SCHEMA TradesSchema PRIMARY KEY (Id)
;

/* ******************************************************
 * Use Case b:
 * Perform a computation every N seconds for records
 * arrived in the last N seconds.
 *
 * Here the Nasdaq trades data is collected for 30 seconds
 * before being released for further computation.
 *
 * NOTE(review): the output window is keyed on (Symbol, Price)
 * and only upserts are emitted, so trades sharing a symbol and
 * price overwrite one another and rows from earlier periods
 * are never removed -- confirm this is what the downstream
 * statistics expect.
 */
CREATE FLEX PeriodicOutputFlex
IN QTrades
OUT OUTPUT WINDOW QTradesPeriodicOutput SCHEMA TradesSchema PRIMARY KEY(Symbol,Price)
BEGIN
    DECLARE
        /* Trades buffered since the last timer tick, keyed by the record. */
        dictionary(typeof(QTrades), integer) cache;
    END;

    ON QTrades {
        /*
         * Whenever a record arrives just insert it into the dictionary.
         * The record itself is the dictionary key; the value 0 is a
         * placeholder.
         */
        cache[QTrades] := 0;
    };

    EVERY 30 SECONDS {
        /*
         * Cycle through the dictionary, output every buffered row as
         * an upsert, then empty the dictionary for the next period.
         */
        for (rec in cache) {
            output setOpcode(rec, upsert);
        }
        clear(cache);
    };
END;

/**
 * Perform a computation from the periodic output:
 * per-symbol minimum and maximum price, volume-weighted
 * average price, number of rows and total share volume.
 */
CREATE OUTPUT WINDOW QTradesSymbolStats
PRIMARY KEY DEDUCED
AS SELECT
    q.Symbol,
    MIN(q.Price) Minprice,
    MAX(q.Price) MaxPrice,
    sum(q.Shares * q.Price)/sum(q.Shares) Vwap,
    count(*) TotalTrades,
    sum(q.Shares) TotalVolume
FROM QTradesPeriodicOutput q
GROUP BY q.Symbol
;