Dictionary of Dictionaries

This example uses one dictionary to sort incoming data for entry into other dictionaries.

This sample shows a flex stream that uses a dictionary to index other dictionaries. The input data comes from a CSV file, tick.csv. The records are inserted into an input window named TickIn using a File CSV Input adapter. Each record describes a stock sale: it includes the number of shares sold, the symbol of the stock, and the sector of the market (e.g. software) to which the stock belongs. The first dictionary, held in the variable named cache, uses the market sector to select (or create) a second dictionary, and that second dictionary uses the symbol as the key under which it accumulates the number of shares sold. Every 60 seconds the contents of the cache are sent to the output stream and then removed from the cache.

To check the content of the cache,
  1. Enter esp_client -p localhost:9786/default/project_name -c studio:studio at the system prompt.
  2. Enter trace_mode on at the esp_client> prompt.
  3. Enter ex `var` `outstream` `cache` at the esp_client> prompt.
After sixty seconds the data are sent to the output stream and removed from the cache.
/*
 * TickIn: input window that receives one row per stock sale.
 *   seqnum - primary key of the row
 *   sector - market sector the stock belongs to
 *   symbol - stock symbol
 *   volume - number of shares sold
 */
CREATE INPUT WINDOW TickIn
SCHEMA (
    seqnum integer,
    sector string,
    symbol string,
    volume integer
)
PRIMARY KEY (seqnum);

/*
 * flex1: accumulates the traded volume per (sector, symbol) in a dictionary of
 * dictionaries and flushes the totals to outstream every 60 seconds.
 *
 *   cache        : sector -> (symbol -> accumulated volume)
 *   ON TickIn    : adds the incoming row's volume to the total for its
 *                  sector/symbol, creating the per-sector dictionary on demand.
 *   EVERY 60 SEC : emits one insert per (sector, symbol) pair, then empties
 *                  the cache.
 */
CREATE FLEX flex1
IN TickIn
OUT OUTPUT STREAM outstream SCHEMA (sector string,symbol string,volume integer)
BEGIN
DECLARE
/* Record type emitted on outstream; sector and symbol form the key part. */
typedef[string sector; string symbol; | integer volume] outrec;

/* Outer dictionary keyed by sector; each value is a symbol -> volume dictionary. */
dictionary(string,dictionary(string,integer)) cache;
END;
ON TickIn
{
    /* Storing data in a dictionary */
    dictionary(string,integer) symcache;
    integer prevTotalVolume := 0;
    symcache:=cache[TickIn.sector];
    /* Check to see if there is a dictionary assigned for the sector, if not create a new
     * dictionary
     */
    if(isnull(symcache))
    {
        symcache := new dictionary(string,integer);
        cache[TickIn.sector]:=symcache;
    } else {
        prevTotalVolume := symcache[TickIn.symbol];
        /* The sector dictionary exists but this symbol may not be in it yet.
         * A missing key yields null, and null + volume would store null as the
         * total, so start a new symbol from zero.
         */
        if(isnull(prevTotalVolume))
        {
            prevTotalVolume := 0;
        }
    }
    symcache[TickIn.symbol]:= prevTotalVolume + TickIn.volume;
};
EVERY 60 SECONDS
{
    /* Displaying the content of the dictionary */
    for (sector_key in cache)
    {
        dictionary(string,integer) symcache;
        /* Here symcache is a reference variable so deleting or modifying the
         * content of symcache will modify the actual dictionary content
         */
        symcache:=cache[sector_key];
        /* Iterate the reference directly instead of looking the sector up again. */
        for(symbol_key in symcache)
        {
            outrec rec :=[
            sector = sector_key;
            symbol = symbol_key;
            volume = symcache[symbol_key];
            ];
            output setOpcode(rec,insert);
        }
    }

    /* Deleting the content of the dictionary: empty every per-sector
     * dictionary first, then drop the sector entries themselves.
     */
    for(sector_key in cache)
    {
        clear(cache[sector_key]);
    }
    clear(cache);
};
END;

/*
 * csvInConn1: attaches a dsv_in input adapter to the TickIn window.
 * The adapter is configured to read comma-delimited data without a header row
 * from the file 'tick.csv' in the directory '../exampledata'.
 * NOTE(review): the remaining properties (blockSize, fieldCount, safeOps,
 * skipDels, Pollperiod, ...) are adapter-specific; confirm their meaning and
 * exact spelling against the adapter's property reference before changing them.
 */
ATTACH INPUT ADAPTER csvInConn1
TYPE dsv_in
TO TickIn
PROPERTIES
  blockSize=1,
  dateFormat='%Y/%m/%d %H:%M:%S',
  delimiter=',',
  dir='../exampledata',
  expectStreamNameOpcode=false,
  fieldCount=0,
  file='tick.csv',
  filePattern='*.csv',
  hasHeader=false,
  safeOps=false,
  skipDels=false,
  Pollperiod=2,
  timestampFormat='%Y/%m/%d %H:%M:%S';