Vectors and Dictionaries

Using a vector and dictionary data structure in SPLASH.

This example implements OUTPUT AFTER logic that accumulates N trades per Symbol before the rows are output for further processing. A data structure combining a dictionary and a vector caches the rows for each Symbol until there are at least N rows for that Symbol. N is controlled by the parameter NoOfRows.

To test this model, run it, view the DelayedTrades stream in the stream viewer and manually load input into the Trades stream. You will see rows in the stream viewer only after you insert N trades for a symbol.

//Batch size N: the number of rows DelayedTrades_Flex caches for a
//Symbol before it outputs them.
DECLARE
	INTEGER NoOfRows := 3;
END;

//Row layout shared by the Trades input stream and the DelayedTrades
//output stream.
CREATE SCHEMA TradeSchema (
	Id        LONG,
	Symbol    STRING,
	Price     MONEY(4),
	Volume    INTEGER,
	TradeTime DATE
);

//Input stream whose rows are consumed by DelayedTrades_Flex.
CREATE INPUT STREAM Trades
	SCHEMA TradeSchema;

/*
 * DelayedTrades_Flex
 *
 * Implements the OUTPUT AFTER logic: rows arriving on Trades are cached
 * per Symbol and are published to DelayedTrades only once NoOfRows rows
 * have been collected for that Symbol. After a batch is published the
 * cache for that Symbol is emptied and a new batch starts.
 *
 * Input:  Trades (one row per ON Trades invocation).
 * Output: DelayedTrades, same schema as the input (TradeSchema); rows of
 *         a batch are output in the order they were cached.
 *
 * Edge cases: the size check runs for every row, including the first row
 * of a Symbol, so NoOfRows = 1 publishes each row immediately. Because
 * the check uses >=, a NoOfRows of 0 or less also publishes each row
 * immediately instead of caching rows forever.
 */
CREATE FLEX DelayedTrades_Flex
IN Trades
OUT OUTPUT STREAM DelayedTrades
    SCHEMA TradeSchema
BEGIN
    DECLARE
        //Data structure combining a dictionary and a vector:
        //Symbol -> rows cached for that Symbol since its last batch.
        dictionary(string, vector(typeof(Trades))) cache;
    END;
    ON Trades {
        /*Get the reference to the vector associated with a Symbol
        from the cache.*/
        vector(typeof(Trades)) symbolTrades := cache[Trades.Symbol];
        if(isnull(symbolTrades)){
            /*First row for this Symbol: create its vector.
            Note that you have to use a new to create a vector or
            dictionary if it is not directly defined in a global or
            local declare/end block. The cache dictionary does not have
            to be newed because it is directly defined in the local
            declare/end block, but the vector inside it is not.*/
            symbolTrades := new vector(typeof(Trades));

            //Register the vector in the cache for the current Symbol.
            cache[Trades.Symbol] := symbolTrades;
        }

        /*Add the current row. symbolTrades is a reference to the vector
        held in the dictionary, so the vector does not have to be
        assigned back into the dictionary after it is modified.*/
        push_back(symbolTrades, Trades);

        /*The vector has reached size N. >= (rather than =) keeps the
        batch from getting stuck if the size is ever past the
        threshold, e.g. when NoOfRows is less than 1.*/
        if(size(symbolTrades) >= NoOfRows) {
            //Iterate through the rows and output them.
            for(rec in symbolTrades) {
                output rec;
            }

            //Prepare for the next N rows. Clear the vector.
            resize(symbolTrades, 0);
        }
    };
END;