Using a vector and dictionary data structure in SPLASH.
This example implements OUTPUT AFTER logic that accumulates N trades per Symbol before the rows are output for further processing. A data structure combining a dictionary and a vector caches the rows for every Symbol until there are at least N rows for that Symbol. N is controlled by the parameter NoOfRows.
To test this model, run it, view the DelayedTrades stream in the stream viewer and manually load input into the Trades stream. You will see rows in the stream viewer only after you insert N trades for a symbol.
// N: how many rows are accumulated per Symbol before they are released
// by the Flex operator that reads this value.
DECLARE
    integer NoOfRows := 3;
END;
// Row layout shared by the input stream and the delayed output stream.
CREATE SCHEMA TradeSchema (
    Id        long,
    Symbol    STRING,
    Price     MONEY(4),
    Volume    INTEGER,
    TradeTime DATE
);
// Entry point for trade rows; rows use the TradeSchema layout.
CREATE INPUT STREAM Trades
    SCHEMA TradeSchema;
/*
 * DelayedTrades_Flex
 *
 * Buffers incoming Trades rows per Symbol and releases them to the
 * DelayedTrades output stream in batches: once at least NoOfRows rows
 * have been collected for a Symbol, all buffered rows for that Symbol
 * are output in arrival order and the buffer is emptied.
 *
 * Input : Trades (one invocation of the ON Trades method per row)
 * Output: DelayedTrades, same schema as the input (TradeSchema)
 * Reads : NoOfRows (declared outside this operator)
 */
CREATE FLEX DelayedTrades_Flex
    IN Trades
    OUT OUTPUT STREAM DelayedTrades
    SCHEMA TradeSchema
BEGIN
    DECLARE
        //Data structure combining a dictionary and a vector:
        //Symbol -> rows buffered so far for that Symbol.
        dictionary(string, vector(typeof(Trades))) cache;
    END;

    ON Trades {
        /*Get the reference to the vector associated with a Symbol
          from the cache. It is null the first time a Symbol is seen.*/
        vector(typeof(Trades)) symbolTrades := cache[Trades.Symbol];

        if (isnull(symbolTrades)) {
            /*Create a new vector for this symbol.
              Note that you have to use a new to create a vector or
              dictionary if it is not directly defined in a global or
              local declare/end block. In this example the cache
              dictionary does not have to be newed because it is
              directly defined in the local declare/end block but the
              vector inside the dictionary is not.*/
            symbolTrades := new vector(typeof(Trades));
            //Add the current row to the vector.
            push_back(symbolTrades, Trades);
            //Assign the vector to the cache for the current Symbol.
            cache[Trades.Symbol] := symbolTrades;
            /*Deliberately no early exit here: the threshold check below
              must also run for the first row of a Symbol, otherwise
              NoOfRows = 1 would never produce any output.*/
        } else {
            /*There is a vector already available for the Symbol, so
              insert the current row. Note that you don't have to
              assign the vector back into the dictionary because the
              vector symbolTrades is a reference to the corresponding
              vector in the dictionary.*/
            push_back(symbolTrades, Trades);
        }

        /*The vector has reached (at least) size N. Using >= rather than
          = guarantees the buffer is still flushed if it ever holds more
          than N rows, instead of growing without bound.*/
        if (size(symbolTrades) >= NoOfRows) {
            //Iterate through the rows and output them.
            for (rec in symbolTrades) {
                output rec;
            }
            //Prepare for the next N Rows. Clear the vector.
            resize(symbolTrades, 0);
        }
    };
END;