Guidelines for Partitioning Elements with Retention Policies

General guidelines, tips, and examples for partitioning elements that use retention policies.

A retention policy specifies the maximum number of rows or the maximum amount of time that data is retained in a window. If you partition a window for which you have previously specified a retention policy, the window retains the policy.

Row-based retention is an exception: the policy applies to each partition separately, so the ESP project maintains more state. For example, in a non-partitioned project, a retention policy of N rows stores exactly N rows. With stream partitioning and K partitions, however, each partition stores N rows, so the project can hold up to K*N rows in total.
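The effect on retained state can be illustrated with a small simulation. This is a toy model in Python, not how ESP is implemented: the function `retained_rows`, the sample events, and the routing rule (sum of character codes modulo the partition count) are assumptions made for illustration only.

```python
from collections import deque

def retained_rows(events, keep_rows, partitions=1):
    """Toy model: route each event to a partition by key and keep
    only the last `keep_rows` rows in every partition."""
    # One bounded buffer per partition; deque(maxlen=N) drops the oldest row on overflow.
    buffers = [deque(maxlen=keep_rows) for _ in range(partitions)]
    for key, value in events:
        # Hypothetical deterministic routing rule, not the ESP hash function.
        index = sum(ord(c) for c in key) % partitions
        buffers[index].append((key, value))
    return [list(b) for b in buffers]

events = [("A", 1), ("B", 2), ("C", 3), ("A", 4), ("B", 5), ("C", 6), ("A", 7)]

single = retained_rows(events, keep_rows=2)               # non-partitioned: N rows
split = retained_rows(events, keep_rows=2, partitions=3)  # K partitions: up to K*N rows

total_single = sum(len(b) for b in single)
total_split = sum(len(b) for b in split)
print(total_single, total_split)
```

With these sample events, the single buffer holds 2 rows while the three partitions together hold 6, which matches the N versus K*N behavior described above.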

If you specify a retention policy for a union stream, which serves as the connection point for downstream elements, the policy of N rows is preserved. However, there is no guarantee as to which rows the union stores, because the retained row set depends on the order in which events arrive at the union.
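The order dependence can be sketched the same way. The following Python snippet is a toy model under stated assumptions, not ESP behavior reproduced exactly: `union_retained`, the row labels, and the two interleavings are hypothetical. Each interleaving preserves the order within a partition, yet the union ends up holding different rows.

```python
from collections import deque

def union_retained(arrivals, keep_rows):
    """Toy model of a union that keeps only the last `keep_rows` rows it receives."""
    window = deque(maxlen=keep_rows)
    for row in arrivals:
        window.append(row)
    return list(window)

# Rows emitted by two hypothetical partitions, each in its own order.
p0 = ["p0-a", "p0-b"]
p1 = ["p1-a", "p1-b"]

# Two possible interleavings of the same four rows.
order_1 = [p0[0], p0[1], p1[0], p1[1]]
order_2 = [p0[0], p1[0], p1[1], p0[1]]

first = union_retained(order_1, keep_rows=2)
second = union_retained(order_2, keep_rows=2)
print(first)
print(second)
```

Both runs retain exactly 2 rows, but the first keeps only rows from the second partition, while the second keeps one row from each.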

Here is a non-partitioned scenario in which the window stores exactly the specified number of rows (2 rows in this example):
CREATE INPUT WINDOW Trades SCHEMA (TradeId long, Brand string, Volume integer)
PRIMARY KEY (TradeId);

CREATE OUTPUT WINDOW Last2Trades SCHEMA (Brand string, AvgVolume integer)
PRIMARY KEY DEDUCED
KEEP 2 ROWS
AS
SELECT T.Brand, avg(T.Volume) as AvgVolume FROM Trades as T GROUP BY (T.Brand);


Once the output window is partitioned, each partition stores the specified number of rows, for a total of <partitionCount>*2 rows (3*2 = 6 rows in this example). The union also carries an automatically derived retention policy of 2 rows, which ensures that the same number of rows is exposed as in the non-partitioned case. However, the partitions are not synchronized, so the final output may contain different rows than in the non-partitioned case, which can skew results.
CREATE INPUT WINDOW Trades SCHEMA (TradeId long, Brand string, Volume integer)
PRIMARY KEY (TradeId);

CREATE OUTPUT WINDOW Last2Trades SCHEMA (Brand string, AvgVolume integer)
PRIMARY KEY DEDUCED
KEEP 2 ROWS
PARTITION
    BY HASH (Brand)
    PARTITIONS 3
AS
SELECT T.Brand, avg(T.Volume) as AvgVolume FROM Trades as T GROUP BY (T.Brand);