Trades Log

Use a Flex stream to manually delete data from windows.

The example creates a MEMORY store named store1, then two input windows named Trades and Trades_truncate that reference store1.

The example attaches a File CSV Input adapter named Adapter1 to Trades. The adapter reads sample data from the file pstrades1.xml in the exampledata folder, and publishes the information to Trades.

ATTACH INPUT ADAPTER Adapter1 
  TYPE xml_in TO Trades 
  PROPERTIES 
  	dir = '../exampledata' , 
  	file = 'pstrades1.xml' ;

The example creates a Flex statement named Ccl_2_Trades_log that operates on Trades and Trades_truncate, producing an output window named Trades_log. Using a DECLARE block within the Flex statement, the example declares two longs to store the lowest and the highest sequence number produced in the example so far.

CREATE FLEX Ccl_2_Trades_log
 IN Trades, Trades_truncate
 OUT  OUTPUT   WINDOW Trades_log 
 	SCHEMA (SequenceNumber LONG, GDOpcode INTEGER, Id INTEGER, 
 		Symbol STRING, TradeTime DATE, Shares INTEGER, Price MONEY(4))
	PRIMARY KEY (SequenceNumber)
 	STORE store1
BEGIN
DECLARE
	LONG low;
	LONG high;
	
END;

An ON clause executes the code below anytime a record comes through on the Trades window. A series of if, else, and while conditions tell the project server that, if this is the first record being seen by the Flex stream, it should initialize the high and low sequence numbers. The example uses an iterator to scan all of the records in the Trades_log to find the lowest and highest sequence numbers stored in the log. Once the example has finished iterating through Trades_log, the highest sequence number that exists in the log and the lowest sequence number are stored, and the iterator is deleted.

ON Trades { 
    { 
        LONG sn;
        /* on the first record, initialize the low, high record
        numbers */
        if (isnull(high)) 
            { 
                for ( Trades_log in Trades_log_stream ) 
                    {
                        if (isnull (high))
                            {
                                high := 0; low := 9223372036854775807;
                            } 
                        sn := Trades_log.SequenceNumber;
                        if (sn > high)
                            {
                                high := sn;
                            }
                        if (sn < low)
                            {
                                low := sn;
                            }
                    }
                /* If high is still null there no records in log stream
                */
                if (isnull(high))
                    {
                        high := -1; low := 0;
                    }
            }
        /* output the incoming record with a record number + opcode 
        prepended */
        high := high + 1
        output [SequenceNumber = high; |
        GDOpcode = getOpcode(Trades);
        Id=Trades.Id;
        Symbol=Trades.Symbol;
        TradeTime=Trades.TradeTime;
        Shares=Trades.Shares;
        Price=Trades.Price;
        ];
    } ;               
       

The example increments the highest sequence number by 1, and assigns this sequence number to the current trade it is processing. For the first record, the sequence number is 0.

 high:=(high+ cast(LONG ,1));
        output [SequenceNumber=high; |GDOpcode=getOpcode(Trades); 
          Id=Trades.Id; Symbol=Trades.Symbol;
         TradeTime=Trades.TradeTime; Shares=Trades.Shares; Price=Trades.Price; ];
    }

		
 }; 

An ON clause executes this code anytime a record comes through on the Trades_truncate window:

ON Trades_truncate { 
    { 
        LONG i;
        [LONG SequenceNumber; |INTEGER GDOpcode; INTEGER Id; 
        	STRING Symbol; DATE TradeTime; INTEGER Shares; MONEY(4) Price; ] outrec;
       

A series of if and while conditions provides the format for output. The example gets the sequence number that was provided on Trades_truncate. All records with sequence numbers lower than this number are removed from the trades log. If the sequence number requested is larger than or equal to the largest sequence number in the trades log, the example removes all but the latest record from the trades log.

 i:=Trades_truncate.SequenceNumber;
        if ((high> cast(LONG ,0))) 
            { 
                if ((i>= high)) 
                    i:=(high- cast(LONG ,1));
                if (((low<= i) and (i< high))) 
                    { 
                        while ((low<= i)) 
                        { 
                           

The example creates a record with an opcode of 13 (SAFE DELETE) for each sequence number lower than the value provided. Safe delete means the record is deleted from all subsequent windows if it exists; no error occurs if it does not exist.

 																									outrec:=[SequenceNumber=low; |GDOpcode=cast(INTEGER ,null); 
                            	Id=cast(INTEGER ,null); 
                            	Symbol=cast(STRING ,null); 
                            	TradeTime=cast(DATE ,null); 
                            	Shares=cast(INTEGER ,null); 
                            	Price=cast(MONEY(4),null); ];
                            setOpcode(outrec,13);
                            output outrec;
                            low:=(low+ cast(LONG ,1));
                        }
                    }
            }
    }

		
 }; 

END;