Aggregation

Aggregation groups input records by the values in the columns specified in the GROUP BY clause, applies the specified aggregation functions (such as min, max, sum, and count) to the records in each group, and produces one row of output per group.
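The following Python sketch is a conceptual model of these grouping semantics, not ESP's implementation. The function name `aggregate` and the sample trade values are hypothetical. Records are plain dicts, and each aggregation is given as a (function, input column) pair.

```python
from collections import defaultdict

# Conceptual model only: group records by the GROUP BY columns, apply the
# aggregation functions to each group, and emit one output row per group.
def aggregate(records, group_by, aggregations):
    groups = defaultdict(list)
    for rec in records:
        key = tuple(rec[col] for col in group_by)
        groups[key].append(rec)

    output = []
    for key, members in groups.items():
        row = dict(zip(group_by, key))  # the group columns identify the row
        for name, (func, col) in aggregations.items():
            row[name] = func([m[col] for m in members])
        output.append(row)
    return output


# Hypothetical sample trades.
trades = [
    {"Symbol": "SAP", "Price": 100.0, "Shares": 10},
    {"Symbol": "SAP", "Price": 102.5, "Shares": 5},
    {"Symbol": "IBM", "Price": 150.0, "Shares": 20},
]
rows = aggregate(
    trades,
    ["Symbol"],
    {
        "NoOfTrades": (len, "Price"),  # count of records in the group
        "MaxPrice": (max, "Price"),
        "TotalShares": (sum, "Shares"),
    },
)
```

Three input records collapse into two output rows, one for SAP and one for IBM.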

Records in a group have the same values for the columns specified in the GROUP BY clause. These columns must also be included in the SELECT clause, because together they form the key of the target window. For the same reason, an aggregation window must use the PRIMARY KEY DEDUCED clause instead of specifying a primary key explicitly: its key is deduced from the GROUP BY columns.
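To illustrate why the GROUP BY columns act as the key, here is a minimal Python model, assuming a window that maintains sum() per group. The class `SumByKeyWindow` and its column names are hypothetical. Each incoming record updates exactly one output row, the one whose key matches the record's GROUP BY value, so the window never holds more than one row per group.

```python
# Conceptual model: the output window is a dict keyed by the GROUP BY value,
# which plays the role of the deduced primary key. Handles inserts only.
class SumByKeyWindow:
    def __init__(self, key_col, value_col):
        self.key_col = key_col
        self.value_col = value_col
        self.rows = {}  # deduced key -> output row

    def insert(self, record):
        key = record[self.key_col]
        row = self.rows.get(key)
        if row is None:
            # First record for this group: create its output row.
            row = {self.key_col: key, "Total": 0}
            self.rows[key] = row
        # Later records update the existing row instead of adding a new one.
        row["Total"] += record[self.value_col]
        return dict(row)


w = SumByKeyWindow("Symbol", "Shares")
w.insert({"Symbol": "SAP", "Shares": 10})
w.insert({"Symbol": "IBM", "Shares": 7})
w.insert({"Symbol": "SAP", "Shares": 5})
```

Without the key column in the output row, there would be no way to tell which existing row an update applies to.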

In addition to the GROUP BY clause, you can specify GROUP ORDER BY and GROUP FILTER clauses. The GROUP ORDER BY clause orders the records in each group by the specified columns before the GROUP FILTER clause and the aggregation functions are applied. Once the records are ordered, aggregation functions that are sensitive to record order, such as first, last, and nth, produce meaningful results.
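The effect of ordering on order-sensitive aggregates can be sketched in Python. This is a conceptual model only: `group_order_by`, `first`, and `last` are illustrative helpers rather than ESP's functions, and the `TradeTime` column is hypothetical. The records arrive out of time order, so first() on the unordered group returns the wrong "opening" price.

```python
# Conceptual model only: GROUP ORDER BY sorts a group's records, after which
# order-sensitive aggregates such as first and last are meaningful.
def group_order_by(records, column, descending=False):
    # sorted() is stable, so records with equal keys keep arrival order.
    return sorted(records, key=lambda r: r[column], reverse=descending)


def first(values):
    return values[0]


def last(values):
    return values[-1]


# Hypothetical group of trades for one symbol, received out of time order.
group = [
    {"TradeTime": 3, "Price": 101.0},
    {"TradeTime": 1, "Price": 100.0},
    {"TradeTime": 2, "Price": 99.5},
]
unordered_first = first([r["Price"] for r in group])  # depends on arrival order
prices = [r["Price"] for r in group_order_by(group, "TradeTime")]
opening, closing = first(prices), last(prices)
```

After ordering by time, first() gives the opening price (100.0) and last() gives the closing price (101.0).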

The GROUP FILTER clause is executed after the GROUP ORDER BY clause and eliminates any rows in the group that do not meet the filter condition. The filter condition is written like a WHERE clause condition, with one exception: it can use the special rank function. The rank function works in conjunction with the GROUP ORDER BY clause. After the GROUP ORDER BY clause is executed, every row in the group is ranked from 1 to N. A GROUP FILTER condition of rank() < 11 therefore means that the aggregation functions are applied only to the first 10 rows of the group, in the order given by the GROUP ORDER BY columns.
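A minimal Python sketch of the rank() < 11 filter follows, assuming that rank() assigns consecutive positions 1 to N in the sorted order (how ESP ranks tied rows is not modelled here; ties simply keep arrival order). The helper name `group_filter_rank` is hypothetical. The group holds 15 trades, and only the 10 largest by Shares survive the filter to be summed.

```python
# Conceptual model: order the group (GROUP ORDER BY), rank rows 1..N, then
# keep only rows satisfying rank() < rank_limit (GROUP FILTER).
def group_filter_rank(records, order_col, rank_limit, descending=False):
    ordered = sorted(records, key=lambda r: r[order_col], reverse=descending)
    # enumerate(..., start=1) plays the role of rank(): 1 for the first row.
    return [rec for rank, rec in enumerate(ordered, start=1) if rank < rank_limit]


# Hypothetical group of 15 trades with Shares = 1..15.
group = [{"TradeId": i, "Shares": i} for i in range(1, 16)]
top10 = group_filter_rank(group, "Shares", 11, descending=True)
total_top10 = sum(r["Shares"] for r in top10)  # aggregate only the survivors
```

Here the aggregate sees Shares 15 down to 6, so the sum is 105 rather than the 120 of the whole group.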

Finally, you can specify an optional HAVING clause, which filters the output rows (one per group) based on the results of applying aggregation functions to the records in each group. The primary difference from the WHERE clause is that aggregation functions are allowed in a HAVING clause but not in a WHERE clause.
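The difference in when the two filters run can be modelled in Python. In this sketch (the function `sum_by_symbol` and the sample data are hypothetical), `where` sees one input record at a time before grouping, while `having` sees each group's aggregated total after grouping, which is why only HAVING can test a sum.

```python
from collections import defaultdict


# Conceptual model of sum(Shares) ... GROUP BY Symbol with optional
# WHERE (per record, before grouping) and HAVING (per group, after).
def sum_by_symbol(records, where=None, having=None):
    if where is not None:
        # WHERE can only look at a single record's columns.
        records = [r for r in records if where(r)]
    totals = defaultdict(int)
    for r in records:
        totals[r["Symbol"]] += r["Shares"]
    if having is not None:
        # HAVING looks at the aggregated value for each group.
        totals = {sym: total for sym, total in totals.items() if having(total)}
    return dict(totals)


trades = [
    {"Symbol": "SAP", "Shares": 4000},
    {"Symbol": "SAP", "Shares": 2000},
    {"Symbol": "IBM", "Shares": 3000},
    {"Symbol": "IBM", "Shares": 100},
]
large_trades_only = sum_by_symbol(trades, where=lambda r: r["Shares"] > 2500)
busy_symbols_only = sum_by_symbol(trades, having=lambda total: total > 5000)
```

The WHERE variant drops individual small trades but keeps both symbols; the HAVING variant keeps every trade in the sums but drops IBM, whose total (3100) does not exceed 5000.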

Note: The GROUP ORDER BY, GROUP FILTER, and HAVING clauses can only be specified in conjunction with a GROUP BY clause.

When using aggregation, you must consider the memory usage implications. All of the input records for which an aggregate function is to be calculated have to be stored in memory. The data structure that holds all the records in memory is called the aggregation index.
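One reason the records must be kept can be shown with max(): when the current maximum is deleted, the new maximum can only be found by looking at the remaining records. The Python class below is a simplified, hypothetical stand-in for an aggregation index (ESP's actual data structure is not described here). It stores every record per group, so its size grows with the number of records, not the number of groups.

```python
from collections import defaultdict


# Simplified model of an aggregation index: every record is kept per group
# so that max() can be recomputed correctly after a deletion.
class MaxIndex:
    def __init__(self):
        self.groups = defaultdict(dict)  # group key -> {record id: price}

    def insert(self, key, rec_id, price):
        self.groups[key][rec_id] = price
        return self.current_max(key)

    def delete(self, key, rec_id):
        del self.groups[key][rec_id]
        if not self.groups[key]:
            del self.groups[key]
            return None  # the group is now empty, so it produces no row
        return self.current_max(key)

    def current_max(self, key):
        return max(self.groups[key].values())


idx = MaxIndex()
idx.insert("SAP", 1, 100.0)
idx.insert("SAP", 2, 105.0)
after_delete = idx.delete("SAP", 2)  # recoverable only because record 1 was kept
```

A running maximum alone would still report 105.0 after the delete; keeping the records is what makes the correct answer, 100.0, available.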

If a stream feeds input directly to an aggregation window, the memory usage of the aggregation index grows without bound. To prevent this, insert an intermediate window between the stream and the aggregation window. In the intermediate window, use a GROUP BY clause to make one or more of the stream columns the primary key, and set a retention policy to limit memory usage. Although this intermediate window is technically an aggregation window, it does not apply any aggregation functions, so its index does not grow indefinitely.

The intermediate window then takes the place of the stream as the direct input to the aggregation window, and the aggregation window performs its aggregation functions on the records that the intermediate window retains.
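The pattern can be modelled in Python, assuming a retention policy such as KEEP 1 DAY evicts a row once it is one day old and that each eviction reaches the downstream window as a delete. The classes `RetentionWindow` and `SharesBySymbol` and the timestamps in seconds are hypothetical. Because expired trades are deleted from the downstream index as well, that index holds only the trades retained in the last day.

```python
from collections import OrderedDict

DAY = 24 * 60 * 60  # seconds; stands in for KEEP 1 DAY (an assumption)


class RetentionWindow:
    """Intermediate window keyed by a unique column, with time-based retention.

    Assumes keys are never reused and timestamps never decrease, so the
    OrderedDict's insertion order is also arrival order.
    """

    def __init__(self, keep):
        self.keep = keep
        self.rows = OrderedDict()  # key -> (arrival time, row)

    def insert(self, key, row, now):
        events = []
        # Evict the oldest rows whose age has reached the retention period.
        while self.rows:
            oldest_key, (arrived, old_row) = next(iter(self.rows.items()))
            if now - arrived < self.keep:
                break
            del self.rows[oldest_key]
            events.append(("delete", old_row))
        self.rows[key] = (now, row)
        events.append(("insert", row))
        return events


class SharesBySymbol:
    """Downstream aggregation window: sum(Shares) per Symbol."""

    def __init__(self):
        self.index = {}  # Symbol -> {TradeId: Shares}

    def apply(self, op, row):
        if op == "insert":
            self.index.setdefault(row["Symbol"], {})[row["TradeId"]] = row["Shares"]
        else:  # "delete": drop the expired record from its group
            group = self.index[row["Symbol"]]
            del group[row["TradeId"]]
            if not group:
                del self.index[row["Symbol"]]

    def total(self, symbol):
        return sum(self.index.get(symbol, {}).values())


retention = RetentionWindow(keep=DAY)
summary = SharesBySymbol()
feed = [
    (0, {"TradeId": 1, "Symbol": "SAP", "Shares": 4000}),
    (3600, {"TradeId": 2, "Symbol": "SAP", "Shares": 2000}),
    (DAY + 60, {"TradeId": 3, "Symbol": "SAP", "Shares": 500}),
]
for now, row in feed:
    # Every event the intermediate window emits is applied downstream.
    for op, r in retention.insert(row["TradeId"], row, now):
        summary.apply(op, r)
```

When the third trade arrives, the first is more than a day old, so it is evicted from both windows and the total for SAP becomes 2500.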

Example

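The following example computes the total number of trades, the maximum trade price, and the total number of shares traded for each symbol. TradeRetention is the intermediate window described above; it retains trades for one day. The TradeSummary target window contains only those symbols whose total traded volume exceeds 5000 shares.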

CREATE INPUT STREAM Trades
SCHEMA (TradeId integer, Symbol string, Price float, Shares integer);

CREATE WINDOW TradeRetention
PRIMARY KEY DEDUCED
KEEP 1 DAY
AS
SELECT trd.TradeId, trd.Symbol, trd.Price, trd.Shares
FROM Trades trd
GROUP BY trd.TradeId;

CREATE OUTPUT WINDOW TradeSummary
PRIMARY KEY DEDUCED
KEEP 1 DAY
AS
SELECT tr.Symbol,
       count(tr.TradeId) NoOfTrades,
       max(tr.Price) MaxPrice,
       sum(tr.Shares) TotalShares
FROM TradeRetention tr
GROUP BY tr.Symbol
HAVING sum(tr.Shares) > 5000;