Use a Flex stream to manage your data.
The example creates three schemas named TradeSchema, Totalschema, and Tutelage, and one input window named TradeWindow. The File CSV Input adapter is attached to TradeWindow.
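The schema and window declarations are not reproduced in this example. As a rough sketch only, TradeSchema and TradeWindow might be declared as follows. The column names (Ts, Symbol, Price, Volume) are the ones the Flex code reads from TradeWindow; the column types and the primary key are assumptions, and the other two schemas and the adapter attachment are left out.
// Sketch only: column types and the primary key are assumed, not taken from the example.
CREATE SCHEMA TradeSchema (
    Ts bigdatetime,
    Symbol string,
    Price float,
    Volume integer
);
// TradeWindow is the window the File CSV Input adapter feeds (ATTACH statement omitted).
CREATE INPUT WINDOW TradeWindow
SCHEMA TradeSchema
PRIMARY KEY (Ts);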
The example then creates a Flex stream named TrackOldTrades that outputs data from TradeWindow to OldTradeEvents. The switch statement produces output only for inserts and updates, so deletes are not passed to the output window.
CREATE FLEX TrackOldTrades
IN TradeWindow
OUT OUTPUT WINDOW OldTradeEvents
SCHEMA DeleteOrExpireSchema
Primary Key (DeleteOrExpireTime, Ts)
BEGIN
    declare
        integer oc;
    end;
    ON TradeWindow {
        // Read the opcode of the incoming event.
        oc := getOpcode(TradeWindow);
        switch (oc) {
            case insert:
                // Columns left of the | are key columns; Counter = 1 counts the insert.
                output [ Ts=TradeWindow.Ts;|
                    Symbol=TradeWindow.Symbol;
                    TotalPrice = TradeWindow.Price * TradeWindow.Volume;
                    Counter = 1; ];
                break;
            case update:
                // Updates carry the new total but do not add to the count.
                output [ Ts=TradeWindow.Ts;|
                    Symbol=TradeWindow.Symbol;
                    TotalPrice = TradeWindow.Price * TradeWindow.Volume;
                    Counter = 0; ];
                break;
            case delete:
                // Deletes produce no output.
                break;
            default:
                break;
        }
    };
END;
CREATE OUTPUT WINDOW OutWin
Schema Tutelage Primary Key deduced
as
Select o1.Symbol as Symbol,
    Sum(o1.TotalPrice) as TotalPrice,
    Sum(o1.Counter) as Counter
from OldTradeEvents o1
Group by o1.Symbol
;