The my_interpolate example is an OLAP-style aggregate UDF that attempts to fill in NULL values within a sequence. For each run of adjacent NULL values, it performs linear interpolation between the nearest non-NULL value in each direction.
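To operate at a sensible cost, my_interpolate must run over a fixed-width, row-based window; the user sets the width of the window based on the maximum number of adjacent NULL values expected. If the input at a given row is not NULL, the result for that row is the same as the input value. If the input is NULL and a non-NULL value is found within the window on only one side of the row, that value is used as the result; if no non-NULL value is found within the window on either side, the result is NULL. This function takes a set of double-precision floating-point values and produces a resulting set of doubles.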
#include "extfnapiv4.h"
#include <stdlib.h>
#include <assert.h>

//  MY_INTERPOLATE
//
//  OLAP-style aggregate UDF that accepts a double precision
//  floating point argument.  If the current argument value is
//  not NULL, then the result value is the same as the
//  argument value.  On the other hand, if the current row's
//  argument value is NULL, then the result, where possible,
//  will be the arithmetic interpolation across the nearest
//  preceding and nearest following values that are not NULL.
//  In all cases the result is also a double precision value.
//
//  The start function creates a structure for maintaining the
//  argument values within the window including their NULLness.
//  The finish function then deallocates this structure.
//
//  Since there are some strict aggregate usage restrictions
//  for this aggregate (must be used with a row-based window
//  frame that includes the current row), the corresponding
//  SQL declaration will look like:
//
//      CREATE AGGREGATE FUNCTION my_interpolate(IN arg1 DOUBLE)
//          RETURNS DOUBLE
//          OVER REQUIRED
//          WINDOW FRAME REQUIRED
//                  RANGE NOT ALLOWED
//                  PRECEDING REQUIRED
//                  UNBOUNDED PRECEDING NOT ALLOWED
//                  FOLLOWING REQUIRED
//                  UNBOUNDED FOLLOWING NOT ALLOWED
//          EXTERNAL NAME 'my_interpolate@libudfex'

// Per-invocation state: two parallel ring buffers holding the argument
// values of the rows currently in the window frame and their NULLness.
typedef struct my_window {
    int     _allocated_elem;     // capacity (in cells) of both arrays
    int     _first_used;         // ring index of the oldest row still in the frame
    int     _next_insert_loc;    // ring index where the next row will be stored
    int    *_is_null;            // 1 if the cell holds NULL, 0 if it holds a value
    double *_dbl_val;            // cell value; meaningful only when _is_null is 0
    int     _num_rows_in_frame;  // number of rows currently in the frame
} my_window;

#if defined __cplusplus
extern "C" {
#endif

// Returns the window state attached to the context, or NULL when the state
// is missing or unusable (start failed, an array allocation failed, or the
// capacity is not positive).  Callers use this to avoid dereferencing NULL
// or taking a modulus by zero.
static my_window *my_interpolate_state(a_v3_extfn_aggregate_context *cntxt)
{
    my_window *cptr = (my_window *)cntxt->_user_data;

    if (!cptr || !cptr->_is_null || !cptr->_dbl_val
        || cptr->_allocated_elem <= 0) {
        return NULL;
    }
    return cptr;
}

// Searches outward from ring cell `from_cell` for the nearest non-NULL cell,
// looking at most `limit` cells away.  `forward` selects the direction
// (non-zero: following rows, zero: preceding rows).  On success stores the
// value in *value and returns the distance in rows (>= 1); returns 0 when
// no non-NULL cell lies within `limit`.
static int my_interpolate_find_neighbor(const my_window *cptr, int from_cell,
                                        int forward, int limit, double *value)
{
    int capacity = cptr->_allocated_elem;

    for (int dist = 1; dist <= limit; dist++) {
        int cell = forward ? ((from_cell + dist) % capacity)
                           : ((from_cell + capacity - dist) % capacity);
        if (cptr->_is_null[cell] == 0) {
            *value = cptr->_dbl_val[cell];
            return dist;
        }
    }
    return 0;
}

// Reset callback: empties the window and marks every cell as NULL.
// Does nothing when no usable state is attached to the context.
static void my_interpolate_reset(a_v3_extfn_aggregate_context *cntxt)
{
    my_window *cptr = my_interpolate_state(cntxt);

    if (!cptr) {
        return;
    }

    cptr->_first_used = 0;
    cptr->_next_insert_loc = 0;
    cptr->_num_rows_in_frame = 0;
    for (int i = 0; i < cptr->_allocated_elem; i++) {
        cptr->_is_null[i] = 1;
    }
}

// Start callback: verifies the function is used with a bounded, row-based
// window, allocates the window state on first use, and resets it.
// Reports a failure through cntxt->set_error:
//   20001 - no window was specified
//   20002 - the window is unbounded in either direction
//   20003 - the window is range based
//   20004 - the frame size is not a positive number of rows
//   20000 - memory could not be allocated
static void my_interpolate_start(a_v3_extfn_aggregate_context *cntxt)
{
    my_window *cptr = (my_window *)cntxt->_user_data;

    // Make sure function was defined correctly
    if (!cntxt->_is_window_used) {
        cntxt->set_error(cntxt, 20001, "Function requires window");
        return;
    }
    if (cntxt->_window_has_unbounded_preceding ||
        cntxt->_window_has_unbounded_following) {
        cntxt->set_error(cntxt, 20002, "Window cannot be unbounded");
        return;
    }
    if (cntxt->_window_is_range_based) {
        cntxt->set_error(cntxt, 20003, "Window must be row based");
        return;
    }

    if (!cptr) {
        // The ring buffers are indexed modulo the frame size, so the size
        // must be a positive int before anything is allocated.
        int frame_rows = (int)cntxt->_max_rows_in_frame;
        if (frame_rows <= 0) {
            cntxt->set_error(cntxt, 20004, "Window frame size is invalid");
            return;
        }

        cptr = (my_window *)malloc(sizeof(my_window));
        if (cptr) {
            cptr->_allocated_elem = frame_rows;
            cptr->_first_used = 0;
            cptr->_next_insert_loc = 0;
            cptr->_num_rows_in_frame = 0;
            // calloc guards the count * size multiplication against overflow.
            cptr->_is_null = (int *)calloc((size_t)frame_rows, sizeof(int));
            cptr->_dbl_val = (double *)calloc((size_t)frame_rows, sizeof(double));
            // Attach the state even if an array allocation failed so that
            // the finish callback can release whatever was obtained.
            cntxt->_user_data = cptr;
        }
    }

    if (!cptr || !cptr->_is_null || !cptr->_dbl_val) {
        // Terminate this query
        cntxt->set_error(cntxt, 20000, "Unable to allocate memory");
        return;
    }

    my_interpolate_reset(cntxt);
}

// Finish callback: releases the window state, if any, and clears the
// context's pointer to it.
static void my_interpolate_finish(a_v3_extfn_aggregate_context *cntxt)
{
    my_window *cptr = (my_window *)cntxt->_user_data;

    if (!cptr) {
        return;
    }

    // free(NULL) is a no-op, so partially allocated state is handled too.
    free(cptr->_is_null);
    free(cptr->_dbl_val);
    free(cptr);
    cntxt->_user_data = NULL;
}

// Next-value callback: fetches argument 1 for the row entering the frame and
// stores it (or its NULLness) in the next ring cell.  A failed get_value is
// recorded as NULL.
static void my_interpolate_next_value(a_v3_extfn_aggregate_context *cntxt,
                                      void *arg_handle)
{
    an_extfn_value arg;
    my_window *cptr = my_interpolate_state(cntxt);

    if (!cptr) {
        return;
    }

    // Get the one argument, and stash its value
    // within the rotating window arrays
    int curr_cell_num = cptr->_next_insert_loc % cptr->_allocated_elem;

    if (cntxt->get_value(arg_handle, 1, &arg) && arg.data != NULL) {
        cptr->_dbl_val[curr_cell_num] = *((double *)arg.data);
        cptr->_is_null[curr_cell_num] = 0;
    } else {
        cptr->_is_null[curr_cell_num] = 1;
    }

    // Then increment the insertion location and number of rows in frame
    cptr->_next_insert_loc =
        (cptr->_next_insert_loc + 1) % cptr->_allocated_elem;
    cptr->_num_rows_in_frame++;
}

// Drop-value callback: removes the oldest row from the frame by advancing
// past its ring cell.  The argument handle is not needed.
static void my_interpolate_drop_value(a_v3_extfn_aggregate_context *cntxt,
                                      void *arg_handle)
{
    my_window *cptr = my_interpolate_state(cntxt);

    (void)arg_handle;

    if (!cptr) {
        return;
    }

    // Drop one value from the window by incrementing past it and
    // decrement the number of rows in the frame
    cptr->_first_used = (cptr->_first_used + 1) % cptr->_allocated_elem;
    cptr->_num_rows_in_frame--;
}

// Evaluate callback: produces the result for the current row.
//   - Non-NULL input: the input value itself.
//   - NULL input with non-NULL values on both sides inside the frame:
//     linear interpolation weighted by the row distance to each.
//   - NULL input with a non-NULL value on only one side: that value.
//   - Otherwise: NULL.
static void my_interpolate_evaluate(a_v3_extfn_aggregate_context *cntxt,
                                    void *arg_handle)
{
    an_extfn_value outval;
    my_window *cptr = my_interpolate_state(cntxt);
    double result = 0.0;
    int result_is_null = 1;

    if (cptr) {
        // Determine which cell is the current cell
        int curr_cell_num =
            ((int)(cntxt->_result_row_from_start_of_partition - 1))
            % cptr->_allocated_elem;

        // A negative cell number would index outside the arrays; leave the
        // result NULL in that case.
        if (curr_cell_num >= 0) {
            if (cptr->_is_null[curr_cell_num] == 0) {
                // If the current row's input value is not NULL, then there
                // is no need to interpolate, just use that input value.
                result = cptr->_dbl_val[curr_cell_num];
                result_is_null = 0;
            } else {
                // Position of the current row within the frame, accounting
                // for wrap-around in the ring buffer.
                int offset_in_frame =
                    (cptr->_first_used <= curr_cell_num)
                        ? (curr_cell_num - cptr->_first_used)
                        : (curr_cell_num + cptr->_allocated_elem
                           - cptr->_first_used);
                int rows_following =
                    cptr->_num_rows_in_frame - offset_in_frame - 1;
                int rows_before = offset_in_frame;

                double following_value = 0.0;
                double preceding_value = 0.0;

                // Nearest non-NULL argument value after the current row,
                // then the nearest one before it.  A distance of 0 means
                // none was found on that side.
                int following_distance = my_interpolate_find_neighbor(
                    cptr, curr_cell_num, 1, rows_following, &following_value);
                int preceding_distance = my_interpolate_find_neighbor(
                    cptr, curr_cell_num, 0, rows_before, &preceding_value);

                if (preceding_distance > 0 && following_distance > 0) {
                    // Real interpolation based on the nearest preceding
                    // non-NULL value, the nearest following non-NULL
                    // value, and the relative distances to each.
                    // Examples:
                    //
                    //   Inputs:  10.0   Result of my_interpolate:  10.0
                    //            NULL                              20.0
                    //            NULL                              30.0
                    //            40.0                              40.0
                    //
                    //   Inputs:  10.0   Result of my_interpolate:  10.0
                    //            NULL                              25.0
                    //            40.0                              40.0
                    result = preceding_value
                        + ((following_value - preceding_value)
                           * ((double)preceding_distance
                              / (double)(preceding_distance
                                         + following_distance)));
                    result_is_null = 0;
                } else if (following_distance > 0) {
                    // No choice but to mirror the nearest following
                    // non-NULL value.  Example:
                    //
                    //   Inputs:  NULL   Result of my_interpolate:  40.0
                    //            NULL                              40.0
                    //            40.0                              40.0
                    result = following_value;
                    result_is_null = 0;
                } else if (preceding_distance > 0) {
                    // No choice but to mirror the nearest preceding
                    // non-NULL value.  Example:
                    //
                    //   Inputs:  10.0   Result of my_interpolate:  10.0
                    //            NULL                              10.0
                    result = preceding_value;
                    result_is_null = 0;
                }
            }
        }
    }

    // And last, pass the result value out
    outval.type = DT_DOUBLE;
    outval.piece_len = sizeof(double);
    outval.data = result_is_null ? NULL : &result;
    cntxt->set_value(arg_handle, &outval, 0);
}

// Descriptor handed to the server; entries are positional.
static a_v3_extfn_aggregate my_interpolate_descriptor = {
    &my_interpolate_start,
    &my_interpolate_finish,
    &my_interpolate_reset,
    &my_interpolate_next_value,
    &my_interpolate_evaluate,
    &my_interpolate_drop_value,
    NULL,  // cume_eval
    NULL,  // next_subaggregate_extfn
    NULL,  // drop_subaggregate_extfn
    NULL,  // evaluate_superaggregate_extfn
    NULL,  // reserved1_must_be_null
    NULL,  // reserved2_must_be_null
    NULL,  // reserved3_must_be_null
    NULL,  // reserved4_must_be_null
    NULL,  // reserved5_must_be_null
    0,     // indicators
    0,     // context size
    0,     // context alignment
    0.0,   // external_bytes_per_group
    ( double )sizeof( double ),  // external bytes per row
    0,     // reserved6_must_be_null
    0,     // reserved7_must_be_null
    0,     // reserved8_must_be_null
    0,     // reserved9_must_be_null
    0,     // reserved10_must_be_null
    NULL   // _for_server_internal_use
};

// Entry point named in the SQL declaration's EXTERNAL NAME clause; returns
// the descriptor for the my_interpolate aggregate.
a_v3_extfn_aggregate *my_interpolate(void)
{
    return &my_interpolate_descriptor;
}

#if defined __cplusplus
}
#endif