Example: Using a List Store to Preserve Order of Arrival

Use a vector and dictionary data structures to simulate the Aleri list index option as this option is not supported in CCL. .

The CCL example below shows a list store that uses a vector to store the records that need to be accessed in the order of arrival. It also maintains a list index by using a dictionary to quickly update and delete rows in the vector. Similar to a list index in Aleri, deletes are inefficient and must be kept to a minimum because they leave holes in the list.

//ListStream is the stream that you need to store as a list so that you can access the rows in the order of arrival. 

CREATE INPUT WINDOW ListStream SCHEMA ( ListKey string , ListUserKey integer , ListVal integer ) PRIMARY KEY ( ListKey );

//ListUser is the stream that accesses the list in the order of arrival.

CREATE INPUT WINDOW ListUser SCHEMA ( ListUserKey integer , ListUserVal integer ) PRIMARY KEY (ListUserKey);

//This flex operator serves two purposes. The first purpose is to maintain the data in the ListStream as a list so that data can be accessed in the order of arrival. The second purpose is to access the maintained list whenever a record arrives in the ListUser stream. 

CREATE FLEX UserOutput IN ListStream , ListUser OUT OUTPUT WINDOW UserOutput SCHEMA ( ListUserKey integer , ListUserVal integer ,
First3Total integer ) PRIMARY KEY ( ListUserKey )
BEGIN
	DECLARE
		dictionary( string , integer ) listIdx ;
		vector(typeof(ListStream)) list ;
	END;
	ON ListStream {
          integer opCode := getOpcode(ListStream);
		  typeof(ListStream) nullRecord;
	  	if(opCode = insert) {

//On an insert, put the record into the list and update the index.
            push_back(list, ListStream);
			listIdx[ListStream.ListKey] := size(list) - 1;
		} else if(opCode = update) {

//On an update, replace the existing record with the new one.
            integer idx := listIdx[ListStream.ListKey];
			list[idx] := ListStream;
		} else { //delete

//On a delete, remove the entry. Keep deletes to a minimum because this leaves a hole in the list.
        integer idx := listIdx[ListStream.ListKey];
			remove(listIdx, ListStream.ListKey);
			list[idx] := nullRecord;
		}
	};
	ON ListUser {
//For every incoming record cycle through the list, get the sum of values for the first three occurrence of the ListUserKey.  
        integer listSize := size(list);
		integer listCounter := 0;
		
		integer counter := 0;
		integer total := 0;
		typeof(ListStream) rec;

//You cannot use a for loop to iterate over the vector because the for loop stops whenever there is a null record, and there are nulls in the vector whenever rows get deleted.
        while(listCounter < listSize) {
			rec := list[listCounter];
			
			if(rec.ListUserKey = ListUser.ListUserKey){
				total := total + rec.ListVal;
				counter++;
				if(counter = 3)
					break;
			}
			listCounter++;
		}
		output setOpcode([ListUserKey=ListUser.ListUserKey;| UserVal=ListUser.ListUserVal; First3Total=total], upsert);
	};
END;