Guidelines for Partitioning SPLASH

General guidelines, tips, and examples of partitioning elements using SPASH logic.

Local Variables

Each partition has its own locally declared variables. Therefore, a counter of all input records only counts records that arrive at a certain partition. If you wish to use a locally declared counter, it is recommended that you use it in line with the given partition key and that you verify the results of the partitioned element.

Globally Declared Variables

Globally declared variables can also be tricky. Multiple partitions in parallel can change a globally declared variable in parallel and race-conditions cannot be avoided. For example, if a window has a user-defined method which increases a global counter variable by 1 at the arrival of each record, once this window is partitioned, each parallel instance independently changes the counter at the arrival of a record. Since every instance runs in a separate thread, there is no way to determine the order in which instances update the counter. Therefore, two partitions trying to increment a globally declared counter can create inconsistencies so that increments get overwritten.

Event Caches, Dictionaries, Vectors, and Methods

When using complex data structures like event caches, dictionaries, and vectors, be aware that partitioning may change the semantics of your original usage and lead to unexpected results.

For example, if an event cache is not defined with a key, then its original intent is to keep all records for the given time period or the limit of the number of records. However, with partitioning, records in the original event cache are distributed to several parallel event caches. Therefore, if it was required to calculate a sum() over the original event cache, partitioning would generate sum() values for each parallel event cache and as a result, break the original semantics.

Both event caches and partitioning have a concept of key. For event caches, a key defines how records in an event cache are distributed across buckets. For partitioning, a key defines how incoming records are distributed over several parallel instances. Whenever the keys for the event cache and partitioning do not match and you partition an element (stream or window) with an event cache, this can change its original semantics of the partitioned element.

Even if the event cache key matches with the partitioning key, it may still generate unexpected results with partitioning when the event cache applies a row-based retention policy and the coalesce flag is not set (see Example 1). Without setting the coalesce flat, an event cache treats records for INSERT, UPDATE, and DELETE with the same key as distinct records. When the event cache is partitioned, the same KEEP policy is used in all parallel instances. As a result, there will be n*k (where n is the number of partitions and k is the number of rows kept by the event cache) records kept by all the parallel event caches, as opposed to k records in the case of no partitioning.

Example 1:
CREATE input stream priceW schema (ts timestamp, isin string, price float);
CREATE output stream maxPriceW schema (ts timestamp, isin string, price float) 
declare
        eventCache(priceW[isin], 10 events) cache;
end
PARTITION 
    by priceW HASH(isin)
PARTITIONS 2
as
SELECT priceW.ts, priceW.isin isin, avg(cache.price) price FROM priceW;

Additionally, user-defined methods or flex elements can perform more complicated operations on an event cache. For example, you can use the keyCache() function to access any other cache bucket independent of your current key. Partition these elements carefully since this can also alter the intended semantics. In the example below, the Symbol and Symbol2 attributes of QTrades are in the same domain. When the flex TradesFlex element is hash partitioned by its input, QTrades, according to Symbol, it is possible that the bucket for Symbol2 of a record to reside in a different parallel instance and therefore, the keyCache() statement would not be able to retrieve that bucket in the current partition.

Example 2:
CREATE FLEX TradesFlex 
    IN QTrades 
OUT OUTPUT WINDOW Top3Trades SCHEMA TradesSchema PRIMARY KEY(Symbol,Price)
PARTITION
	BY QTrades HASH (Symbol)
PARTITIONS 3 
    BEGIN 
        DECLARE 
            eventCache(QTrades[Symbol], manual, Price asc) tradesCache; 
     
        ON QTrades { 
            keyCache( tradesCache, [Symbol=QTrades.Symbol2;|] ); 
            typeof(QTrades) rec := insertIntoCache( QTrades );   
            if(rec.Id) { 
                
                if(rec.Id <> QTrades.Id) { 
                    output setOpcode(rec, delete); 
                } 
                output setOpcode(QTrades, upsert); 
            } 
        }; 
END; 

Similar situation can happen for other complex data structure like dictionaries and vectors. In general, if you are writing any SPLASH logic for streams or windows that are partitioned, be aware that the logic functions only on a subset of the data and therefore, the partitioned results may be different from non partitioned streams or windows.