Using a vector and dictionary data structure in SPLASH.
This example implements OUTPUT AFTER logic that accumulates N trades per symbol before the rows are output for further processing. A data structure combining a dictionary and a vector caches the rows for each symbol until there are N rows for that symbol. N is controlled by the parameter NoOfRows.
To test this model, run it, view the DelayedTrades stream in the stream viewer and manually load input into the Trades stream. You will see rows in the stream viewer only after you insert N trades for a symbol.
DECLARE
    integer NoOfRows := 3;
END;

CREATE SCHEMA TradeSchema (
    Id long,
    Symbol STRING,
    Price MONEY(4),
    Volume INTEGER,
    TradeTime DATE
);

CREATE INPUT STREAM Trades SCHEMA TradeSchema;

CREATE FLEX DelayedTrades_Flex
IN Trades
OUT OUTPUT STREAM DelayedTrades SCHEMA TradeSchema
BEGIN
    DECLARE
        //Data structure combining a dictionary and a vector
        dictionary(string, vector(typeof(Trades))) cache;
    END;

    ON Trades {
        /*Get the reference to the vector associated with a Symbol
          from the cache.*/
        vector(typeof(Trades)) symbolTrades := cache[Trades.Symbol];

        if(isnull(symbolTrades)) {
            /*Create a new vector for this symbol. Note that you have to
              use a new to create a vector or dictionary if it is not
              directly defined in a global or local declare/end block.
              In this example the cache dictionary does not have to be
              newed because it is directly defined in the local
              declare/end block but the vector inside the dictionary
              is not.*/
            symbolTrades := new vector(typeof(Trades));

            //Add the current row to the vector.
            push_back(symbolTrades, Trades);

            //Assign the vector to the cache for the current Symbol.
            cache[Trades.Symbol] := symbolTrades;
            exit;
        } else {
            /*There is a vector already available for the Symbol, so
              insert the current row. Note that you don't have to assign
              the vector back into the dictionary because the vector
              symbolTrades is a reference to the corresponding vector
              in the dictionary.*/
            push_back(symbolTrades, Trades);
        }

        //The vector has reached size N.
        if(size(symbolTrades) = NoOfRows) {
            //Iterate through the rows and output them.
            for(rec in symbolTrades) {
                output rec;
            }
            //Prepare for the next N Rows. Clear the vector.
            resize(symbolTrades, 0);
        }
    };
END;
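
To see what to expect in the stream viewer before loading data, the accumulate-and-flush behavior can be modelled outside the server. The following is a minimal Python sketch, not SPLASH: the names make_delayed_trades and on_trade and the sample trades are hypothetical, and a plain dict of lists stands in for the dictionary of vectors. It is meant to match the model above for the default NoOfRows of 3.

```python
from collections import defaultdict

NO_OF_ROWS = 3  # stands in for the NoOfRows parameter of the model


def make_delayed_trades(no_of_rows=NO_OF_ROWS):
    """Return a handler that caches rows per symbol (hypothetical helper)."""
    cache = defaultdict(list)  # symbol -> cached rows, like the dictionary of vectors

    def on_trade(trade):
        rows = cache[trade["Symbol"]]  # reference to the list held in the cache
        rows.append(trade)             # no write-back needed, same object
        if len(rows) == no_of_rows:
            batch = list(rows)         # copy the rows to emit
            rows.clear()               # prepare for the next N rows
            return batch
        return []                      # nothing is output yet

    return on_trade


# Illustrative input only; the Ids and Symbols are made up.
trades = [
    {"Id": 1, "Symbol": "IBM"},
    {"Id": 2, "Symbol": "IBM"},
    {"Id": 3, "Symbol": "SAP"},
    {"Id": 4, "Symbol": "IBM"},  # third IBM row: Ids 1, 2, 4 are emitted
    {"Id": 5, "Symbol": "SAP"},
    {"Id": 6, "Symbol": "SAP"},  # third SAP row: Ids 3, 5, 6 are emitted
    {"Id": 7, "Symbol": "IBM"},  # starts a new IBM batch, nothing emitted
]

on_trade = make_delayed_trades()
emitted = [on_trade(t) for t in trades]

for trade, batch in zip(trades, emitted):
    print(trade["Id"], trade["Symbol"], "->", [row["Id"] for row in batch])
```

Each symbol is counted independently, so interleaved input for different symbols does not trigger output early, and a symbol's count restarts from zero after its batch is emitted.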