Partitioning a Module

Automatic partitioning creates parallel instances of an element and splits the input data across those instances. This can improve the performance of individual elements and of complex projects that perform computationally expensive operations, such as aggregations and joins. This example demonstrates how to partition a module when loading it into an ESP project. Although you can partition a module only on load, this example also demonstrates how to use nested partitioning when creating a module.

You can create parallel instances of a delta stream, stream, window, or module. Reference streams, unions, inputs, adapters, splitters, and error streams cannot use partitioning.

This example uses the IMPORT statement to load the module defined in the CREATE MODULE example, which is saved as module1.ccl.
import  'module1.ccl';
The example creates two schemas named StocksSchema and ComputedStocksSchema, a default memory store named MyStore1, and a second memory store named MyStore2.
//
// Schema for Stocks
CREATE SCHEMA StocksSchema (
    Ts      BIGDATETIME,
    Symbol  STRING,
    Price   money(2),
    Volume  INTEGER
);

CREATE SCHEMA ComputedStocksSchema (
    Symbol string, 
    AvgPrice money(2), 
    Volume integer,
    NumRecordsForSymbol integer, 
    TotalNumRecords integer,
    dummy integer
);

//
// Create Memory Stores
CREATE DEFAULT MEMORY STORE MyStore1;
CREATE MEMORY STORE MyStore2;
The example then creates an input window named InStocks that references StocksSchema, and attaches a File CSV Input adapter named csvInStocks to it.
//
// Stock Trade Window
CREATE INPUT WINDOW InStocks SCHEMA StocksSchema PRIMARY KEY (Ts) KEEP ALL;

// Input Adapter for InStocks Window
ATTACH INPUT ADAPTER csvInStocks
TYPE dsv_in
TO InStocks
PROPERTIES 
	blockSize=1, 
	dateFormat='%Y/%m/%d %H:%M:%S', 
	delimiter=',', 
	dir='../../../../examples/ccl/exampledata', 
	expectStreamNameOpcode=false, 
	fieldCount=0, 
	file='stock-trades.csv', 
	filePattern='*.csv', 
	hasHeader=true, 
	safeOps=false, 
	skipDels=false, 
	timestampFormat= '%Y/%m/%d %H:%M:%S';
The example uses the LOAD MODULE statement to load Module1 as Module1_instance_01, binding the module's input window (rawStockFeed) to InStocks and the module's store (store1) to MyStore1. It does not create a new output window; instead, it assigns a new name (CompStocks2) to the output window loaded from Module1 (infoByStockSymbol) and sets the myparam parameter declared in Module1 to 1000. Finally, the input to this module (InStocks) is partitioned on the Ts column using the HASH partitioning method, and three partitions are created:
// Load the module
LOAD MODULE Module1 AS Module1_instance_01
    IN rawStockFeed = InStocks
    OUT infoByStockSymbol = CompStocks2
    PARAMETERS myparam = 1000
    STORES store1 = MyStore1
    PARTITION BY InStocks HASH (Ts)
    PARTITIONS 3;
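Because windows are among the elements that can be partitioned, the same kind of clause can in principle be attached to a window directly rather than to a module. The following is only a minimal sketch and is not part of the original example: the window name VolumeBySymbol is hypothetical, and the sketch assumes that the PARTITION BY clause on a window takes the same form (input name, method, column list, partition count) as the clause in the LOAD MODULE statement.

```
// Hypothetical window, not part of the original example.
// Assumes PARTITION BY on a window has the same form as in LOAD MODULE.
CREATE OUTPUT WINDOW VolumeBySymbol
    PRIMARY KEY DEDUCED
    PARTITION BY InStocks HASH (Symbol)
    PARTITIONS 2
AS
    SELECT InStocks.Symbol,
        sum(InStocks.Volume) Volume
    FROM InStocks
    GROUP BY InStocks.Symbol;
```

The sketch hashes on Symbol rather than Ts so that all rows for a given symbol are routed to the same partition, which is usually what a GROUP BY on that column needs in order to produce one complete aggregate per symbol.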
Note that you cannot partition a module when you create it; you can partition it only when you load it. You can, however, partition elements within the module. This is known as nested partitioning. For example, the module defined in module1.ccl contains an output window that has its own partitioning clause. The clause is commented out in this file for simplicity.
CREATE OUTPUT WINDOW infoByStockSymbol SCHEMA outputSchema
    PRIMARY KEY DEDUCED
    DECLARE
        integer recordCount := 1;
        integer getRecordCount() {
            return recordCount++;
        }
    END
    //PARTITION BY HASH (Ts)
    //PARTITIONS 2
AS
    SELECT rawStockFeed.Symbol,
        avg(rawStockFeed.Price) AvgPrice,
        sum(rawStockFeed.Volume) Volume,
        count(rawStockFeed.Symbol) NumRecordsForSymbol,
        getRecordCount() TotalNumRecords,
        myparam AS dummy
    FROM rawStockFeed
    WHERE rawStockFeed.Volume > myparam
    GROUP BY rawStockFeed.Symbol;
See Guidelines for Partitioning Modules in the Programmers Guide for additional details.
The example creates an output window named myw2 that references ComputedStocksSchema. The SELECT all (*) syntax outputs all data processed by CompStocks2 to myw2.
CREATE OUTPUT WINDOW myw2
    SCHEMA ComputedStocksSchema
    PRIMARY KEY (Symbol)
AS
    SELECT * FROM CompStocks2;