Example: Using a List Store to Preserve Order of Arrival

Because CCL does not support the Aleri list index option, use vector and dictionary data structures to simulate it.

The CCL example below shows a list store that uses a vector to hold the records that need to be accessed in the order of arrival. It also maintains a list index, implemented as a dictionary, so that rows in the vector can be updated and deleted quickly. As with a list index in Aleri, deletes are inefficient and must be kept to a minimum because they leave holes in the list.

//ListStream carries the rows that must be kept as a list so that they can be read back in the order of arrival.

CREATE INPUT WINDOW ListStream
SCHEMA (
	ListKey string ,
	ListUserKey integer ,
	ListVal integer
)
PRIMARY KEY ( ListKey );

//ListUser is the stream whose records trigger a read of the list in the order of arrival.

CREATE INPUT WINDOW ListUser
SCHEMA (
	ListUserKey integer ,
	ListUserVal integer
)
PRIMARY KEY ( ListUserKey );

//This flex operator serves two purposes. First, it maintains the rows of ListStream as a list so that they can be accessed in the order of arrival. Second, it reads the maintained list whenever a record arrives in the ListUser stream.

CREATE FLEX UserOutput
IN ListStream , ListUser
OUT OUTPUT WINDOW UserOutput
SCHEMA ( ListUserKey integer , ListUserVal integer , First3Total integer )
PRIMARY KEY ( ListUserKey )
BEGIN
	DECLARE
		//listIdx maps each ListKey to the position of its record in list.
		dictionary( string , integer ) listIdx ;
		//list holds the ListStream records in the order of arrival; deleted rows remain as null entries.
		vector(typeof(ListStream)) list ;
	END;
	ON ListStream {
		integer opCode := getOpcode(ListStream);
		//nullRecord is never assigned; it is used below to blank out the slot of a deleted row.
		typeof(ListStream) nullRecord;
		if(opCode = insert) {
			//On an insert, append the record to the list and remember its position in the index.
			push_back(list, ListStream);
			listIdx[ListStream.ListKey] := size(list) - 1;
		} else if(opCode = update) {
			//On an update, replace the existing record with the new one.
			integer idx := listIdx[ListStream.ListKey];
			list[idx] := ListStream;
		} else { //delete
			//On a delete, drop the index entry and blank out the slot. Keep deletes to a minimum because each one leaves a hole in the list.
			integer idx := listIdx[ListStream.ListKey];
			remove(listIdx, ListStream.ListKey);
			list[idx] := nullRecord;
		}
	};
	ON ListUser {
		//For every incoming record, cycle through the list and sum the values of the first three occurrences of the ListUserKey.
		integer listSize := size(list);
		integer listCounter := 0;

		integer counter := 0;
		integer total := 0;
		typeof(ListStream) rec;

		//You cannot use a for loop to iterate over the vector because the for loop stops whenever there is a null record, and there are nulls in the vector whenever rows get deleted.
		while(listCounter < listSize) {
			rec := list[listCounter];

			//Skip the holes left behind by deleted rows before reading any column of the record.
			if(not(isnull(rec))) {
				if(rec.ListUserKey = ListUser.ListUserKey) {
					total := total + rec.ListVal;
					counter++;
					//Stop as soon as three matching rows have been added up.
					if(counter = 3)
						break;
				}
			}
			listCounter++;
		}
		//The key column goes before the "|" and the remaining columns after it; the column names must match the output schema declared above.
		output setOpcode([ListUserKey=ListUser.ListUserKey;| ListUserVal=ListUser.ListUserVal; First3Total=total], upsert);
	};
END;