Use multiple Flex streams with multiple inputs.
The example creates two input windows named Trades1 and Trades2.
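The declarations of the two input windows are not shown in this example. A minimal sketch of what they might look like follows. It assumes both windows use the same schema and primary key as the Flex output windows, which seems likely because the Flex methods pass records through with copyRecord, but the original definitions may differ.

```
// Assumed definitions: the schema is copied from the Flex output windows,
// not taken from the original project.
CREATE INPUT WINDOW Trades1
    SCHEMA (Id INTEGER, Symbol STRING, TradeTime DATE, Price FLOAT, Shares INTEGER, Corr INTEGER)
    PRIMARY KEY (Id);

CREATE INPUT WINDOW Trades2
    SCHEMA (Id INTEGER, Symbol STRING, TradeTime DATE, Price FLOAT, Shares INTEGER, Corr INTEGER)
    PRIMARY KEY (Id);
```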
The example then creates a Flex stream named Ccl_2_TradesMSFTFlexStream that reads from both input windows and writes the MSFT trades to an output window called TradesMSFTFlexStream.
CREATE FLEX Ccl_2_TradesMSFTFlexStream
IN Trades2, Trades1
OUT OUTPUT WINDOW TradesMSFTFlexStream
    SCHEMA (Id INTEGER, Symbol STRING, TradeTime DATE, Price FLOAT, Shares INTEGER, Corr INTEGER)
    PRIMARY KEY (Id)
BEGIN
    ON Trades1 {
        if (Trades1.Symbol = 'MSFT') output copyRecord(Trades1);
    };
    ON Trades2 {
        if (Trades2.Symbol = 'MSFT') output copyRecord(Trades2);
    };
END;
The example creates another Flex stream, Ccl_4_TradesCSCOFlexStream, that reads from the Trades1 and Trades2 windows and writes the CSCO trades to an output window called TradesCSCOFlexStream.
CREATE FLEX Ccl_4_TradesCSCOFlexStream
IN Trades1, Trades2
OUT OUTPUT WINDOW TradesCSCOFlexStream
    SCHEMA (Id INTEGER, Symbol STRING, TradeTime DATE, Price FLOAT, Shares INTEGER, Corr INTEGER)
    PRIMARY KEY (Id)
BEGIN
    ON Trades1 {
        if (Trades1.Symbol = 'CSCO') output copyRecord(Trades1);
    };
    ON Trades2 {
        if (Trades2.Symbol = 'CSCO') output copyRecord(Trades2);
    };
END;
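Finally, the example creates a Flex stream named Ccl_5_TradesPickedFlexStream that reads from TradesMSFTFlexStream and TradesCSCOFlexStream. It writes MSFT trades priced at 93 or higher and CSCO trades priced at 74.5 or higher to an output window called TradesPickedFlexStream.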
CREATE FLEX Ccl_5_TradesPickedFlexStream
IN TradesMSFTFlexStream, TradesCSCOFlexStream
OUT OUTPUT WINDOW TradesPickedFlexStream
    SCHEMA (Id INTEGER, Symbol STRING, TradeTime DATE, Price FLOAT, Shares INTEGER, Corr INTEGER)
    PRIMARY KEY (Id)
BEGIN
    ON TradesMSFTFlexStream {
        if (TradesMSFTFlexStream.Price >= 93) output copyRecord(TradesMSFTFlexStream);
    };
    ON TradesCSCOFlexStream {
        if (TradesCSCOFlexStream.Price >= 74.5) output copyRecord(TradesCSCOFlexStream);
    };
END;