Guidelines for Partitioning Flex Operators

General guidelines, tips, and examples of partitioning elements using flex operators.

Local Variables

Each partition has its own copy of locally declared variables. A counter that is meant to count all input records therefore counts only the records that arrive at its own partition. If you use a locally declared counter, make sure that what it counts is consistent with the partition key, and verify the results of the partitioned element.
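The effect can be modelled outside CCL. The following Python sketch is only a simulation, not CCL code: the partition_of function is a toy stand-in for the HASH partitioning function (an assumption), and the symbols are made-up sample data. It shows that each instance's counter reflects only the records routed to that instance.

```python
def partition_of(key: str, partitions: int) -> int:
    # Toy stand-in for HASH(...); the real hash function is not specified here.
    return sum(key.encode("utf-8")) % partitions


def count_per_partition(symbols, partitions):
    # One counter per partition instance, like a locally declared variable.
    counters = [0] * partitions
    for symbol in symbols:
        counters[partition_of(symbol, partitions)] += 1
    return counters


symbols = ["SAP", "SAP", "XY", "SAP", "XY", "IBM"]
local_counters = count_per_partition(symbols, partitions=2)

print(local_counters)       # [4, 2]: no instance has seen all 6 records
print(sum(local_counters))  # 6
```

Only the sum across all instances matches the number of input records; no single instance holds that total.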

Globally Declared Variables

Globally declared variables also need care. Multiple partitions can change a globally declared variable at the same time, and race conditions cannot be avoided. For example, suppose a window has a user-defined method that increases a global counter variable by 1 at the arrival of each record. Once this window is partitioned, each parallel instance changes the counter independently whenever it receives a record. Since every instance runs in a separate thread, there is no way to determine the order in which the instances update the counter. As a result, two partitions that increment a globally declared counter at the same time can overwrite each other's increments and leave the counter in an inconsistent state.
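The lost update can be made visible by writing out one possible interleaving by hand. The following Python sketch is a deterministic model, not CCL code and not real threading: the names state, seen_by_a, and seen_by_b are hypothetical, and the interleaving is just one of the orders in which two instances might run.

```python
# Model of a globally declared integer counter shared by all partitions.
state = {"counter": 0}

# Partition A and partition B each receive one record and both want to
# execute "counter := counter + 1". The statement is a read followed by a
# write, and the two instances can interleave between those two steps.
seen_by_a = state["counter"]        # A reads 0
seen_by_b = state["counter"]        # B reads 0 before A has written
state["counter"] = seen_by_a + 1    # A writes 1
state["counter"] = seen_by_b + 1    # B also writes 1, overwriting A's increment

print(state["counter"])  # 1, although two records arrived
```

With a different interleaving the result would be 2, which is exactly why the outcome cannot be relied on.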

Multiple Inputs

You can partition flex operators which have multiple inputs. In such cases, if you do not specify a partitioning function, one of the inputs is automatically broadcast to the partitioned flex instances.
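As a rough illustration of the routing, the following Python sketch models two inputs: one is hash-partitioned and the other, which has no partitioning function, is broadcast. It is a simulation only, not CCL code. The input names, the record contents, and the toy hash are assumptions made for the example.

```python
PARTITIONS = 2


def hash_route(symbol: str):
    # Toy stand-in for HASH(Symbol): the record goes to exactly one instance.
    return [sum(symbol.encode("utf-8")) % PARTITIONS]


def broadcast_route():
    # No partitioning function: the record goes to every instance.
    return list(range(PARTITIONS))


inboxes = [[] for _ in range(PARTITIONS)]

# Hypothetical first input, partitioned by symbol.
for symbol in ["SAP", "XY"]:
    for p in hash_route(symbol):
        inboxes[p].append(("trade", symbol))

# Hypothetical second input without a partitioning function.
for p in broadcast_route():
    inboxes[p].append(("config", "limit=100"))

print(inboxes[0])  # [('trade', 'SAP'), ('config', 'limit=100')]
print(inboxes[1])  # [('trade', 'XY'), ('config', 'limit=100')]
```

Every instance sees the broadcast record, so any output derived from it is produced once per partition.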

Transactions

Partitioning flex operators has major implications for transactions. Transactions are automatically broken down into smaller subtransactions. If one of these subtransactions fails, the others do not automatically fail with it. This can change the transaction semantics and lead to inconsistent results.

The diagram below shows a non-partitioned scenario where the transaction consists of four records. The flex operator processes the records as one block and produces an output transaction that consists of these four records.



In a flex operator, you can write code that runs at the start and at the end of a transaction. In the partitioned case, these start and end code blocks are called within each partition, once for each subtransaction that is created. If the start and end transaction blocks create output events, partitioning can easily lead to problems such as duplicate keys. Since such duplicates cause transactions to be rolled back, whole subtransactions are rolled back.

In the non-partitioned case, there are no issues because only one event (start, start_trans) is created for each transaction. In the partitioned case, each partition creates one of these events, which results in duplicate keys, and one of the subtransactions is rolled back.

If you wish to partition a flex operator, do not write records in the transaction start and end blocks.

CREATE FLEX w3
    IN w2_out
    OUT OUTPUT WINDOW w3_out
    SCHEMA s1
    PRIMARY KEY (c1) KEEP 10 ROW
//  PARTITION BY w2_out HASH(c1) PARTITIONS 2
BEGIN
    ON w2_out
    {
        output setOpcode([c1 = w2_out; | c2 = 'event';], upsert);
    };
    ON START TRANSACTION
    {
        output setOpcode([c1 = start; | c2 = 'start_trans';], upsert);
    };
END;
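
What happens when the PARTITION BY clause is enabled can be approximated with a small simulation. The following Python sketch is a model under stated assumptions, not CCL code: keys are plain integers, key % partitions stands in for HASH(c1), and the start transaction block is modelled as one ("start", "start_trans") row per subtransaction.

```python
def partition_of(key: int, partitions: int) -> int:
    # Toy stand-in for HASH(c1).
    return key % partitions


def run_transaction(keys, partitions):
    # Split the incoming transaction into one subtransaction per partition.
    subtransactions = {}
    for key in keys:
        subtransactions.setdefault(partition_of(key, partitions), []).append(key)

    output = []
    for _, sub in sorted(subtransactions.items()):
        # The start transaction block runs once per subtransaction.
        output.append(("start", "start_trans"))
        output.extend((key, "event") for key in sub)
    return output


def duplicate_keys(rows):
    # Report every key that appears more than once in the combined output.
    seen, duplicates = set(), []
    for key, _ in rows:
        if key in seen:
            duplicates.append(key)
        seen.add(key)
    return duplicates


print(duplicate_keys(run_transaction([1, 2, 3, 4], partitions=1)))  # []
print(duplicate_keys(run_transaction([1, 2, 3, 4], partitions=2)))  # ['start']
```

With one partition the start row appears once. With two partitions both subtransactions emit a row with the same key, which is the collision described above.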

Pulsed Output

You can use pulsed output (EVERY X SECONDS) in flex operators to send events every X seconds instead of sending events with every new input record. The partitioned example below shows how this can lead to problems. Each of the partitions sends events every second. You need to ensure that the events the partitions produce each second do not collide with each other and create duplicate key records.

CREATE FLEX w3
    IN w2_out
    OUT OUTPUT WINDOW w3_out
    SCHEMA s1
    PRIMARY KEY (c1) KEEP 10 ROW
//  PARTITION BY w2_out HASH(c1) PARTITIONS 2
BEGIN
    ON w2_out
    {
        // collect records
    };
    EVERY 1 SECONDS
    {
        // for example, output averages per group
    };
END;

Here is an example scenario that uses partitioning:

CREATE SCHEMA TradeSchema (
    Ts INTEGER,
    Symbol STRING);

CREATE SCHEMA TradeTotal (
    Symbol STRING,
    Counter INTEGER);

CREATE INPUT WINDOW TradeWindow
    SCHEMA TradeSchema
    PRIMARY KEY (Ts) KEEP 5 MINUTES;

CREATE FLEX FlexStateManager
    IN TradeWindow
    OUT OUTPUT WINDOW FlexOutput
    SCHEMA TradeTotal
    PRIMARY KEY (Symbol) KEEP 10 ROW
    PARTITION
        BY TradeWindow HASH(Symbol)
    PARTITIONS 3
BEGIN
    declare
        integer mycounter := 0;
        dictionary(string, integer) counterMap;
        typeof(TradeWindow) my;
    end;
    ON TradeWindow
    {
        if (isnull(counterMap[TradeWindow.Symbol])) {
            counterMap[TradeWindow.Symbol] := 1;
        }
        else {
            print('counter.. ', to_string(counterMap[TradeWindow.Symbol]));
            counterMap[TradeWindow.Symbol] := counterMap[TradeWindow.Symbol] + 1;
        }
    };
    EVERY 1 SECONDS
    {
        for (k in counterMap) {
            output setOpcode([ Symbol=k;| Counter=counterMap[k]], upsert);
        }
    };
END;

If the input events are inserts, for example (0, SAP), (1, SAP), (2, SAP), (3, XY), (4, XY), followed by a one-second wait and then (5, XY), the first pulse produces (SAP, 3) and (XY, 2), and the pulse one second later produces (SAP, 3) and (XY, 3). The dictionary in each partition stores only the counts for the records that partition has received. Keep in mind that the partitions are independent of each other.
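
The expected output can be checked with a small simulation of the scenario. The following Python sketch is a model, not CCL code. It assumes that the Ts values are unique (Ts is the primary key of TradeWindow), that zlib.crc32 is an acceptable stand-in for the HASH function, and that a pulse simply emits the current contents of every partition's dictionary. The function names are hypothetical.

```python
import zlib

PARTITIONS = 3


def partition_of(symbol: str) -> int:
    # Stand-in for HASH(Symbol); the real hash function may differ.
    return zlib.crc32(symbol.encode("utf-8")) % PARTITIONS


# One counterMap per partition instance.
counter_maps = [dict() for _ in range(PARTITIONS)]


def on_trade(ts: int, symbol: str) -> None:
    # Mirrors the ON TradeWindow block: count records per symbol.
    counter_map = counter_maps[partition_of(symbol)]
    counter_map[symbol] = counter_map.get(symbol, 0) + 1


def pulse():
    # Mirrors the EVERY 1 SECONDS block: each instance emits its own map.
    rows = []
    for counter_map in counter_maps:
        rows.extend(counter_map.items())
    return sorted(rows)


for trade in [(0, "SAP"), (1, "SAP"), (2, "SAP"), (3, "XY"), (4, "XY")]:
    on_trade(*trade)
first_pulse = pulse()

on_trade(5, "XY")
second_pulse = pulse()

print(first_pulse)   # [('SAP', 3), ('XY', 2)]
print(second_pulse)  # [('SAP', 3), ('XY', 3)]
```

The result does not depend on which instance a symbol is hashed to, because all records with the same symbol always reach the same instance.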

Iterators

Normally, an iterator iterates over a complete input window. In the case of partitioning, the iterator can only iterate over the subset of events that are sent to a specific partition. Therefore, to create meaningful output in a partitioned case, ensure that any use of iterators is consistent with the defined partitioning key.
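
The difference between an aggregate that follows the partitioning key and one that does not can be sketched as follows. This Python code is a simulation only, not CCL: the trades are made-up sample data, the windows are plain lists, and the toy hash stands in for a HASH(Symbol) partitioning function.

```python
PARTITIONS = 2


def partition_of(symbol: str) -> int:
    # Toy stand-in for HASH(Symbol).
    return sum(symbol.encode("utf-8")) % PARTITIONS


trades = [(0, "SAP"), (1, "SAP"), (2, "XY"), (3, "XY"), (4, "SAP")]

# Each instance can only iterate over the rows routed to it.
windows = [[] for _ in range(PARTITIONS)]
for ts, symbol in trades:
    windows[partition_of(symbol)].append((ts, symbol))


def latest_ts_per_symbol(window):
    # Follows the partitioning key: all rows of a symbol are in one window.
    latest = {}
    for ts, symbol in window:
        latest[symbol] = max(ts, latest.get(symbol, ts))
    return latest


def latest_ts_overall(window):
    # Ignores the partitioning key: needs rows from every partition.
    return max(ts for ts, _ in window)


per_symbol = [latest_ts_per_symbol(w) for w in windows]
overall = [latest_ts_overall(w) for w in windows]

print(per_symbol)  # [{'SAP': 4}, {'XY': 3}]
print(overall)     # [4, 3]: the second instance reports 3, the true value is 4
```

The per-symbol results are complete because every symbol lives in exactly one partition, while the overall maximum is only correct in the instance that happens to hold the newest row.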