Automatic partitioning can improve the performance of individual elements and of complex projects that perform computationally expensive operations, such as aggregations and joins. Automatic partitioning creates parallel instances of a given element and distributes the input data across those instances. Partitioning data this way results in higher performance because the workload is split across the parallel instances.
You can create parallel instances of a delta stream, stream, window, or module by using the PARTITION BY clause. Add this clause within a CCL statement such as CREATE DELTA STREAM, CREATE STREAM, CREATE WINDOW, or LOAD MODULE. Specify the partitioning degree, elements to be partitioned, and a partitioning function.
The partitioning degree is the number of parallel instances (a natural number) that you want to create for a given element (delta stream, stream, window, or module). Instead of specifying the partitioning degree as a constant, you can specify it using an integer parameter with an optional default value. You can then provide the actual value for the parameter in the CCR project configuration file.
When using the PARTITION BY clause, partition at least one input stream of the corresponding element by specifying a partitioning function. If an element accepts multiple input streams and some of them do not have a partitioning function defined, those streams are broadcast to all parallel instances.
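The routing rule described above can be pictured with a small Python model. This is only a conceptual sketch, not CCL and not how the server is implemented; the `route` helper, the `partitioners` mapping, and the sample rows are hypothetical names and values introduced for illustration.

```python
from typing import Callable, Dict, List


def route(stream: str, row: dict, degree: int,
          partitioners: Dict[str, Callable[[dict], int]]) -> List[int]:
    """Return the IDs of the parallel instances that receive `row`."""
    fn = partitioners.get(stream)
    if fn is None:
        # No partitioning function for this input: broadcast to every instance.
        return list(range(degree))
    # Partitioned input: the row goes to exactly one instance.
    return [fn(row) % degree]


# Hypothetical setup: priceW is partitioned on the first character of isin,
# while volumeW has no partitioning function.
partitioners = {"priceW": lambda row: ord(row["isin"][0])}

price_targets = route("priceW", {"isin": "US0378331005", "price": 101.5}, 2, partitioners)
volume_targets = route("volumeW", {"isin": "US0378331005", "volume": 300}, 2, partitioners)

print(price_targets)   # [1]
print(volume_targets)  # [0, 1]
```

In this model the partitioned row reaches a single instance, while the row from the unpartitioned input is copied to both instances, so each instance still sees all the data it might need from that input.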
The partitioning function is effectively a demultiplexer that determines the target parallel instance for a given partitioning key. There are three valid types of partitioning functions: ROUNDROBIN, HASH, and CUSTOM. Choose a type based on the calculations you are performing on the input data. For example, ROUNDROBIN is sufficient for stateless operations such as simple filters, but not for aggregation, because rows that belong to the same group can be sent to different instances, which produces differing results. HASH is necessary for grouping records together, but grouping may not evenly distribute the data across instances.
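To see why the choice of function matters for aggregation, here is a small Python model of the two behaviours. It is a sketch under stated assumptions: `round_robin`, `hash_partition`, and `max_price_per_isin` are hypothetical helpers, the rows are made up, and `zlib.crc32` merely stands in for whatever hash the server actually uses.

```python
from itertools import cycle
from zlib import crc32


def round_robin(rows, degree):
    """Deal rows to instances in turn, ignoring their content."""
    instances = [[] for _ in range(degree)]
    for target, row in zip(cycle(range(degree)), rows):
        instances[target].append(row)
    return instances


def hash_partition(rows, degree, key):
    """Send all rows with the same key value to the same instance."""
    instances = [[] for _ in range(degree)]
    for row in rows:
        # crc32 is an assumption used only to get a deterministic hash.
        target = crc32(str(row[key]).encode()) % degree
        instances[target].append(row)
    return instances


def max_price_per_isin(rows):
    """The aggregation each instance runs on the rows it received."""
    result = {}
    for row in rows:
        isin, price = row["isin"], row["price"]
        result[isin] = max(result.get(isin, price), price)
    return result


rows = [
    {"isin": "A", "price": 10},
    {"isin": "A", "price": 30},
    {"isin": "B", "price": 20},
    {"isin": "B", "price": 5},
]

rr_results = [max_price_per_isin(part) for part in round_robin(rows, 2)]
hash_results = [max_price_per_isin(part) for part in hash_partition(rows, 2, "isin")]

print(rr_results)  # [{'A': 10, 'B': 20}, {'A': 30, 'B': 5}]
print(hash_results)
```

With round-robin, both instances report a maximum for each ISIN and the two disagree. With hashing, each ISIN is aggregated in exactly one instance, although nothing guarantees that the keys are spread evenly over the instances.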
    create input stream priceW
        schema (isin string, price money(2));

    create output window maxPriceW
        schema (isin string, maxPrice money(2))
        primary key deduced
        keep 5 minutes
        PARTITION
            by priceW {
                integer hashValue := ascii(substr(priceW.isin,1,1));
                return hashValue % maxPriceW_partitions;
            }
        PARTITIONS 2
    as
        SELECT upper(left(priceW.isin,1)) isin, max(priceW.price) maxPrice
        FROM priceW
        group by upper(left(priceW.isin,1));

Do not explicitly provide a runtime value for the maxPriceW_partitions parameter used in the function above. To ensure uniqueness, the compiler throws an error if you create a global variable by the same name.
The CUSTOM partitioning function returns an integer that determines the parallel instance that should receive a given event (row). A modulo operation is applied to this result, which ensures that the resulting instance number is greater than or equal to zero and less than the number of available instances. This prevents runtime errors. For example, if you create three partitions, those partitions have the IDs 0, 1, and 2.
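The effect of the modulo step can be illustrated in a few lines of Python. This is a sketch only: `target_instance` is a hypothetical helper, and the text does not say how the server treats negative results internally, so the example simply relies on Python's `%` semantics.

```python
def target_instance(custom_result: int, degree: int) -> int:
    """Map an arbitrary integer to an instance ID in the range [0, degree)."""
    # Python's % returns a non-negative result for a positive modulus.
    # In languages where % can be negative, ((x % n) + n) % n gives the same range.
    return custom_result % degree


# Three partitions, so the only valid IDs are 0, 1, and 2.
ids = [target_instance(value, 3) for value in (-4, 0, 2, 3, 85)]
print(ids)  # [2, 0, 2, 0, 1]
```

Whatever integer the custom function returns, the mapped value always names an existing instance.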
You cannot apply the PARTITION BY clause to these elements: inputs, splitters, unions, reference streams, and adapters. Doing so results in a syntax error. However, you can partition these elements within a module that you are partitioning.
Here is an example of ROUNDROBIN partitioning on a CCL query with one input window (TradeWindow):
    create input window TradeWindow
        schema (
            Ts BIGDATETIME,
            Symbol STRING,
            Price MONEY(2),
            Volume INTEGER)
        primary key (Ts);

    create output window TradeOutWindow
        schema (
            Ts BIGDATETIME,
            Symbol STRING,
            Price MONEY(2),
            Volume INTEGER)
        primary key (Ts)
        PARTITION
            by TradeWindow ROUNDROBIN
        PARTITIONS 2
    as
        SELECT *
        FROM TradeWindow
        WHERE TradeWindow.Volume > 10000;

This example partitions the output window, TradeOutWindow, using ROUNDROBIN partitioning and creates two parallel instances.
    create input stream priceW
        schema (isin string, price money(2));

    create output window maxPriceW
        schema (isin string, maxPrice money(2))
        primary key deduced
        keep 5 minutes
        PARTITION
            by priceW HASH(isin)
        PARTITIONS 5
    as
        SELECT upper(left(priceW.isin,1)) isin, max(priceW.price) maxPrice
        FROM priceW
        group by upper(left(priceW.isin,1));

This example partitions the output window, maxPriceW, using HASH partitioning and creates five parallel instances.
    create input window priceW
        schema (isin string, price float)
        primary key (isin)
        keep 5 minutes;

    create input window volumeW
        schema (isin string, volume integer)
        primary key (isin)
        keep 5 minutes;

    create output window vwapW
        primary key deduced
        keep 1 minute
        PARTITION
            by priceW HASH (isin)
        PARTITIONS 2
    as
        SELECT priceW.isin, vwap(priceW.price, volumeW.volume) vwap_val
        FROM priceW LEFT JOIN volumeW ON priceW.isin = volumeW.isin
        group by priceW.isin;

This example partitions the output window, vwapW, using HASH partitioning and creates two parallel instances.
    create input window priceW
        schema (isin string, price float)
        primary key (isin)
        keep 5 minutes;

    create input window volumeW
        schema (isin string, volume integer)
        primary key (isin)
        keep 5 minutes;

    create output window vwapW
        schema (isin string, vwap float)
        primary key deduced
        partition
            by priceW {
                return ascii(substr(priceW.isin,1,1)) % vwapW_partitions;
            },
            by volumeW {
                return ascii(substr(volumeW.isin,1,1)) % vwapW_partitions;
            }
        partitions 2
    as
        SELECT priceW.isin, vwap(priceW.price, volumeW.volume) vwap_val
        FROM priceW LEFT JOIN volumeW ON priceW.isin = volumeW.isin
        group by priceW.isin;

This example partitions the output window, vwapW, using a CUSTOM partitioning function and creates two parallel instances.
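Because both inputs in the CUSTOM example use the same expression on isin, rows with the same ISIN from priceW and volumeW land in the same instance, which is what the join needs. The following Python model shows this. It is a sketch, not CCL: `first_char_partition` is a hypothetical helper that mirrors `ascii(substr(isin,1,1)) % vwapW_partitions` under the assumption of ASCII input, and the rows are made-up sample values.

```python
def first_char_partition(isin: str, degree: int) -> int:
    # Mirrors ascii(substr(isin,1,1)) % vwapW_partitions, assuming ASCII input.
    return ord(isin[0]) % degree


DEGREE = 2  # matches "partitions 2" in the CCL example

# Made-up sample rows for the two inputs.
prices = [("US0378331005", 101.5), ("DE0007164600", 98.2)]
volumes = [("US0378331005", 300), ("DE0007164600", 120)]

price_targets = {isin: first_char_partition(isin, DEGREE) for isin, _ in prices}
volume_targets = {isin: first_char_partition(isin, DEGREE) for isin, _ in volumes}

print(price_targets)   # {'US0378331005': 1, 'DE0007164600': 0}
print(volume_targets)  # {'US0378331005': 1, 'DE0007164600': 0}
```

If the two inputs used different functions, matching price and volume rows could end up in different instances and the join inside each instance would miss them.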