PARTITION BY Clause

Create a specific number of parallel instances of a delta stream, stream, window, or a module. Partition the input to the stream, window, or module by specifying a partition type.

Syntax

PARTITION [BY <SourceName>] <partition-type>
[, BY <SourceName> ...]
PARTITIONS <IntConst> | <IntParameter>
where <partition-type> is HASH(<column-name>, ...) | ROUNDROBIN | {<Custom>}

Components

SourceName

(Optional for single input but required for multiple inputs) Specify the name of the source stream for the delta stream, stream, window, or module that you want to partition.

<partition-type>

(Required) Specify the type of partition function. The partition function is a demultiplexer that determines the target parallel instance for a given key. There are three valid types of partition function:
  • ROUNDROBIN – Specify the stream name. This type of partitioning is not recommended for stateful elements such as windows.
  • HASH – Specify the names of the columns to use as keys. These columns do not have to be the primary key.
  • CUSTOM – Specify a custom way to partition the input for the stream, window, or module.

IntConst

(Specify either this option or IntParameter) Specify the number of parallel instances you wish to create. This value must be an integer between 1 and 65535, inclusive.

IntParameter

(Specify either this option or IntConst) Specify the name of the parameter that holds the number of parallel instances. The value of the parameter is provided in the CCR project configuration file.

The value of this parameter must be an integer between 1 and 65535, inclusive.

Usage

Use the PARTITION BY clause to improve the performance of complex projects by automatically splitting inputs for a delta stream, stream, window, or module into a specified number of parallel instances. Add this clause within a CCL statement such as CREATE DELTA STREAM, CREATE STREAM, CREATE WINDOW, or LOAD MODULE. Specify the partitioning degree, elements to be partitioned, and a partitioning function.

The partitioning degree is the number of parallel instances (a natural number) that you wish to create for a given element (delta stream, stream, window, or module). As an alternative to specifying the partitioning degree as a constant, you can specify it using an integer parameter with an optional default value. You can then provide the actual value for the parameter in the CCR project configuration file.
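The way a partitioning degree is resolved from a parameter can be modeled outside ESP. The Python sketch below is illustrative only: resolve_partition_degree, its arguments, and the dictionary standing in for values read from a CCR file are hypothetical names, not part of any ESP API. It assumes that a value supplied in the configuration takes precedence over the default, and it applies the documented range of 1 to 65535.

```python
MIN_PARTITIONS = 1
MAX_PARTITIONS = 65535  # documented upper bound for IntConst / IntParameter


def resolve_partition_degree(param_name, ccr_values, default=None):
    """Return the partitioning degree for a (hypothetical) integer parameter.

    ccr_values is a plain dict standing in for parameter values read from a
    CCR project configuration file. Assumption: a configured value wins over
    the optional default.
    """
    value = ccr_values.get(param_name, default)
    if value is None:
        raise ValueError(f"no value or default for parameter {param_name!r}")
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError(f"{param_name!r} must be an integer, got {value!r}")
    if not MIN_PARTITIONS <= value <= MAX_PARTITIONS:
        raise ValueError(
            f"{param_name!r} must be between {MIN_PARTITIONS} and {MAX_PARTITIONS}"
        )
    return value
```

With this model, a configured value of 4 overrides a default of 2, and a value of 0 is rejected before any instance would be created.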

The partitioning function is effectively a demultiplexer which determines the target parallel instance for a given partitioning key. There are three valid types of partition functions: ROUNDROBIN, HASH, and CUSTOM. Choose a type based on the calculations you are performing on the input data. For example, ROUNDROBIN is sufficient for stateless operations like simple filters, but not for aggregation: rows that belong to the same group can be sent to different instances, so each instance computes its result from only part of the group. HASH is necessary for grouping records together, but grouping may not evenly distribute the data across instances.
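The difference between ROUNDROBIN and HASH for aggregation can be seen in a small model. The Python sketch below is not ESP code: the function names are hypothetical, and crc32 is only a stand-in because the hash that ESP applies is not described here. It runs an independent max(price) per isin in each instance, the way parallel instances of an aggregating window would.

```python
from zlib import crc32


def roundrobin_partition(rows, n):
    """Assign rows to n instances in turn, ignoring their content."""
    instances = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        instances[i % n].append(row)
    return instances


def hash_partition(rows, n, key):
    """Assign rows to n instances by a hash of the key column."""
    instances = [[] for _ in range(n)]
    for row in rows:
        # crc32 is a stand-in hash chosen for this sketch only.
        instances[crc32(row[key].encode()) % n].append(row)
    return instances


def max_price_per_instance(instances):
    """Compute max(price) grouped by isin separately inside every instance."""
    results = []
    for rows in instances:
        best = {}
        for row in rows:
            isin, price = row["isin"], row["price"]
            best[isin] = max(best.get(isin, price), price)
        results.append(best)
    return results


rows = [
    {"isin": "A", "price": 10.0},
    {"isin": "A", "price": 30.0},
    {"isin": "B", "price": 5.0},
    {"isin": "A", "price": 20.0},
]

rr = max_price_per_instance(roundrobin_partition(rows, 2))
hs = max_price_per_instance(hash_partition(rows, 2, "isin"))
```

In this model the round-robin run leaves isin A in both instances with two different maxima, while the hash run keeps every isin in exactly one instance, so each group has a single result.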

When using the PARTITION BY clause, partition at least one input stream of the corresponding element by specifying a partitioning function. In the case that an element accepts multiple input streams and some of these input streams do not have a partitioning function defined, those streams are broadcast to all parallel instances.
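The routing rule for elements with several inputs can be sketched as follows. This Python model is illustrative only: route and the partitioners mapping are hypothetical names, and the byte-sum hash is an arbitrary stand-in. An input with a partitioning function goes to one instance; an input without one goes to all of them.

```python
def route(event, source, partitioners, n):
    """Return the instance IDs that receive an event from the given source.

    partitioners maps a source name to a function(event, n) -> integer.
    A source with no entry has no partitioning function, so its events
    are broadcast to every instance.
    """
    partitioner = partitioners.get(source)
    if partitioner is None:
        return list(range(n))
    return [partitioner(event, n) % n]


# Only priceW is partitioned; volumeW has no partitioning function.
partitioners = {
    "priceW": lambda event, n: sum(event["isin"].encode()),  # stand-in hash
}

price_targets = route({"isin": "AB", "price": 1.0}, "priceW", partitioners, 2)
volume_targets = route({"isin": "AB", "volume": 100}, "volumeW", partitioners, 2)
```

Because every instance sees all volumeW events in this model, a join inside any instance can find the matching volume row for whichever priceW rows it received.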

The CUSTOM partitioning function is defined as an inline function which does not take any parameters. This function creates an implicit global parameter called <targetName>_partitions where <targetName> represents the name of the current element you are partitioning and partitions is a fixed part of the parameter name. For example, if you are partitioning an output window called maxPriceW, use maxPriceW_partitions as the global parameter name. The value of this parameter is equal to the number of partitions.

create input stream priceW schema (isin string, price money(2));

create output window maxPriceW schema (isin string, maxPrice money(2)) 
primary key deduced 
keep 5 minutes 
PARTITION by priceW 
{
     integer hashValue := ascii(substr(priceW.isin,1,1));
     return hashValue % maxPriceW_partitions;	
}
PARTITIONS 2 
as
SELECT upper(left(priceW.isin,1)) isin, max(priceW.price) maxPrice 
FROM priceW group by upper(left(priceW.isin,1));

Do not explicitly provide a runtime value for the <targetName>_partitions parameter. To ensure uniqueness, the compiler throws an error if you create a global variable with the same name.

The CUSTOM partitioning function returns an integer which determines the parallel instance that should receive a given event (row). A modulo operation applies to this result, which ensures that the returned instance number is greater than or equal to zero and is less than the number of available instances. This prevents runtime errors. For example, if you create three partitions, those partitions will have the IDs 0, 1, and 2.

Note: If you do not specify a return statement, Event Stream Processor automatically adds a return null statement to the end of the partitioning logic to ensure that the custom partitioning logic always has a return value. Input records for which the custom partitioning logic evaluates to null are logged as bad records at runtime, and the ESP Server issues a warning message.
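The handling of the value returned by a custom partitioning function can be modeled in a few lines. This Python sketch is an assumption-laden illustration, not ESP behavior verified against the server: dispatch is a hypothetical name, and the treatment of negative results relies on Python's modulo, which always yields a value from 0 up to the modulus minus one when the modulus is positive. The text only states the resulting range, not how ESP computes it.

```python
def dispatch(custom_result, num_partitions):
    """Map the value returned by a custom partitioning function to an instance ID.

    Returns None for a null result; the text says such records are logged
    as bad records instead of being delivered to an instance.
    """
    if custom_result is None:
        return None
    # With a positive modulus, Python's % yields 0 <= r < num_partitions,
    # including for negative inputs (an assumption about ESP's behavior).
    return custom_result % num_partitions


ids = [dispatch(value, 3) for value in (0, 4, 7, -1)]
```

With three partitions, every non-null result in this model lands on instance 0, 1, or 2, which matches the IDs given in the text.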

Ordering of Partitioned Results

For the same input data, the output of a partitioned element may differ from the output of a non-partitioned element. This is because:
  • operating systems schedule threads in a non-deterministic way,
  • parallel execution of instances using multiple operating system threads introduces indeterminism, and
  • to maximize the throughput of the partitioned element, no explicit synchronization between parallel instances takes place.
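
The effect of these points can be reproduced with ordinary threads. The Python sketch below is a generic model, not a description of how the ESP Server schedules instances: run_instances is a hypothetical helper that processes each partition on its own thread and records results in arrival order, with no coordination between threads.

```python
import queue
import threading


def run_instances(partitions, transform):
    """Process each partition on its own thread; collect output in arrival order."""
    out = queue.Queue()

    def worker(rows):
        for row in rows:
            out.put(transform(row))

    threads = [threading.Thread(target=worker, args=(rows,)) for rows in partitions]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # All producers have finished, so qsize() is exact at this point.
    return [out.get() for _ in range(out.qsize())]


partitions = [[1, 3, 5], [2, 4, 6]]
merged = run_instances(partitions, lambda x: x * 10)
```

The merged list always contains the same six values, but how values from the two partitions interleave depends on thread scheduling and can change from run to run.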

The stream partitions that the ESP Server instantiates at runtime are local; you cannot subscribe or publish to them. However, these streams are visible in Studio, so you can view their utilization and adjust the partition count accordingly.

Restrictions

You cannot apply the PARTITION BY clause to these elements: inputs, splitters, unions, reference streams, and adapters. Doing so results in a syntax error. However, you can partition these elements within a module that you are partitioning.

Example: Roundrobin Partitioning

Here is an example of ROUNDROBIN partitioning on a CCL query with one input window (TradeWindow):

create input window TradeWindow
schema (
		Ts BIGDATETIME, 
		Symbol STRING, 
		Price MONEY(2), 
		Volume INTEGER)
primary key (Ts);

create output window TradeOutWindow
schema (
		Ts BIGDATETIME, 
		Symbol STRING, 
		Price MONEY(2), 
		Volume INTEGER)
primary key (Ts)
PARTITION
    by TradeWindow ROUNDROBIN
PARTITIONS 2
as
SELECT * FROM TradeWindow 
WHERE TradeWindow.Volume > 10000;
This example partitions the output window, TradeOutWindow, using ROUNDROBIN partitioning and creates two parallel instances.

Example: HASH Partitioning

Here is an example of HASH partitioning on a CCL query with one input stream (priceW):

create input stream priceW 
schema (isin string, price money(2));

create output window maxPriceW 
schema (isin string, maxPrice money(2)) 
primary key deduced keep 5 minutes
PARTITION 
	by priceW HASH(isin)
PARTITIONS 5
as
SELECT upper(left(priceW.isin,1)) isin, max(priceW.price) maxPrice 
FROM priceW group by upper(left(priceW.isin,1));
This example partitions the output window, maxPriceW, using HASH partitioning and creates five parallel instances.

Here is an example of HASH partitioning on one input window (priceW) of a join, while the other input window (volumeW) is broadcast:

create input window priceW 
schema (isin string, price float) primary key (isin) keep 5 minutes;

create input window volumeW 
schema (isin string, volume integer) 
primary key (isin) keep 5 minutes;

create output window vwapW 
primary key deduced keep 1 minute
PARTITION
    by priceW HASH (isin)
PARTITIONS 2
as
SELECT priceW.isin, vwap(priceW.price, volumeW.volume) vwap_val
FROM priceW LEFT JOIN volumeW ON priceW.isin = volumeW.isin
group by priceW.isin;
This example partitions the output window, vwapW, using HASH partitioning and creates two parallel instances.

Example: CUSTOM Partitioning

Here is an example of CUSTOM partitioning on a CCL query with two input windows (priceW and volumeW):
create input window priceW 
schema (isin string, price float) 
primary key (isin) keep 5 minutes;

create input window volumeW 
schema (isin string, volume integer) 
primary key (isin) keep 5 minutes;


create output window vwapW 
schema (isin string, vwap float)
primary key deduced 
partition 
    by priceW {
        return ascii(substr(priceW.isin,1,1)) % vwapW_partitions;
    },
    by volumeW {
        return ascii(substr(volumeW.isin,1,1)) % vwapW_partitions;
    }
partitions 2
as
SELECT priceW.isin, vwap(priceW.price, volumeW.volume) vwap_val
FROM priceW LEFT JOIN volumeW ON priceW.isin = volumeW.isin
group by priceW.isin;
This example partitions the output window, vwapW, using a CUSTOM partitioning function and creates two parallel instances.
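
Applying the same function to both inputs is what lets the join find its matches: rows with the same isin from priceW and volumeW reach the same instance. The Python sketch below models that property only. custom_partition is a hypothetical name, and the sketch assumes the key character is the first character of isin; which character the CCL call substr(isin,1,1) selects depends on its indexing convention, which is not verified here.

```python
def custom_partition(isin, num_partitions):
    """Model of the example's function: character code modulo partition count.

    Assumption: the first character of isin is used as the key character.
    """
    return ord(isin[0]) % num_partitions


N = 2  # mirrors "partitions 2" in the example
price_rows = [("DE000", 10.5), ("US111", 20.0)]
volume_rows = [("US111", 300), ("DE000", 100)]

price_targets = {isin: custom_partition(isin, N) for isin, _ in price_rows}
volume_targets = {isin: custom_partition(isin, N) for isin, _ in volume_rows}
```

Both mappings are identical in this model, so each instance holds the price and volume rows for the same set of isin values.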