Sample Custom Internal Adapter Implementation

Sample syntax you can use to build your custom internal adapter implementation. This implementation incorporates extern "C" methods that enable schema discovery in a custom adapter.

/*
 * CustomAdapterInterface.cpp
 *
 *      Author: sample
 */

#include "GenericAdapterInterface.h"
#include <vector>
#include <sstream>
#include <iostream>
#include <string>

using namespace std;

struct InputAdapter
{
	InputAdapter();
	void* connectionCallBackReference;
	void* schemaInformation;
	void* parameters;
	void* rowBuf;
	void* errorObjIdentifier;
	int64_t _badRows;
	int64_t _goodRows;
	int64_t _totalRows;
	int getColumnCount();
	void setState(int st);
	bool discoverTables();
	bool discover(string tableName);
	vector<string> _discoveredTableNames;
	vector<string> _discoveredFieldNames;
	vector<string> _discoveredFieldTypes;
	vector<vector<string> > _discoveredRows;
	vector<string> _row1;
	vector<string> _row2;
	bool _discoveryMode;
	int64_t _rowCount;
};

InputAdapter::InputAdapter()
{
	rowBuf = NULL;
	_badRows = 0;
	_goodRows = 0;
	_totalRows = 0;
	_discoveryMode = false;
	_discoveredTableNames.clear();
	_discoveredFieldNames.clear();
	_discoveredFieldTypes.clear();
	_discoveredRows.clear();
	_row1.clear();
	_row2.clear();
	_rowCount = 0;
}

int InputAdapter::getColumnCount()
{
	return ::getColumnCount(schemaInformation);
}

void InputAdapter::setState(int st)
{
	::setAdapterState(connectionCallBackReference, st);
}

extern "C" DLLEXPORT
void* createAdapter()
{
	return new InputAdapter();
}

extern "C" DLLEXPORT
void setCallBackReference(void *adapter,void *connectionCallBackReference)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	inputAdapterObject->connectionCallBackReference = connectionCallBackReference;
}

extern "C" DLLEXPORT
void setConnectionRowType(void *adapter,void *connectionRowType)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	inputAdapterObject->schemaInformation = connectionRowType;
}

extern "C" DLLEXPORT
void  setConnectionParams(void* adapter,void* connectionParams)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	inputAdapterObject->parameters = connectionParams;
}

extern "C" DLLEXPORT
void* getNext(void *adapter)
{
	StreamRow streamRow = NULL;
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	int n = inputAdapterObject->getColumnCount();
	std::stringstream ss;
	if(inputAdapterObject->_totalRows <inputAdapterObject->_rowCount){

		for (int column = 0; column < n; column++) {
			ss.str("");
			ss << inputAdapterObject->_totalRows;
			std::string tempString;
			tempString = ss.str();
			std::string row = "ROW";
			row.append(tempString);
			ss.str("");
			ss << column;
			tempString = ss.str();
			std::string columnString = "COLUMN";
			columnString.append(tempString);
			row.append(columnString);
			::setFieldAsStringWithIndex(inputAdapterObject->rowBuf, column, row.c_str());

		}

		inputAdapterObject->_totalRows++;
		streamRow = ::toRow(inputAdapterObject->rowBuf, inputAdapterObject->_totalRows, inputAdapterObject->errorObjIdentifier);
		if( streamRow )
		{
			inputAdapterObject->_goodRows++;
		} else
		{
			inputAdapterObject->_badRows++;
		}


	} else {
		inputAdapterObject->setState(RS_DONE);
	}
	return streamRow;

}

extern "C" DLLEXPORT
bool reset(void *adapter)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	inputAdapterObject->_rowCount = ::getConnectionParamInt64_t(inputAdapterObject->parameters,"RowCount");
	if(inputAdapterObject->rowBuf)
		deleteConnectionRow(inputAdapterObject->rowBuf);
	string type = "RowByOrder";
	inputAdapterObject->rowBuf = ::createConnectionRow(type.c_str());
	::setStreamType(inputAdapterObject->rowBuf, inputAdapterObject->schemaInformation, false);
	inputAdapterObject->errorObjIdentifier =::createConnectionErrors();
	inputAdapterObject->setState(RS_CONTINUOUS);
	return true;
}

extern "C" DLLEXPORT
int64_t getTotalRowsProcessed(void *adapter)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	return inputAdapterObject->_totalRows;
}

extern "C" DLLEXPORT
int64_t getNumberOfBadRows(void *adapter)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	return inputAdapterObject->_badRows;
}

extern "C" DLLEXPORT
int64_t getNumberOfGoodRows(void *adapter)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	return inputAdapterObject->_goodRows;
}

extern "C" DLLEXPORT
bool hasError(void *adapter)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	return !(::empty(inputAdapterObject->errorObjIdentifier));
}

extern "C" DLLEXPORT
void getError(void *adapter, char** errorString)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	::getAdapterError(inputAdapterObject->errorObjIdentifier, errorString);
}

extern "C" DLLEXPORT
void  start(void* adapter){}

extern "C" DLLEXPORT
void  stop(void* adapter){}

extern "C" DLLEXPORT
void  cleanup(void* adapter){}

extern "C" DLLEXPORT
bool  canDiscover(void* adapter){return true;}

extern "C" DLLEXPORT
void deleteAdapter(void* adapter)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	delete inputAdapterObject;
}

extern "C" DLLEXPORT
void commitTransaction(void *adapter){}



extern "C" DLLEXPORT
int getTableNames(void* adapter, char*** tables)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;

	if(!inputAdapterObject->discoverTables())
	{
		return 0;
	}

	(*tables) = (char**) malloc(sizeof(char*)*inputAdapterObject->_discoveredTableNames.size());

	for(int index=0; index < inputAdapterObject->_discoveredTableNames.size(); index++)
	{
		size_t tableNameSize = inputAdapterObject->_discoveredTableNames[index].size() + 1 ;
		char* tableName = new char [tableNameSize ];
		strncpy(tableName, inputAdapterObject->_discoveredTableNames[index].c_str(),tableNameSize);
		(*tables)[index] = tableName;
	}

	return inputAdapterObject->_discoveredTableNames.size();
}

extern "C" DLLEXPORT
int getFieldNames(void* adapter, char*** names, const char* tableName)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;

	string table (tableName);

	if(!inputAdapterObject->discover(table))
	{
	    return 0;
	}

	(*names) = (char**) malloc(sizeof(char*)*inputAdapterObject->_discoveredFieldNames.size());

	for(int index=0; index < inputAdapterObject->_discoveredFieldNames.size(); index++)
	{
		size_t fieldNameSize = inputAdapterObject->_discoveredFieldNames[index].size() + 1;
		char* fieldName = new char [ fieldNameSize ];
		strncpy(fieldName, inputAdapterObject->_discoveredFieldNames[index].c_str(),fieldNameSize);
		(*names)[index] = fieldName;
	}

	return inputAdapterObject->_discoveredFieldNames.size();

}

extern "C" DLLEXPORT
int getFieldTypes(void* adapter, char*** types, const char* tableName)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;

	string table (tableName);

	if(!inputAdapterObject->discover(table))
	{
	    return 0;
	}

	(*types) = (char**) malloc(sizeof(char*)*inputAdapterObject->_discoveredFieldTypes.size());


	for(int index=0; index < inputAdapterObject->_discoveredFieldTypes.size(); index++)
	{
		size_t fieldTypeSize = inputAdapterObject->_discoveredFieldTypes[index].size() + 1;
		char* fieldType = new char [ fieldTypeSize ];
		strncpy(fieldType, inputAdapterObject->_discoveredFieldTypes[index].c_str(), fieldTypeSize);
		(*types)[index] = fieldType;
	}

	return inputAdapterObject->_discoveredFieldTypes.size();
}

extern "C" DLLEXPORT
int getSampleRow(void* adapter, char*** row, const char* tableName, int pos)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;

	string table (tableName);

	if(!inputAdapterObject->discover(table))
	{
	    return 0;
	}

	vector<string> vals;

	if (pos < (int)inputAdapterObject->_discoveredRows.size())
	{
	    vals = inputAdapterObject->_discoveredRows[pos];

		(*row) = (char**) malloc(sizeof(char*)*vals.size());

		for(int index=0; index < vals.size(); index++)
		{
			size_t columnSize = vals[index].size() + 1;
			char* column = new char [ columnSize ];
			strncpy(column, vals[index].c_str(),columnSize);
			(*row)[index] = column;
		}
	}

	return vals.size();

}

extern "C" DLLEXPORT
void setDiscovery(void* adapter)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	inputAdapterObject->_discoveryMode = true;
}

bool InputAdapter::discoverTables()
{
	_discoveredTableNames.push_back("Table1");
	_discoveredTableNames.push_back("Table2");
	_discoveredTableNames.push_back("Table3");
	_discoveredTableNames.push_back("Table4");
	_discoveredTableNames.push_back("Table5");
	return true;
}

bool InputAdapter::discover(string tableName)
{
	_discoveredFieldNames.clear();
	_discoveredFieldTypes.clear();
	_row1.clear();
	_row2.clear();
	_discoveredRows.clear();
	_discoveredFieldNames.push_back("Column1");
	_discoveredFieldNames.push_back("Column2");
	_discoveredFieldNames.push_back("Column3");
	_discoveredFieldNames.push_back("Column4");
	_discoveredFieldNames.push_back("Column5");
	_discoveredFieldTypes.push_back("integer");
	_discoveredFieldTypes.push_back("string");
	_discoveredFieldTypes.push_back("string");
	_discoveredFieldTypes.push_back("float");
	_discoveredFieldTypes.push_back("float");
	_row1.push_back("1");
	_row1.push_back("A");
	_row1.push_back("B");
	_row1.push_back("1.1");
	_row1.push_back("2.2");
	_row2.push_back("2");
	_row2.push_back("X");
	_row2.push_back("Y");
	_row2.push_back("3.3");
	_row2.push_back("4.4");
	_discoveredRows.push_back(_row1);
	_discoveredRows.push_back(_row2);
	return true;
}

extern "C" DLLEXPORT
void getStatistics(void* adapter, AdapterStatistics* adapterStatistics)
{
	InputAdapter *inputAdapterObject = (InputAdapter*)adapter;
	const char* key;
	ostringstream value;
	value.str("");
	key = "Total number of rows";
	value << inputAdapterObject->_totalRows;
	addAdapterStatistics(adapterStatistics, key, value.str().c_str());
	value.str("");
	key = "Total number of good rows";
	value << inputAdapterObject->_goodRows;
	addAdapterStatistics(adapterStatistics, key, value.str().c_str());
	value.str("");
	key = "Total number of bad rows";
	value << inputAdapterObject->_badRows;
	addAdapterStatistics(adapterStatistics, key, value.str().c_str());
}

extern "C" DLLEXPORT
int64_t getLatency(void* adapter)
{
	return 100;
}