Guidelines for Partitioning Aggregation

General guidelines, tips, and examples for partitioning windows that perform aggregation.

When you use HASH partitioning on any key other than the GROUP BY key, elements with the same GROUP BY value may be routed to different partitions. Each partition then aggregates only its own share of the group, so the aggregation result can be semantically incorrect.

The example below uses trade events that carry an identifier for industrial parts. This identifier combines a group ID, which names the group to which the part belongs, with a part number. For example, AA0001 is part number 0001 in group AA (for example, engine).

For example, to compute the total trade volume of each part group, specify GROUP BY ( left(T.PartId,2) ). For the aggregation result to be correct, all elements with the same group ID (the two leftmost characters of PartId) must be sent to the same partition. However, this is not possible: it would require PARTITION BY HASH ( left(PartId,2) ), and the HASH key must be a column rather than an expression. The following example therefore partitions by HASH ( PartId ), which can split a part group across partitions and produce incorrect sums.

CREATE INPUT WINDOW Trades SCHEMA (TradeId long, PartId string, Volume integer)
PRIMARY KEY ( TradeId );

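/* Incorrect: hashing on PartId can send parts of the same group
   (for example, AA0001 and AA0002) to different partitions. */
CREATE OUTPUT WINDOW TradeVolumePerGroup SCHEMA (PartGroup string, Volume integer)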
PRIMARY KEY DEDUCED
PARTITION
     BY HASH ( PartId )
     PARTITIONS 2
AS
SELECT left(T.PartId,2) as PartGroup, sum(T.Volume) as Volume FROM Trades as T GROUP BY ( left(T.PartId,2) );
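The problem can be modeled outside the streaming engine with a short Python sketch. This is an illustration under stated assumptions, not CCL: `toy_hash` (sum of character codes modulo the partition count) is a hypothetical stand-in for the engine's hash function, and `partitioned_group_sums` is a hypothetical helper that aggregates each partition independently. With this toy hash, AA0001 and AA0002 land in different partitions. The real engine may assign them differently, but any hash on PartId can split a group in this way.

```python
from collections import defaultdict

# Hypothetical hash for illustration only: sum of character codes modulo the
# number of partitions. This is NOT the streaming engine's real hash function.
def toy_hash(key: str, partitions: int) -> int:
    return sum(ord(c) for c in key) % partitions

def partitioned_group_sums(trades, hash_key, partitions=2):
    """Route each trade to a partition by toy_hash(hash_key(trade)), then
    compute sum(Volume) GROUP BY left(PartId, 2) separately in each partition."""
    results = [defaultdict(int) for _ in range(partitions)]
    for trade in trades:
        p = toy_hash(hash_key(trade), partitions)
        part_group = trade["PartId"][:2]  # equivalent of left(PartId, 2)
        results[p][part_group] += trade["Volume"]
    return [dict(r) for r in results]

trades = [
    {"TradeId": 1, "PartId": "AA0001", "Volume": 10},
    {"TradeId": 2, "PartId": "AA0002", "Volume": 5},
]

# Hashing on PartId: group AA appears in both partitions with partial sums.
split = partitioned_group_sums(trades, lambda t: t["PartId"])
print(split)     # [{'AA': 5}, {'AA': 10}]

# Hashing on the group key keeps the whole group in one partition.
together = partitioned_group_sums(trades, lambda t: t["PartId"][:2])
print(together)  # [{'AA': 15}, {}]
```

Neither partial sum in `split` equals the true total of 15. Only hashing on the group key yields a complete result, which is why the group key must be available as a column.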
You can work around this limitation by adding an intermediate window (Trades1 below) that derives a new column, PartGroup, which holds the GROUP BY key explicitly. Because PartGroup is an ordinary column, it can be used as the HASH key.
CREATE INPUT WINDOW Trades SCHEMA (TradeId long, PartId string, Volume integer)
PRIMARY KEY ( TradeId );

/* Intermediate window: materializes the GROUP BY key as the PartGroup column. */
CREATE OUTPUT WINDOW Trades1 SCHEMA (TradeId long, PartId string, PartGroup string, Volume integer)
PRIMARY KEY ( TradeId )
AS
SELECT T.TradeId as TradeId, T.PartId as PartId, left(T.PartId,2) as PartGroup, T.Volume as Volume FROM Trades as T;

/* Hashing on PartGroup sends every element of a group to the same partition. */
CREATE OUTPUT WINDOW TradeVolumePerGroup SCHEMA (PartGroup string, Volume integer)
PRIMARY KEY DEDUCED
PARTITION
     BY HASH ( PartGroup )
     PARTITIONS 2
AS
SELECT T.PartGroup as PartGroup, sum(T.Volume) as Volume FROM Trades1 as T GROUP BY ( T.PartGroup );
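The two-stage workaround can be modeled with a minimal Python sketch, again as an illustration and not CCL. `toy_hash` is a hypothetical stand-in for the engine's hash function, `derive_part_group` plays the role of the Trades1 window, and `aggregate_by_part_group` plays the role of the partitioned TradeVolumePerGroup window. All three names are assumptions made for this example.

```python
from collections import defaultdict

# Hypothetical hash for illustration only (not the engine's real algorithm).
def toy_hash(key: str, partitions: int) -> int:
    return sum(ord(c) for c in key) % partitions

def derive_part_group(trades):
    """Stage 1 (like Trades1): add an explicit PartGroup = left(PartId, 2) column."""
    return [{**t, "PartGroup": t["PartId"][:2]} for t in trades]

def aggregate_by_part_group(rows, partitions=2):
    """Stage 2 (like TradeVolumePerGroup): PARTITION BY HASH(PartGroup),
    then sum(Volume) GROUP BY PartGroup inside each partition."""
    results = [defaultdict(int) for _ in range(partitions)]
    for row in rows:
        p = toy_hash(row["PartGroup"], partitions)
        results[p][row["PartGroup"]] += row["Volume"]
    return [dict(r) for r in results]

trades = [
    {"TradeId": 1, "PartId": "AA0001", "Volume": 10},
    {"TradeId": 2, "PartId": "AA0002", "Volume": 5},
    {"TradeId": 3, "PartId": "BC0001", "Volume": 7},
]

partials = aggregate_by_part_group(derive_part_group(trades))
print(partials)  # [{'AA': 15}, {'BC': 7}]
```

Each group now appears in exactly one partition, so every per-partition sum is the complete total for its group, while different groups can still be spread across partitions.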