CCL for Sample Project with Modules

This variation of the portfolio valuation project uses a module and named schemas to scale out the application for very high-volume deployments.

The module, valuation, is defined in valuation.ccl. It computes the VWAP aggregate and joins it to the Portfolio window, which holds the positions. The project loads the module several times to divide the moving data into smaller partitions, based on the first letter of the Symbol column. This strategy spreads the load across more cores, which increases throughput. With modules, increasing the number of partitions (doubling or quadrupling it, for example) takes very little additional code.
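
The partitioning idea can be sketched outside CCL. The following Python fragment is only an illustration, not part of the project: the instance names and letter ranges are taken from the LOAD MODULE statements later in this topic, while the functions `partition_for` and `route` and the sample symbols are hypothetical.

```python
from collections import defaultdict

# Letter ranges per module instance, as passed through afrom and ato.
PARTITIONS = {
    "Valuation1": ("A", "J"),
    "Valuation2": ("K", "Q"),
    "Valuation3": ("R", "Z"),
}


def partition_for(symbol, partitions=PARTITIONS):
    """Return the instance whose inclusive range contains the symbol's first letter."""
    first = symbol[:1]
    for name, (afrom, ato) in partitions.items():
        if afrom <= first <= ato:
            return name
    # No range matches (digit, lowercase letter, empty string): every filter drops it.
    return None


def route(symbols, partitions=PARTITIONS):
    """Group symbols by the instance that would process them."""
    routed = defaultdict(list)
    for symbol in symbols:
        routed[partition_for(symbol, partitions)].append(symbol)
    return dict(routed)


# Hypothetical sample symbols.
routed = route(["IBM", "MSFT", "SAP", "AAPL", "ORCL"])
```

Adding partitions in this model means adding entries to `PARTITIONS` with narrower ranges. The ranges must not overlap, and together they should cover every expected first letter, otherwise some symbols are processed twice or not at all.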

This example also implements the streaming tick data, PriceFeed, as a stream rather than as an input window (it is declared in this project as the input stream InputStream1). Keeping every tick would use a lot of memory, and the state is never updated or queried, so in a real-world scenario a stream is a more likely choice than a window for this event stream.
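
The memory argument can be illustrated with a rough Python analogy. This is a sketch under stated assumptions, not a model of the server's internals: `filter_stream` and `KeyedWindow` are hypothetical names, the sample ticks are invented, and keying the window on Id is an assumption made for the example.

```python
def filter_stream(events, predicate):
    """Stream-like step: each event is passed on or dropped, none is stored."""
    for event in events:
        if predicate(event):
            yield event


class KeyedWindow:
    """Window-like store: retains the latest row for each primary key."""

    def __init__(self, key_fields):
        self.key_fields = key_fields
        self.rows = {}

    def upsert(self, row):
        key = tuple(row[field] for field in self.key_fields)
        self.rows[key] = row


# Invented sample ticks.
ticks = [
    {"Id": 1, "Symbol": "IBM", "Price": 100.0, "Shares": 10},
    {"Id": 2, "Symbol": "SAP", "Price": 50.0, "Shares": 20},
    {"Id": 3, "Symbol": "IBM", "Price": 101.0, "Shares": 5},
]

# Stream: events flow through the filter and are not retained by it.
ibm_ticks = list(filter_stream(ticks, lambda t: t["Symbol"] == "IBM"))

# Window keyed on Id: every tick has a new key, so the store grows with the feed.
window = KeyedWindow(["Id"])
for tick in ticks:
    window.upsert(tick)
```

In this analogy the window holds one row per tick because each tick carries a new Id, while the stream step holds nothing once an event has passed through.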

Create Module valuation

The valuation module:
  1. Defines the input stream TradesIn.
  2. Defines a stream, Filter1, that filters TradesIn data into a substream based on the declared parameters afrom and ato.
  3. Defines the input window Portfolio.
  4. Defines the VWAP aggregate as an output window.
  5. Defines another output window, ValueBySymbol, that performs a join similar to the Join simple query in the simple PortfolioValuation project, with the addition of a cast of the integer SharesHeld column to FLOAT so that it can be multiplied by the float price columns.
CREATE MODULE valuation
IN TradesIn, Portfolio
OUT ValueBySymbol, VWAP

BEGIN
    IMPORT 'import.ccl';

    // Bounds of the symbol range handled by one instance of the module.
    DECLARE
        PARAMETER STRING afrom;
        PARAMETER STRING ato;
    END;

    CREATE INPUT STREAM TradesIn
        SCHEMA TradesSchema;

    // Keep only the trades whose substr(Symbol, 1, 1) lies between afrom and ato, inclusive.
    CREATE STREAM Filter1 AS
        SELECT * FROM TradesIn
        WHERE substr(TradesIn.Symbol, 1, 1) >= afrom
          AND substr(TradesIn.Symbol, 1, 1) <= ato;

    CREATE INPUT WINDOW Portfolio
        SCHEMA PortfolioSchema
        PRIMARY KEY (BookId, Symbol);

    // Volume-weighted average price for each symbol.
    CREATE OUTPUT WINDOW VWAP
        PRIMARY KEY DEDUCED AS
        SELECT Filter1.Symbol Symbol,
            (sum((Filter1.Price * cast(FLOAT, Filter1.Shares))) /
                cast(FLOAT, sum(Filter1.Shares))) AS VWAP,
            sum(Filter1.Shares) Total_Shares,
            valueinserted(Filter1.Price) LastPrice,
            valueinserted(Filter1.TradeTime) TradeTime
        FROM Filter1
        GROUP BY Filter1.Symbol;

    // Value each position at the last price and at the VWAP.
    CREATE OUTPUT WINDOW ValueBySymbol
        SCHEMA (BookId STRING, Symbol STRING, CurrentPosition FLOAT, AveragePosition FLOAT)
        PRIMARY KEY (BookId, Symbol) AS
        SELECT
            Portfolio.BookId AS BookId,
            Portfolio.Symbol AS Symbol,
            (VWAP.LastPrice * cast(FLOAT, Portfolio.SharesHeld)) AS CurrentPosition,
            (VWAP.VWAP * cast(FLOAT, Portfolio.SharesHeld)) AS AveragePosition
        FROM Portfolio JOIN VWAP
            ON Portfolio.Symbol = VWAP.Symbol;

END;
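
To make the arithmetic in the VWAP and ValueBySymbol windows concrete, here is a minimal Python sketch of the same calculations over in-memory lists. It is an approximation for illustration only: `vwap_rows` and `value_by_symbol` are hypothetical names, the sample rows are invented, and taking the price of the most recent trade as LastPrice is an assumed reading of valueinserted().

```python
def vwap_rows(trades):
    """Per-symbol aggregate: VWAP, total shares, and the price of the latest trade."""
    acc = {}
    for trade in trades:
        entry = acc.setdefault(
            trade["Symbol"], {"notional": 0.0, "shares": 0, "last": None}
        )
        entry["notional"] += trade["Price"] * float(trade["Shares"])
        entry["shares"] += trade["Shares"]
        entry["last"] = trade["Price"]  # assumed equivalent of valueinserted(Price)
    return {
        symbol: {
            # Raises ZeroDivisionError if a symbol's shares sum to zero.
            "VWAP": entry["notional"] / float(entry["shares"]),
            "Total_Shares": entry["shares"],
            "LastPrice": entry["last"],
        }
        for symbol, entry in acc.items()
    }


def value_by_symbol(portfolio, vwap):
    """Inner join of positions to the aggregate on Symbol."""
    rows = []
    for position in portfolio:
        aggregate = vwap.get(position["Symbol"])
        if aggregate is None:
            continue  # no trades seen for this symbol, so the join yields no row
        held = float(position["SharesHeld"])  # mirrors cast(FLOAT, SharesHeld)
        rows.append({
            "BookId": position["BookId"],
            "Symbol": position["Symbol"],
            "CurrentPosition": aggregate["LastPrice"] * held,
            "AveragePosition": aggregate["VWAP"] * held,
        })
    return rows


# Invented sample data.
trades = [
    {"Symbol": "IBM", "Price": 100.0, "Shares": 10},
    {"Symbol": "IBM", "Price": 110.0, "Shares": 30},
]
portfolio = [
    {"BookId": "Book1", "Symbol": "IBM", "SharesHeld": 4},
    {"BookId": "Book1", "Symbol": "SAP", "SharesHeld": 7},
]
vwap = vwap_rows(trades)
rows = value_by_symbol(portfolio, vwap)
```

With these sample rows, the IBM VWAP is (100 × 10 + 110 × 30) / 40 = 107.5, so a holding of 4 shares is valued at 440.0 at the last price and 430.0 at the VWAP. The SAP position produces no row because no SAP trades exist to join against.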

Create Named Schema TradesSchema

CREATE SCHEMA  TradesSchema 
    ( Id integer , 
    Symbol string , 
    TradeTime date , 
    Price float , 
    Shares integer )  ;

Create Named Schema PortfolioSchema

CREATE SCHEMA PortfolioSchema 
    ( BookId string , 
    Symbol string , 
    SharesHeld integer ) ;

Import and Load the valuation Module

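In the parent scope, the valuation module is loaded three times, as Valuation1, Valuation2, and Valuation3.
  1. The IN clause binds the input stream and input window in the module to their counterparts in the parent scope. TradesIn is bound to InputStream1, and Portfolio is bound to InputPositions.
  2. The OUT clause binds the output windows in the module to windows in the parent scope. ValueBySymbol is bound to VbySym1, VbySym2, and VbySym3, and the VWAP aggregate is bound to VWAP1, VWAP2, and VWAP3, one for each load of the module.
  3. The PARAMETERS clause sets afrom and ato for each load, so that the three instances handle the symbol ranges A to J, K to Q, and R to Z.
The parent scope also defines:
  • InputStream1 – Input stream based on the imported schema, TradesSchema.
  • InputPositions – Input window based on the imported schema, PortfolioSchema.
  • UnionVWAP – Output window created as a UNION of the partitioned VWAP aggregate.
  • ValueBySymbol – Output window created as a UNION of VbySym1, VbySym2, and VbySym3.
  • ValueByBook – Output window that sums the ValueBySymbol positions for each BookId.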
IMPORT 'import.ccl';
IMPORT 'valuation.ccl';
DECLARE
    PARAMETER STRING afrom := 'A';
    PARAMETER STRING ato := 'Z';
END;

CREATE INPUT STREAM InputStream1 SCHEMA TradesSchema;

CREATE INPUT WINDOW InputPositions
    SCHEMA PortfolioSchema PRIMARY KEY (BookId, Symbol);

LOAD MODULE valuation AS Valuation1
    IN TradesIn = InputStream1, Portfolio = InputPositions
    OUT ValueBySymbol = VbySym1, VWAP = VWAP1
    PARAMETERS afrom = 'A', ato = 'J';

LOAD MODULE valuation AS Valuation2
    IN TradesIn = InputStream1, Portfolio = InputPositions
    OUT ValueBySymbol = VbySym2, VWAP = VWAP2
    PARAMETERS afrom = 'K', ato = 'Q';

LOAD MODULE valuation AS Valuation3
    IN TradesIn = InputStream1, Portfolio = InputPositions
    OUT ValueBySymbol = VbySym3, VWAP = VWAP3
    PARAMETERS afrom = 'R', ato = 'Z';

CREATE OUTPUT WINDOW UnionVWAP
 PRIMARY KEY DEDUCED
 AS SELECT * FROM VWAP1
    UNION SELECT * FROM VWAP3 
    UNION SELECT * FROM VWAP2 ;

CREATE OUTPUT WINDOW ValueBySymbol
 PRIMARY KEY (BookId,Symbol)
 AS SELECT * FROM VbySym1
    UNION SELECT * FROM VbySym3
    UNION SELECT * FROM VbySym2 ;

// ----------------------------
//  window ValueByBook

CREATE  OUTPUT  WINDOW ValueByBook
	SCHEMA (BookId STRING, CurrentPosition FLOAT, AveragePosition FLOAT)
 	PRIMARY KEY DEDUCED AS 
	SELECT ValueBySymbol.BookId AS BookId,
		sum(ValueBySymbol.CurrentPosition) AS CurrentPosition, 
		sum(ValueBySymbol.AveragePosition) AS AveragePosition
	FROM ValueBySymbol
	GROUP BY ValueBySymbol.BookId;
	
	
ATTACH INPUT ADAPTER Adapter1 TYPE xml_in TO InputStream1 
GROUP nostartGroup
PROPERTIES dir = '../exampledata' ,
file = 'pricefeed.xml' ,
matchStreamName = FALSE ,
repeatCount = 0 ,
repeatField = '-' ,
filePattern = '*.xml' ,
pollperiod = 0 ,
safeOps = FALSE ,
skipDels = FALSE ,
dateFormat = '%Y-%m-%dT%H:%M:%S' ,
timestampFormat = '%Y-%m-%dT%H:%M:%S' ,
blockSize = 1 ;

ATTACH INPUT ADAPTER Adapter2 TYPE xml_in TO InputPositions
PROPERTIES dir = '../exampledata' ,
file = 'positions.xml' ,
matchStreamName = FALSE ,
repeatCount = 0 ,
repeatField = '-' ,
filePattern = '*.xml' ,
pollperiod = 0 ,
safeOps = FALSE ,
skipDels = FALSE ,
dateFormat = '%Y-%m-%dT%H:%M:%S' ,
timestampFormat = '%Y-%m-%dT%H:%M:%S' ,
blockSize = 1 ;

ADAPTER START GROUPS nostartGroup nostart   ;
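
The recombination step in the parent scope, a UNION of the partitioned outputs followed by a sum for each book, can be approximated in Python as follows. This is an illustrative sketch only: `union` and `value_by_book` are hypothetical names, and the sample rows are invented rather than taken from the example data files.

```python
from collections import defaultdict


def union(*partitions):
    """Merge the rows of the partitioned outputs into one list."""
    merged = []
    for rows in partitions:
        merged.extend(rows)
    return merged


def value_by_book(rows):
    """Sum CurrentPosition and AveragePosition for each BookId."""
    totals = defaultdict(lambda: {"CurrentPosition": 0.0, "AveragePosition": 0.0})
    for row in rows:
        totals[row["BookId"]]["CurrentPosition"] += row["CurrentPosition"]
        totals[row["BookId"]]["AveragePosition"] += row["AveragePosition"]
    return dict(totals)


# Invented sample output of the three module instances.
vbysym1 = [{"BookId": "Book1", "Symbol": "IBM",
            "CurrentPosition": 440.0, "AveragePosition": 430.0}]
vbysym2 = [{"BookId": "Book1", "Symbol": "MSFT",
            "CurrentPosition": 60.0, "AveragePosition": 50.0}]
vbysym3 = [{"BookId": "Book2", "Symbol": "SAP",
            "CurrentPosition": 10.0, "AveragePosition": 20.0}]

# The order of the inputs does not change the totals.
books = value_by_book(union(vbysym1, vbysym3, vbysym2))
```

Because each symbol belongs to exactly one partition, the merged rows never collide on the (BookId, Symbol) key, which is what lets a plain union stand in for the unpartitioned ValueBySymbol window.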