This variation of the portfolio valuation project uses a module with a named schema so that the application can easily be scaled out in a very high-volume deployment.
The module, valuation.ccl, computes the VWAP aggregate and joins it to the Positions window. The project loads the module several times to divide the incoming data into smaller partitions, based on the first letter of the Symbol column. This strategy spreads the load across more cores, thereby increasing throughput. Because the logic lives in a module, you can double or quadruple the number of partitions (and so on) with very little additional code.
This example also implements the streaming tick data in PriceFeed (named InputStream1 in the project listing below) as a stream rather than as an input window. Because keeping every tick would use a lot of memory, and because the state is never updated or queried, a stream is the more likely choice than a window for this event feed in a real-world scenario.
// Module valuation
//
// Reusable valuation pipeline for one partition of the trade feed.
//   Inputs : TradesIn  - trade events (TradesSchema)
//            Portfolio - positions keyed by (BookId, Symbol)
//   Outputs: VWAP          - per-symbol volume-weighted average price
//            ValueBySymbol - per-(BookId, Symbol) position values
//   Params : afrom, ato - inclusive bounds of the symbol partition that
//            this instance of the module processes
CREATE MODULE valuation
IN TradesIn, Portfolio
OUT ValueBySymbol, VWAP
BEGIN
    IMPORT 'import.ccl';

    // Partition bounds; the loading project supplies the values.
    DECLARE
        PARAMETER STRING afrom;
        PARAMETER STRING ato;
    END;

    // ---- Inputs -----------------------------------------------------------
    CREATE INPUT STREAM TradesIn
    SCHEMA TradesSchema;

    CREATE INPUT WINDOW Portfolio
    SCHEMA PortfolioSchema
    PRIMARY KEY (BookId, Symbol);

    // ---- Partition filter -------------------------------------------------
    // Keep only trades whose one-character Symbol prefix lies in
    // [afrom, ato].
    // NOTE(review): confirm whether substr() positions are 0- or 1-based in
    // the target CCL version; the intent is to test the FIRST letter.
    CREATE STREAM Filter1
    AS
        SELECT *
        FROM TradesIn
        WHERE substr(TradesIn.Symbol, 1, 1) >= afrom
          AND substr(TradesIn.Symbol, 1, 1) <= ato;

    // ---- Per-symbol aggregate ---------------------------------------------
    // VWAP = sum(Price * Shares) / sum(Shares), grouped by Symbol.
    // LastPrice and TradeTime are taken with valueinserted()
    // (presumably the value from the row most recently applied to the
    // group - confirm against the CCL function reference).
    CREATE OUTPUT WINDOW VWAP
    PRIMARY KEY DEDUCED
    AS
        SELECT
            Filter1.Symbol AS Symbol,
            sum(Filter1.Price * cast(FLOAT, Filter1.Shares))
                / cast(FLOAT, sum(Filter1.Shares)) AS VWAP,
            sum(Filter1.Shares) AS Total_Shares,
            valueinserted(Filter1.Price) AS LastPrice,
            valueinserted(Filter1.TradeTime) AS TradeTime
        FROM Filter1
        GROUP BY Filter1.Symbol;

    // ---- Position valuation -----------------------------------------------
    // Joins each position to its symbol's aggregate row:
    //   CurrentPosition = LastPrice * SharesHeld
    //   AveragePosition = VWAP      * SharesHeld
    CREATE OUTPUT WINDOW ValueBySymbol
    SCHEMA (
        BookId STRING,
        Symbol STRING,
        CurrentPosition FLOAT,
        AveragePosition FLOAT
    )
    PRIMARY KEY (BookId, Symbol)
    AS
        SELECT
            Portfolio.BookId AS BookId,
            Portfolio.Symbol AS Symbol,
            VWAP.LastPrice * cast(FLOAT, Portfolio.SharesHeld) AS CurrentPosition,
            VWAP.VWAP * cast(FLOAT, Portfolio.SharesHeld) AS AveragePosition
        FROM Portfolio
        JOIN VWAP
            ON Portfolio.Symbol = VWAP.Symbol;
END;
// Row layout for trade events; used as the schema of the trade input streams.
CREATE SCHEMA TradesSchema (
    Id        INTEGER,
    Symbol    STRING,
    TradeTime DATE,
    Price     FLOAT,
    Shares    INTEGER
);
// Row layout for portfolio positions: shares held per book and symbol.
CREATE SCHEMA PortfolioSchema (
    BookId     STRING,
    Symbol     STRING,
    SharesHeld INTEGER
);
// Main project
//
// Feeds one trade stream and one positions window into three instances of
// the valuation module, each restricted to a range of symbols, then merges
// the per-partition outputs and totals position values per book.
//
// Each statement sits on its own lines so that a `//` comment ends at its
// own line and cannot comment out the statements that follow it.
IMPORT 'import.ccl';
IMPORT 'valuation.ccl';

// Project-level parameters. No statement below references them; every
// LOAD MODULE passes literal bounds instead.
DECLARE
    PARAMETER STRING afrom := 'A';
    PARAMETER STRING ato := 'Z';
END;

// ---- Inputs ---------------------------------------------------------------
CREATE INPUT STREAM InputStream1
SCHEMA TradesSchema;

CREATE INPUT WINDOW InputPositions
SCHEMA PortfolioSchema
PRIMARY KEY (BookId, Symbol);

// ---- Partitioned module instances -----------------------------------------
// All three instances read the same inputs; the afrom/ato bounds select the
// symbol range each one keeps (A-J, K-Q, R-Z).
LOAD MODULE valuation AS Valuation1
IN
    TradesIn = InputStream1,
    Portfolio = InputPositions
OUT
    ValueBySymbol = VbySym1,
    VWAP = VWAP1
PARAMETERS
    afrom = 'A',
    ato = 'J';

LOAD MODULE valuation AS Valuation2
IN
    TradesIn = InputStream1,
    Portfolio = InputPositions
OUT
    ValueBySymbol = VbySym2,
    VWAP = VWAP2
PARAMETERS
    afrom = 'K',
    ato = 'Q';

LOAD MODULE valuation AS Valuation3
IN
    TradesIn = InputStream1,
    Portfolio = InputPositions
OUT
    ValueBySymbol = VbySym3,
    VWAP = VWAP3
PARAMETERS
    afrom = 'R',
    ato = 'Z';

// ---- Merge the partition outputs ------------------------------------------
CREATE OUTPUT WINDOW UnionVWAP
PRIMARY KEY DEDUCED
AS
    SELECT * FROM VWAP1
    UNION
    SELECT * FROM VWAP3
    UNION
    SELECT * FROM VWAP2;

CREATE OUTPUT WINDOW ValueBySymbol
PRIMARY KEY (BookId, Symbol)
AS
    SELECT * FROM VbySym1
    UNION
    SELECT * FROM VbySym3
    UNION
    SELECT * FROM VbySym2;

// ----------------------------
// ValueByBook: current and average position value summed per book.
CREATE OUTPUT WINDOW ValueByBook
SCHEMA (
    BookId STRING,
    CurrentPosition FLOAT,
    AveragePosition FLOAT
)
PRIMARY KEY DEDUCED
AS
    SELECT
        ValueBySymbol.BookId AS BookId,
        sum(ValueBySymbol.CurrentPosition) AS CurrentPosition,
        sum(ValueBySymbol.AveragePosition) AS AveragePosition
    FROM ValueBySymbol
    GROUP BY ValueBySymbol.BookId;

// ---- Adapters -------------------------------------------------------------
// Trade feed adapter; it belongs to adapter group nostartGroup.
ATTACH INPUT ADAPTER Adapter1 TYPE xml_in TO InputStream1
GROUP nostartGroup
PROPERTIES
    dir = '../exampledata',
    file = 'pricefeed.xml',
    matchStreamName = FALSE,
    repeatCount = 0,
    repeatField = '-',
    filePattern = '*.xml',
    pollperiod = 0,
    safeOps = FALSE,
    skipDels = FALSE,
    dateFormat = '%Y-%m-%dT%H:%M:%S',
    timestampFormat = '%Y-%m-%dT%H:%M:%S',
    blockSize = 1;

// Positions adapter; it is not assigned to an adapter group.
ATTACH INPUT ADAPTER Adapter2 TYPE xml_in TO InputPositions
PROPERTIES
    dir = '../exampledata',
    file = 'positions.xml',
    matchStreamName = FALSE,
    repeatCount = 0,
    repeatField = '-',
    filePattern = '*.xml',
    pollperiod = 0,
    safeOps = FALSE,
    skipDels = FALSE,
    dateFormat = '%Y-%m-%dT%H:%M:%S',
    timestampFormat = '%Y-%m-%dT%H:%M:%S',
    blockSize = 1;

// Marks group nostartGroup (Adapter1, the trade feed) as nostart.
ADAPTER START GROUPS nostartGroup nostart;