The my_interpolate example is an OLAP-style aggregate UDF that attempts to fill in NULL values within a sequence by performing linear interpolation across each run of adjacent NULL values, between the nearest non-NULL value in each direction.
To operate at a sensible cost, my_interpolate must run using a fixed-width, row-based window, but the user can set the width of the window based on the maximum number of adjacent NULL values expected. If the input at a given row is not NULL, the result for that row is the same as the input value. This function takes a set of double-precision floating-point values and produces a resulting set of doubles.
#include "extfnapiv4.h"
#include <stdlib.h>
#include <assert.h>
// MY_INTERPOLATE
//
// OLAP-style aggregate UDF that accepts a double precision
// floating point argument. If the current argument value is
// not NULL, then the result value is the same as the
// argument value. On the other hand, if the current row's
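// argument value is NULL, then the result, where possible,
// will be the linear interpolation between the nearest
// preceding and nearest following values within the window
// frame that are not NULL. If only one side has such a value,
// that value is used; if neither side does, the result is NULL.
// In all cases the result is also a double precision value.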
//
// The start function creates a structure for maintaining the
// argument values within the window including their NULLness.
// The finish function then deallocates this structure.
//
// Since there are some strict aggregate usage restrictions
// for this aggregate (must be used with a row-based window
// frame that includes the current row), the corresponding
// SQL declaration will look like:
//
// CREATE AGGREGATE FUNCTION my_interpolate(IN arg1 DOUBLE)
// RETURNS DOUBLE
// OVER REQUIRED
// WINDOW FRAME REQUIRED
// RANGE NOT ALLOWED
// PRECEDING REQUIRED
// UNBOUNDED PRECEDING NOT ALLOWED
// FOLLOWING REQUIRED
// UNBOUNDED FOLLOWING NOT ALLOWED
// EXTERNAL NAME 'my_interpolate@libudfex'
/*
 * Per-group state for my_interpolate: a ring buffer holding the
 * argument values (and their NULL flags) of the rows currently in the
 * window frame.  Field order is kept as originally declared.
 */
struct my_window {
    int _allocated_elem;     /* capacity of both ring-buffer arrays */
    int _first_used;         /* ring index of the oldest row in the frame */
    int _next_insert_loc;    /* ring index where the next row is stored */
    int *_is_null;           /* per slot: non-zero when the value is NULL */
    double *_dbl_val;        /* per slot: argument value (when not NULL) */
    int _num_rows_in_frame;  /* number of rows currently in the frame */
};
typedef struct my_window my_window;
#if defined __cplusplus
extern "C" {
#endif
/*
 * reset: empty the ring buffer without releasing it.
 *
 * Rewinds both ring cursors, clears the frame row count and marks
 * every slot as NULL.  The window state must already be attached to
 * cntxt->_user_data (asserted).
 */
static void my_interpolate_reset(a_v3_extfn_aggregate_context *cntxt)
{
    assert(cntxt->_user_data);
    my_window *win = (my_window *)cntxt->_user_data;
    int slot = win->_allocated_elem;

    win->_num_rows_in_frame = 0;
    win->_next_insert_loc = 0;
    win->_first_used = 0;

    /* Walk the slots from the top down, flagging each one as NULL. */
    while (slot-- > 0) {
        win->_is_null[slot] = 1;
    }
}
/*
 * start: validate how my_interpolate was invoked and make sure the
 * per-group ring buffer exists and is empty.
 *
 * The call must use a window that is bounded in both directions and
 * row based; otherwise an error is raised and nothing is allocated.
 * The first time through, the ring buffer is sized from
 * _max_rows_in_frame.  Its allocations are made into locals, and the
 * structure is attached to cntxt->_user_data only when all of them
 * succeed, so a failed start never leaves a half-built structure
 * behind.  State left by an earlier start is reused and just reset.
 */
static void my_interpolate_start(a_v3_extfn_aggregate_context *cntxt)
{
    my_window *cptr = (my_window *)cntxt->_user_data;

    // Make sure function was defined correctly
    if (!cntxt->_is_window_used) {
        cntxt->set_error(cntxt, 20001, "Function requires window");
        return;
    }
    if (cntxt->_window_has_unbounded_preceding ||
        cntxt->_window_has_unbounded_following) {
        cntxt->set_error(cntxt, 20002, "Window cannot be unbounded");
        return;
    }
    if (cntxt->_window_is_range_based) {
        cntxt->set_error(cntxt, 20003, "Window must be row based");
        return;
    }

    if (!cptr) {
        int num_elem = (int)cntxt->_max_rows_in_frame;
        int *is_null;
        double *dbl_val;

        // Every ring-buffer index is reduced "% _allocated_elem", so a
        // zero size (or a value that turned negative when narrowed to
        // int) would later cause a division by zero or bad indexing.
        if (num_elem <= 0) {
            cntxt->set_error(cntxt, 20004,
                             "Window frame must contain at least one row");
            return;
        }

        // calloc checks the count * size multiplication for overflow.
        // The casts are kept so the file still builds as C++.
        cptr = (my_window *)malloc(sizeof(my_window));
        is_null = (int *)calloc((size_t)num_elem, sizeof(int));
        dbl_val = (double *)calloc((size_t)num_elem, sizeof(double));
        if (!cptr || !is_null || !dbl_val) {
            // Terminate this query, releasing whatever was obtained
            // (free(NULL) is a no-op).
            free(dbl_val);
            free(is_null);
            free(cptr);
            cntxt->set_error(cntxt, 20000, "Unable to allocate memory");
            return;
        }

        cptr->_allocated_elem = num_elem;
        cptr->_is_null = is_null;
        cptr->_dbl_val = dbl_val;
        cptr->_first_used = 0;
        cptr->_next_insert_loc = 0;
        cptr->_num_rows_in_frame = 0;
        cntxt->_user_data = cptr;
    } else if (!cptr->_is_null || !cptr->_dbl_val) {
        // Defensive: pre-existing state without its buffers is unusable.
        cntxt->set_error(cntxt, 20000, "Unable to allocate memory");
        return;
    }

    my_interpolate_reset(cntxt);
}
/*
 * finish: release the window state created by start.
 *
 * Frees both ring-buffer arrays and the structure itself, then clears
 * cntxt->_user_data.  Does nothing when no state is attached.
 */
static void my_interpolate_finish(a_v3_extfn_aggregate_context *cntxt)
{
    my_window *win = (my_window *)cntxt->_user_data;

    if (win == NULL) {
        return;
    }

    /* free(NULL) is a no-op, so partially built state is fine here. */
    free(win->_is_null);
    free(win->_dbl_val);
    free(win);
    cntxt->_user_data = NULL;
}
/*
 * next_value: append the current row's argument to the ring buffer.
 *
 * The value and its NULL flag are stored at _next_insert_loc.  A
 * failed get_value, or a NULL data pointer, is recorded as NULL.  The
 * insertion cursor then advances with wrap-around, and the frame row
 * count grows by one.
 */
static void my_interpolate_next_value(a_v3_extfn_aggregate_context *cntxt,
                                      void *arg_handle)
{
    my_window *win = (my_window *)cntxt->_user_data;
    an_extfn_value arg;
    int slot = win->_next_insert_loc % win->_allocated_elem;
    int have_value = cntxt->get_value(arg_handle, 1, &arg) && arg.data != NULL;

    win->_is_null[slot] = have_value ? 0 : 1;
    if (have_value) {
        win->_dbl_val[slot] = *((double *)arg.data);
    }

    /* Advance the write cursor (wrapping) and account for the new row. */
    win->_next_insert_loc = (win->_next_insert_loc + 1) % win->_allocated_elem;
    win->_num_rows_in_frame += 1;
}
/*
 * drop_value: remove the oldest row from the frame.
 *
 * Only the ring-buffer head moves (with wrap-around) and the frame row
 * count shrinks.  The slot's contents are left in place and may later
 * be overwritten by next_value.  The argument handle is not needed,
 * because the dropped value is never read.
 */
static void my_interpolate_drop_value(a_v3_extfn_aggregate_context *cntxt,
                                      void *arg_handle)
{
    my_window *cptr = (my_window *)cntxt->_user_data;

    // The parameter must be named for this definition to be valid C
    // (unnamed parameters are only allowed from C23 / in C++).
    (void)arg_handle;

    cptr->_first_used = (cptr->_first_used + 1) % cptr->_allocated_elem;
    cptr->_num_rows_in_frame--;
}
static void my_interpolate_evaluate(a_v3_extfn_aggregate_context *cntxt,
void *arg_handle)
{
an_extfn_value outval;
my_window *cptr = (my_window *)cntxt->_user_data;
double result;
int result_is_null = 1;
double preceding_value;
int preceding_value_is_null = 1;
double preceding_distance = 0;
double following_value;
int following_value_is_null = 1;
double following_distance = 0;
int j;
// Determine which cell is the current cell
int curr_cell_num =
((int)(cntxt->_result_row_from_start_of_partition-1))%cptr->_allocated_elem;
int tmp_cell_num;
int result_row_offset_from_start_of_frame = cptr->_first_used <= curr_cell_num ?
( curr_cell_num - cptr->_first_used ) :
( curr_cell_num + cptr->_allocated_elem - cptr->_first_used );
// Compute the result value
if (cptr->_is_null[curr_cell_num] == 0) {
//
// If the current rows input value is not NULL, then there is
// no need to interpolate, just use that input value.
//
result = cptr->_dbl_val[curr_cell_num];
result_is_null = 0;
//
} else {
//
// If the current rows input value is NULL, then we do
// need to interpolate to find the correct result value.
// First, find the nearest following non-NULL argument
// value after the current row.
//
int rows_following = cptr->_num_rows_in_frame -
result_row_offset_from_start_of_frame - 1;
for (j=0; j<rows_following; j++) {
tmp_cell_num = ((curr_cell_num + j + 1) % cptr->_allocated_elem);
if (cptr->_is_null[tmp_cell_num] == 0) {
following_value = cptr->_dbl_val[tmp_cell_num];
following_value_is_null = 0;
following_distance = j + 1;
break;
}
}
// Second, find the nearest preceding non-NULL
// argument value before the current row.
//
int rows_before = result_row_offset_from_start_of_frame;
for (j=0; j<rows_before; j++) {
tmp_cell_num = ((curr_cell_num + cptr->_allocated_elem - j - 1)
% cptr->_allocated_elem);
if (cptr->_is_null[tmp_cell_num] == 0) {
preceding_value = cptr->_dbl_val[tmp_cell_num];
preceding_value_is_null = 0;
preceding_distance = j + 1;
break;
}
}
// Finally, see what we can come up with for a result value
//
if (preceding_value_is_null && !following_value_is_null) {
//
// No choice but to mirror the nearest following non-NULL value
// Example:
//
// Inputs: NULL Result of my_interpolate: 40.0
// NULL 40.0
// 40.0 40.0
//
result = following_value;
result_is_null = 0;
//
} else if (!preceding_value_is_null && following_value_is_null) {
//
// No choice but to mirror the nearest preceding non-NULL value
// Example:
//
// Inputs: 10.0 Result of my_interpolate: 10.0
// NULL 10.0
//
result = preceding_value;
result_is_null = 0;
//
} else if (!preceding_value_is_null && !following_value_is_null) {
//
// Here we get to do real interpolation based on the
// nearest preceding non-NULL value, the nearest following
// non-NULL value, and the relative distances to each.
// Examples:
//
// Inputs: 10.0 Result of my_interpolate: 10.0
// NULL 20.0
// NULL 30.0
// 40.0 40.0
//
// Inputs: 10.0 Result of my_interpolate: 10.0
// NULL 25.0
// 40.0 40.0
//
result = ( preceding_value
+ ( (following_value - preceding_value)
* ( preceding_distance
/ (preceding_distance + following_distance))));
result_is_null = 0;
}
}
// And last, pass the result value out
outval.type = DT_DOUBLE;
outval.piece_len = sizeof(double);
if (result_is_null) {
outval.data = 0;
} else {
outval.data = &result;
}
cntxt->set_value( arg_handle, &outval, 0 );
}
// Callback descriptor for the my_interpolate aggregate, returned by
// my_interpolate().  This is a positional initializer, so the entry
// order must match the a_v3_extfn_aggregate declaration in
// extfnapiv4.h.  Only start/finish/reset/next_value/evaluate/drop_value
// are implemented; the cumulative-evaluation and sub/super-aggregate
// hooks are NULL.
static a_v3_extfn_aggregate my_interpolate_descriptor =
{
&my_interpolate_start,
&my_interpolate_finish,
&my_interpolate_reset,
&my_interpolate_next_value, //( timeseries_expression ) -- appends one row
&my_interpolate_evaluate,
&my_interpolate_drop_value,
NULL, // cume_eval,
NULL, // next_subaggregate_extfn
NULL, // drop_subaggregate_extfn
NULL, // evaluate_superaggregate_extfn
NULL, // reserved1_must_be_null
NULL, // reserved2_must_be_null
NULL, // reserved3_must_be_null
NULL, // reserved4_must_be_null
NULL, // reserved5_must_be_null
0, // indicators
0, // context size
0, // context alignment
0.0, //external_bytes_per_group
( double )sizeof( double ), // external bytes per row (one double)
0, // reserved6_must_be_null
0, // reserved7_must_be_null
0, // reserved8_must_be_null
0, // reserved9_must_be_null
0, // reserved10_must_be_null
NULL // _for_server_internal_use
};
// Library entry point named in the SQL declaration's EXTERNAL NAME
// clause ('my_interpolate@libudfex'); returns the static callback
// descriptor for this aggregate.  Declared with (void) so that it is a
// proper prototype in C, matching its meaning in C++.
a_v3_extfn_aggregate *my_interpolate(void)
{ return &my_interpolate_descriptor; }
#if defined __cplusplus
}
#endif