Sample code that you can use as a starting point for building your own custom internal adapter implementation. This implementation includes the extern "C" functions that enable schema discovery in a custom adapter.
/*
* CustomAdapterInterface.cpp
*
* Author: sample
*/
#include "GenericAdapterInterface.h"
#include <vector>
#include <sstream>
#include <iostream>
#include <string>
using namespace std;
/*
 * Per-connection state of the sample input adapter.
 *
 * Callers of the exported extern "C" entry points only hold this object as an
 * opaque void* (see createAdapter()); each entry point casts the handle back
 * to InputAdapter.
 */
struct InputAdapter
{
    InputAdapter();

    // Opaque handles stored by the set*() entry points.
    void* connectionCallBackReference;  // handed to ::setAdapterState()
    void* schemaInformation;            // handed to ::getColumnCount() and ::setStreamType()
    void* parameters;                   // handed to ::getConnectionParamInt64_t()

    // Handles created in reset().
    void* rowBuf;                       // result of ::createConnectionRow()
    void* errorObjIdentifier;           // result of ::createConnectionErrors()

    // Row counters maintained by getNext().
    int64_t _badRows;
    int64_t _goodRows;
    int64_t _totalRows;

    int getColumnCount();
    void setState(int st);

    // Schema discovery: each call refreshes the _discovered* members below.
    bool discoverTables();
    bool discover(std::string tableName);

    std::vector<std::string> _discoveredTableNames;
    std::vector<std::string> _discoveredFieldNames;
    std::vector<std::string> _discoveredFieldTypes;
    std::vector<std::vector<std::string> > _discoveredRows;
    std::vector<std::string> _row1;
    std::vector<std::string> _row2;

    bool _discoveryMode;   // set by setDiscovery()
    int64_t _rowCount;     // number of rows getNext() produces; read in reset()
};

/*
 * Creates an adapter in a fully defined idle state: every handle is NULL,
 * every counter is zero and discovery mode is off.
 *
 * All pointer members are initialized (not only rowBuf) so that an entry
 * point invoked before the corresponding setter or before reset() sees NULL
 * instead of an indeterminate value. The vector members default-construct
 * empty, so they need no explicit clear().
 */
InputAdapter::InputAdapter()
    : connectionCallBackReference(NULL),
      schemaInformation(NULL),
      parameters(NULL),
      rowBuf(NULL),
      errorObjIdentifier(NULL),
      _badRows(0),
      _goodRows(0),
      _totalRows(0),
      _discoveryMode(false),
      _rowCount(0)
{
}
int InputAdapter::getColumnCount()
{
return ::getColumnCount(schemaInformation);
}
void InputAdapter::setState(int st)
{
::setAdapterState(connectionCallBackReference, st);
}
// Allocates a new InputAdapter and returns it as an opaque handle.
// The matching release is deleteAdapter().
extern "C" DLLEXPORT
void* createAdapter()
{
    InputAdapter* const freshAdapter = new InputAdapter();
    return freshAdapter;
}
// Stores the connection callback reference on the adapter; it is later passed
// to ::setAdapterState() by InputAdapter::setState().
extern "C" DLLEXPORT
void setCallBackReference(void *adapter,void *connectionCallBackReference)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);
    self->connectionCallBackReference = connectionCallBackReference;
}
// Stores the connection row type handle on the adapter as its
// schemaInformation.
extern "C" DLLEXPORT
void setConnectionRowType(void *adapter,void *connectionRowType)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);
    self->schemaInformation = connectionRowType;
}
// Stores the connection parameters handle on the adapter; reset() reads the
// "RowCount" parameter from it.
extern "C" DLLEXPORT
void setConnectionParams(void* adapter,void* connectionParams)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);
    self->parameters = connectionParams;
}
// Produces the next synthetic row, or marks the adapter as done.
//
// While fewer than _rowCount rows have been produced, every column of the row
// buffer is set to the text "ROW<r>COLUMN<c>", where <r> is the number of rows
// produced so far (starting at 0) and <c> is the column index. The buffer is
// then converted with ::toRow(), and the good/bad counter is bumped depending
// on whether that conversion yielded a row.
//
// Returns the result of ::toRow() (NULL for a bad row), or NULL after setting
// the state to RS_DONE once _rowCount rows have been produced.
extern "C" DLLEXPORT
void* getNext(void *adapter)
{
    StreamRow producedRow = NULL;
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);
    const int columnCount = self->getColumnCount();
    std::ostringstream text;

    // Guard: nothing left to produce.
    if (!(self->_totalRows < self->_rowCount)) {
        self->setState(RS_DONE);
        return producedRow;
    }

    // The "ROW<r>" part is identical for every column of this row.
    text << "ROW" << self->_totalRows;
    const std::string rowPrefix = text.str();

    for (int column = 0; column < columnCount; ++column) {
        text.str("");
        text << rowPrefix << "COLUMN" << column;
        const std::string fieldValue = text.str();
        ::setFieldAsStringWithIndex(self->rowBuf, column, fieldValue.c_str());
    }

    // The row is counted before conversion, so ::toRow() receives a 1-based number.
    ++self->_totalRows;
    producedRow = ::toRow(self->rowBuf, self->_totalRows, self->errorObjIdentifier);

    if (producedRow) {
        ++self->_goodRows;
    } else {
        ++self->_badRows;
    }
    return producedRow;
}
// Prepares the adapter for producing rows.
//
// Reads the "RowCount" connection parameter, replaces the row buffer with a
// new "RowByOrder" connection row bound to the stored schema, creates a new
// error object and sets the adapter state to RS_CONTINUOUS. Always returns
// true.
//
// NOTE(review): _totalRows/_goodRows/_badRows are not cleared here, and the
// previous errorObjIdentifier (if any) is not released -- confirm whether the
// caller expects either.
extern "C" DLLEXPORT
bool reset(void *adapter)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);

    self->_rowCount = ::getConnectionParamInt64_t(self->parameters, "RowCount");

    // Drop the row buffer of a previous run before creating a new one.
    if (self->rowBuf != NULL) {
        deleteConnectionRow(self->rowBuf);
    }

    const std::string rowKind("RowByOrder");
    self->rowBuf = ::createConnectionRow(rowKind.c_str());
    ::setStreamType(self->rowBuf, self->schemaInformation, false);

    self->errorObjIdentifier = ::createConnectionErrors();

    self->setState(RS_CONTINUOUS);
    return true;
}
// Returns the adapter's _totalRows counter (incremented once per row attempted
// in getNext()).
extern "C" DLLEXPORT
int64_t getTotalRowsProcessed(void *adapter)
{
    const InputAdapter* const self = static_cast<const InputAdapter*>(adapter);
    return self->_totalRows;
}
// Returns the adapter's _badRows counter (rows for which ::toRow() yielded no
// row in getNext()).
extern "C" DLLEXPORT
int64_t getNumberOfBadRows(void *adapter)
{
    const InputAdapter* const self = static_cast<const InputAdapter*>(adapter);
    return self->_badRows;
}
// Returns the adapter's _goodRows counter (rows for which ::toRow() yielded a
// row in getNext()).
extern "C" DLLEXPORT
int64_t getNumberOfGoodRows(void *adapter)
{
    const InputAdapter* const self = static_cast<const InputAdapter*>(adapter);
    return self->_goodRows;
}
// Returns true when the global ::empty() check on the adapter's error object
// reports that it is not empty.
extern "C" DLLEXPORT
bool hasError(void *adapter)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);
    const bool noErrors = ::empty(self->errorObjIdentifier);
    return !noErrors;
}
// Asks ::getAdapterError() to fill `errorString` from the adapter's error
// object. Ownership of the resulting string is not visible in this file.
extern "C" DLLEXPORT
void getError(void *adapter, char** errorString)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);
    void* const errors = self->errorObjIdentifier;
    ::getAdapterError(errors, errorString);
}
// Start hook: a no-op in this sample; the adapter handle is not used.
extern "C" DLLEXPORT
void start(void* adapter)
{
    (void)adapter;
}
// Stop hook: a no-op in this sample; the adapter handle is not used.
extern "C" DLLEXPORT
void stop(void* adapter)
{
    (void)adapter;
}
// Cleanup hook: a no-op in this sample; the adapter handle is not used.
// NOTE(review): the row buffer and error object created in reset() are not
// released here -- confirm where they are expected to be freed.
extern "C" DLLEXPORT
void cleanup(void* adapter)
{
    (void)adapter;
}
// Reports that this adapter supports schema discovery. The answer is always
// true and does not depend on the adapter handle.
extern "C" DLLEXPORT
bool canDiscover(void* adapter)
{
    (void)adapter;
    const bool supportsDiscovery = true;
    return supportsDiscovery;
}
// Destroys an adapter created by createAdapter().
//
// The row buffer allocated in reset() belongs to the adapter (reset() itself
// deletes the previous one before creating a new one), so it is released here
// before the object is deleted; otherwise it would leak. A NULL handle is
// ignored.
//
// NOTE(review): the error object from ::createConnectionErrors() is not
// released because no matching release function is visible in this file --
// confirm against GenericAdapterInterface.h.
extern "C" DLLEXPORT
void deleteAdapter(void* adapter)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);
    if (self == NULL) {
        return;
    }

    if (self->rowBuf != NULL) {
        deleteConnectionRow(self->rowBuf);
        self->rowBuf = NULL;
    }

    delete self;
}
// Commit hook: a no-op in this sample; the adapter handle is not used.
extern "C" DLLEXPORT
void commitTransaction(void *adapter)
{
    (void)adapter;
}
// Runs table discovery and hands the discovered table names to the caller.
//
// adapter: handle returned by createAdapter().
// tables:  out-parameter; receives an array with one NUL-terminated copy per
//          discovered table name.
// Returns the number of entries written, or 0 when discovery fails or the
// array cannot be allocated.
//
// NOTE(review): the array is allocated with malloc() while each string is
// allocated with new[]; whoever releases the result has to use the matching
// deallocator for each -- confirm against the caller's contract.
extern "C" DLLEXPORT
int getTableNames(void* adapter, char*** tables)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);
    if (!self->discoverTables()) {
        return 0;
    }

    const std::vector<std::string>& names = self->_discoveredTableNames;
    const size_t count = names.size();

    char** const list = static_cast<char**>(malloc(sizeof(char*) * count));
    (*tables) = list;
    if (list == NULL) {
        // Allocation failed (or nothing was requested): there is no array to
        // fill, so do not write through the NULL pointer.
        return 0;
    }

    for (size_t index = 0; index < count; ++index) {
        const size_t tableNameSize = names[index].size() + 1;  // room for the terminating NUL
        char* const tableName = new char[tableNameSize];
        strncpy(tableName, names[index].c_str(), tableNameSize);
        list[index] = tableName;
    }
    return static_cast<int>(count);
}
// Runs discovery for a table and hands its field names to the caller.
//
// adapter:   handle returned by createAdapter().
// names:     out-parameter; receives an array with one NUL-terminated copy
//            per discovered field name.
// tableName: table to discover; forwarded to InputAdapter::discover().
// Returns the number of entries written, or 0 when tableName is NULL,
// discovery fails or the array cannot be allocated.
//
// NOTE(review): the array is allocated with malloc() while each string is
// allocated with new[]; whoever releases the result has to use the matching
// deallocator for each -- confirm against the caller's contract.
extern "C" DLLEXPORT
int getFieldNames(void* adapter, char*** names, const char* tableName)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);

    // Constructing a std::string from a NULL pointer is undefined behaviour.
    if (tableName == NULL) {
        return 0;
    }
    const std::string table(tableName);
    if (!self->discover(table)) {
        return 0;
    }

    const std::vector<std::string>& fields = self->_discoveredFieldNames;
    const size_t count = fields.size();

    char** const list = static_cast<char**>(malloc(sizeof(char*) * count));
    (*names) = list;
    if (list == NULL) {
        // Allocation failed (or nothing was requested): nothing to fill.
        return 0;
    }

    for (size_t index = 0; index < count; ++index) {
        const size_t fieldNameSize = fields[index].size() + 1;  // room for the terminating NUL
        char* const fieldName = new char[fieldNameSize];
        strncpy(fieldName, fields[index].c_str(), fieldNameSize);
        list[index] = fieldName;
    }
    return static_cast<int>(count);
}
// Runs discovery for a table and hands its field types to the caller.
//
// adapter:   handle returned by createAdapter().
// types:     out-parameter; receives an array with one NUL-terminated copy
//            per discovered field type name.
// tableName: table to discover; forwarded to InputAdapter::discover().
// Returns the number of entries written, or 0 when tableName is NULL,
// discovery fails or the array cannot be allocated.
//
// NOTE(review): the array is allocated with malloc() while each string is
// allocated with new[]; whoever releases the result has to use the matching
// deallocator for each -- confirm against the caller's contract.
extern "C" DLLEXPORT
int getFieldTypes(void* adapter, char*** types, const char* tableName)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);

    // Constructing a std::string from a NULL pointer is undefined behaviour.
    if (tableName == NULL) {
        return 0;
    }
    const std::string table(tableName);
    if (!self->discover(table)) {
        return 0;
    }

    const std::vector<std::string>& fieldTypes = self->_discoveredFieldTypes;
    const size_t count = fieldTypes.size();

    char** const list = static_cast<char**>(malloc(sizeof(char*) * count));
    (*types) = list;
    if (list == NULL) {
        // Allocation failed (or nothing was requested): nothing to fill.
        return 0;
    }

    for (size_t index = 0; index < count; ++index) {
        const size_t fieldTypeSize = fieldTypes[index].size() + 1;  // room for the terminating NUL
        char* const fieldType = new char[fieldTypeSize];
        strncpy(fieldType, fieldTypes[index].c_str(), fieldTypeSize);
        list[index] = fieldType;
    }
    return static_cast<int>(count);
}
// Runs discovery for a table and hands one of its sample rows to the caller.
//
// adapter:   handle returned by createAdapter().
// row:       out-parameter; receives an array with one NUL-terminated copy
//            per column value of the requested sample row. Left untouched
//            when no row is returned because of tableName, discovery or pos.
// tableName: table to discover; forwarded to InputAdapter::discover().
// pos:       zero-based index of the sample row.
// Returns the number of column values written, or 0 when tableName is NULL,
// discovery fails, pos is outside the discovered rows (including negative
// values) or the array cannot be allocated.
//
// NOTE(review): the array is allocated with malloc() while each string is
// allocated with new[]; whoever releases the result has to use the matching
// deallocator for each -- confirm against the caller's contract.
extern "C" DLLEXPORT
int getSampleRow(void* adapter, char*** row, const char* tableName, int pos)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);

    // Constructing a std::string from a NULL pointer is undefined behaviour.
    if (tableName == NULL) {
        return 0;
    }
    const std::string table(tableName);
    if (!self->discover(table)) {
        return 0;
    }

    // Reject negative positions as well as positions past the last row; a
    // negative index would read outside the vector.
    if (pos < 0 || static_cast<size_t>(pos) >= self->_discoveredRows.size()) {
        return 0;
    }

    const std::vector<std::string>& vals = self->_discoveredRows[static_cast<size_t>(pos)];
    const size_t count = vals.size();

    char** const list = static_cast<char**>(malloc(sizeof(char*) * count));
    (*row) = list;
    if (list == NULL) {
        // Allocation failed (or the row is empty): nothing to fill.
        return 0;
    }

    for (size_t index = 0; index < count; ++index) {
        const size_t columnSize = vals[index].size() + 1;  // room for the terminating NUL
        char* const column = new char[columnSize];
        strncpy(column, vals[index].c_str(), columnSize);
        list[index] = column;
    }
    return static_cast<int>(count);
}
// Switches the adapter's _discoveryMode flag on. Nothing in this file turns it
// off again.
extern "C" DLLEXPORT
void setDiscovery(void* adapter)
{
    InputAdapter* const self = static_cast<InputAdapter*>(adapter);
    self->_discoveryMode = true;
}
// Fills _discoveredTableNames with the sample's fixed list of table names
// ("Table1" .. "Table5") and returns true.
//
// The list is replaced rather than appended to, so repeated discovery calls
// do not accumulate duplicate entries (discover() likewise starts from empty
// vectors).
bool InputAdapter::discoverTables()
{
    static const char* const tableNames[] = {
        "Table1", "Table2", "Table3", "Table4", "Table5"
    };
    const size_t tableCount = sizeof(tableNames) / sizeof(tableNames[0]);

    _discoveredTableNames.assign(tableNames, tableNames + tableCount);
    return true;
}
// Replaces the discovered field names, field types and sample rows with the
// sample's fixed five-column schema and two sample rows, then returns true.
// The tableName argument is not consulted: every table gets the same schema.
bool InputAdapter::discover(string tableName)
{
    (void)tableName;

    const size_t columnCount = 5;
    static const char* const fieldNames[columnCount] = {
        "Column1", "Column2", "Column3", "Column4", "Column5"
    };
    static const char* const fieldTypes[columnCount] = {
        "integer", "string", "string", "float", "float"
    };
    static const char* const firstRow[columnCount] = {
        "1", "A", "B", "1.1", "2.2"
    };
    static const char* const secondRow[columnCount] = {
        "2", "X", "Y", "3.3", "4.4"
    };

    // assign() discards the previous contents, so no separate clear() is needed.
    _discoveredFieldNames.assign(fieldNames, fieldNames + columnCount);
    _discoveredFieldTypes.assign(fieldTypes, fieldTypes + columnCount);
    _row1.assign(firstRow, firstRow + columnCount);
    _row2.assign(secondRow, secondRow + columnCount);

    _discoveredRows.clear();
    _discoveredRows.push_back(_row1);
    _discoveredRows.push_back(_row2);
    return true;
}
// Publishes the adapter's three row counters through addAdapterStatistics(),
// each as a (label, decimal text) pair, in the order: total, good, bad.
extern "C" DLLEXPORT
void getStatistics(void* adapter, AdapterStatistics* adapterStatistics)
{
    const InputAdapter* const self = static_cast<const InputAdapter*>(adapter);

    const char* const labels[] = {
        "Total number of rows",
        "Total number of good rows",
        "Total number of bad rows"
    };
    const int64_t counters[] = {
        self->_totalRows,
        self->_goodRows,
        self->_badRows
    };

    for (int entry = 0; entry < 3; ++entry) {
        ostringstream text;
        text << counters[entry];
        const string formatted = text.str();
        addAdapterStatistics(adapterStatistics, labels[entry], formatted.c_str());
    }
}
// Reports a constant latency of 100 regardless of the adapter handle. The
// unit is not specified in this file.
extern "C" DLLEXPORT
int64_t getLatency(void* adapter)
{
    (void)adapter;
    const int64_t fixedLatency = 100;
    return fixedLatency;
}