Two projects demonstrate how SPLASH is used.
This project displays the top three prices for each stock symbol.
/*
 * Record layout for a single trade. It is shared by the QTrades
 * input window (keyed on Id) and the Top3Trades output window
 * (keyed on Symbol and Price).
 */
CREATE SCHEMA TradesSchema (
    Id integer,
    TradeTime date,
    Venue string,
    Symbol string,
    Price float,
    Shares integer
);
/* ******************************************************
 * Nasdaq trades input window.
 * Rows follow TradesSchema and are keyed on Id.
 */
CREATE INPUT WINDOW QTrades
    SCHEMA TradesSchema
    PRIMARY KEY (Id);
/* ******************************************************
 * Use Case a:
 * Keep only the records that carry one of the top three
 * distinct prices. Delete records that fall out of the top
 * three.
 *
 * Here the trades corresponding to the top three prices
 * per Symbol are kept. It uses
 * - an event cache (bucketed by Symbol, ordered by ascending Price)
 * - a local UDF (insertIntoCache)
 */
CREATE FLEX Top3TradesFlex
IN QTrades
OUT OUTPUT WINDOW Top3Trades SCHEMA TradesSchema PRIMARY KEY(Symbol,Price)
BEGIN
    DECLARE
        // One bucket per Symbol; rows are inserted manually and kept
        // in ascending Price order, so index 0 is the lowest price.
        eventCache(QTrades[Symbol], manual, Price asc) tradesCache;
        /*
         * Offers a trade to the current cache bucket, which holds at
         * most three distinct prices (compared after rounding to two
         * decimals).
         *
         * Returns:
         * - the replaced record, when a record with the same rounded
         *   price was already cached (the new trade takes its place);
         * - the trade itself, when it was added without displacing
         *   anything (fewer than three prices cached);
         * - the evicted lowest-price record, when the bucket was full
         *   and the trade's price is above the lowest cached price;
         * - null, when the bucket is full and the trade's price is
         *   below every cached price (the cache is left untouched).
         */
        typeof(QTrades) insertIntoCache( typeof(QTrades) qTrades )
        {
            integer counter := 0;
            typeof (QTrades) rec;
            long cacheSz := cacheSize(tradesCache);
            // Walk the bucket from the lowest price upwards.
            while (counter < cacheSz) {
                rec := getCache( tradesCache, counter );
                if( round(rec.Price,2) = round(qTrades.Price,2) ) {
                    // Same price already cached: replace that record
                    // with the new trade.
                    deleteCache(tradesCache, counter);
                    insertCache( tradesCache, qTrades );
                    return rec;
                } else if( qTrades.Price < rec.Price) {
                    // counter now equals the number of cached prices
                    // that are lower than the new price.
                    break;
                }
                counter++;
            }
            if(cacheSz < 3) {
                // Less than 3 distinct prices: always accept.
                insertCache(tradesCache, qTrades);
                return qTrades;
            } else if(counter = 0) {
                // Bucket is full and the new price is below the lowest
                // cached price, so it is not a top-three price. Do not
                // evict anything.
                return null;
            }
            // Bucket is full and the new price is above the lowest
            // cached price: evict the lowest and take the new trade.
            rec := getCache(tradesCache, 0);
            deleteCache(tradesCache, 0);
            insertCache(tradesCache, qTrades);
            return rec;
        }
    END;
    ON QTrades {
        // Select the cache bucket for this trade's Symbol.
        keyCache( tradesCache, [Symbol=QTrades.Symbol;|] );
        typeof(QTrades) rec := insertIntoCache( QTrades );
        // A null result means the trade was rejected; emit nothing.
        // Tested with isnull so that an Id of 0 is not mistaken for
        // "no result".
        if(not(isnull(rec.Id))) {
            // When the returned Id does not match the current Id it
            // is a record to delete.
            if(rec.Id <> QTrades.Id) {
                output setOpcode(rec, delete);
            }
            output setOpcode(QTrades, upsert);
        }
    };
END;
This project collects data for thirty seconds and then computes the desired output values.
/*
 * Record layout for a single trade. It is shared by the QTrades
 * input window and the QTradesPeriodicOutput output window.
 */
CREATE SCHEMA TradesSchema (
    Id integer,
    TradeTime date,
    Venue string,
    Symbol string,
    Price float,
    Shares integer
);
/* ******************************************************
 * Nasdaq trades input window.
 * Rows follow TradesSchema and are keyed on Id.
 */
CREATE INPUT WINDOW QTrades
    SCHEMA TradesSchema
    PRIMARY KEY (Id);
/* ******************************************************
 * Use Case b:
 * Perform a computation every N seconds for records that
 * arrived in the last N seconds.
 *
 * Here the Nasdaq trades data is collected for 30 seconds
 * before being released for further computation.
 *
 * The output window is keyed on Id, like the input window, so
 * every collected trade keeps its own row. Keying on
 * (Symbol, Price) would make trades that share a symbol and a
 * price overwrite each other on upsert, which would distort any
 * per-trade count or volume computed from this window.
 *
 * NOTE(review): rows released in earlier periods are never
 * deleted from the output window here - confirm whether a
 * retention policy is wanted.
 */
CREATE FLEX PeriodicOutputFlex
IN QTrades
OUT OUTPUT WINDOW QTradesPeriodicOutput SCHEMA TradesSchema PRIMARY KEY(Id)
BEGIN
    DECLARE
        // Trades collected since the last release. The record itself
        // is the dictionary key; the integer value is a placeholder
        // that is never read.
        dictionary(typeof(QTrades), integer) cache;
    END;
    ON QTrades {
        // Whenever a record arrives just insert it into the dictionary.
        cache[QTrades] := 0;
    };
    EVERY 30 SECONDS {
        // Cycle through the dictionary, output every collected row,
        // then empty the dictionary for the next period.
        for (rec in cache) {
            output setOpcode(rec, upsert);
        }
        clear(cache);
    };
END;
/**
 * Per-symbol statistics computed from the periodic output:
 * lowest and highest price, share-weighted average price (Vwap),
 * row count and total number of shares.
 */
CREATE OUTPUT WINDOW QTradesSymbolStats
    PRIMARY KEY DEDUCED
AS SELECT
    trades.Symbol,
    min(trades.Price) Minprice,
    max(trades.Price) MaxPrice,
    sum(trades.Shares * trades.Price) / sum(trades.Shares) Vwap,
    count(*) TotalTrades,
    sum(trades.Shares) TotalVolume
FROM QTradesPeriodicOutput trades
GROUP BY trades.Symbol
;