This variation of the portfolio valuation project uses a defined module with a named schema to easily scale out the application in a very high volume deployment.
The module, valuation.ccl, computes the VWAP aggregate, and does the join to the Positions window. The project uses the module to divide the moving data into smaller partitions, based on the first letter of the Symbol column. This strategy spreads the load out to more cores, thereby increasing throughput. By using modules, with very little coding you can easily double, quadruple, and so on, the number of partitions.
This example also implements the streaming tick data in PriceFeed as a stream rather than as an input window. Because keeping every tick would use a lot of memory, and because the state is never updated or queried, a stream is a more likely choice than a window in a real-world scenario for this event stream.
CREATE MODULE valuation
IN TradesIn,Portfolio
OUT ValueBySymbol, VWAP
BEGIN
IMPORT 'import.ccl';
DECLARE
PARAMETER STRING afrom;
PARAMETER STRING ato;
END;
CREATE INPUT STREAM TradesIn
SCHEMA TradesSchema ;
CREATE STREAM Filter1 AS
SELECT * FROM TradesIn
WHERE substr(TradesIn.Symbol,1,1) >= afrom
and substr(TradesIn.Symbol,1,1) <= ato
;
CREATE INPUT WINDOW Portfolio
SCHEMA PortfolioSchema
PRIMARY KEY (BookId, Symbol);
CREATE OUTPUT WINDOW VWAP
PRIMARY KEY DEDUCED AS
SELECT Filter1.Symbol Symbol ,
(sum((Filter1.Price * cast(FLOAT ,Filter1.Shares))) /
cast(FLOAT ,sum(Filter1.Shares)))
AS VWAP,
sum (Filter1.Shares ) Total_Shares ,
valueinserted(Filter1.Price) LastPrice,
valueinserted(Filter1.TradeTime) TradeTime
FROM Filter1
GROUP BY Filter1.Symbol ;
CREATE OUTPUT WINDOW ValueBySymbol
SCHEMA (BookId STRING, Symbol STRING, CurrentPosition FLOAT, AveragePosition FLOAT)
PRIMARY KEY (BookId, Symbol) AS
SELECT
Portfolio.BookId AS BookId,
Portfolio.Symbol AS Symbol,
(VWAP.LastPrice * cast(FLOAT ,Portfolio.SharesHeld))
AS CurrentPosition,
(VWAP.VWAP * cast(FLOAT ,Portfolio.SharesHeld))
AS AveragePosition
FROM Portfolio JOIN
VWAP
ON Portfolio.Symbol = VWAP.Symbol;
END;
CREATE SCHEMA TradesSchema
( Id integer ,
Symbol string ,
TradeTime date ,
Price float ,
Shares integer ) ;
CREATE SCHEMA PortfolioSchema
( BookId string ,
Symbol string ,
SharesHeld integer ) ;
IMPORT 'import.ccl';
IMPORT 'valuation.ccl';
DECLARE
PARAMETER STRING afrom :='A';
PARAMETER STRING ato := 'Z';
END;
CREATE INPUT STREAM InputStream1 SCHEMA TradesSchema ;
CREATE INPUT WINDOW InputPositions
SCHEMA PortfolioSchema PRIMARY KEY ( BookId , Symbol ) ;
LOAD MODULE valuation as Valuation1
in TradesIn = InputStream1, Portfolio = InputPositions
OUT ValueBySymbol = VbySym1, VWAP = VWAP1
PARAMETERS afrom = 'A', ato = 'J'
;
LOAD MODULE valuation as Valuation2
in TradesIn = InputStream1, Portfolio = InputPositions
OUT ValueBySymbol = VbySym2, VWAP = VWAP2
PARAMETERS afrom = 'K', ato = 'Q'
;
LOAD MODULE valuation as Valuation3
in TradesIn = InputStream1, Portfolio = InputPositions
OUT ValueBySymbol = VbySym3, VWAP = VWAP3
PARAMETERS afrom = 'R', ato = 'Z'
;
CREATE OUTPUT WINDOW UnionVWAP
PRIMARY KEY DEDUCED
AS SELECT * FROM VWAP1
UNION SELECT * FROM VWAP3
UNION SELECT * FROM VWAP2 ;
CREATE OUTPUT WINDOW ValueBySymbol
PRIMARY KEY (BookId,Symbol)
AS SELECT * FROM VbySym1
UNION SELECT * FROM VbySym3
UNION SELECT * FROM VbySym2 ;
// ----------------------------
// stream ValueByBook
CREATE OUTPUT WINDOW ValueByBook
SCHEMA (BookId STRING, CurrentPosition FLOAT, AveragePosition FLOAT)
PRIMARY KEY DEDUCED AS
SELECT ValueBySymbol.BookId AS BookId,
sum(ValueBySymbol.CurrentPosition) AS CurrentPosition,
sum(ValueBySymbol.AveragePosition) AS AveragePosition
FROM ValueBySymbol
GROUP BY ValueBySymbol.BookId;
ATTACH INPUT ADAPTER Adapter1 TYPE xml_in TO InputStream1
GROUP nostartGroup
PROPERTIES dir = '../exampledata' ,
file = 'pricefeed.xml' ,
matchStreamName = FALSE ,
repeatCount = 0 ,
repeatField = '-' ,
filePattern = '*.xml' ,
pollperiod = 0 ,
safeOps = FALSE ,
skipDels = FALSE ,
dateFormat = '%Y-%m-%dT%H:%M:%S' ,
timestampFormat = '%Y-%m-%dT%H:%M:%S' ,
blockSize = 1 ;
ATTACH INPUT ADAPTER Adapter2 TYPE xml_in TO InputPositions
PROPERTIES dir = '../exampledata' ,
file = 'positions.xml' ,
matchStreamName = FALSE ,
repeatCount = 0 ,
repeatField = '-' ,
filePattern = '*.xml' ,
pollperiod = 0 ,
safeOps = FALSE ,
skipDels = FALSE ,
dateFormat = '%Y-%m-%dT%H:%M:%S' ,
timestampFormat = '%Y-%m-%dT%H:%M:%S' ,
blockSize = 1 ;
ADAPTER START GROUPS nostartGroup nostart ;