The following sample code shows how to build a custom internal adapter implementation. It includes the extern "C" functions that enable schema discovery in a custom adapter.
/*
 * CustomAdapterInterface.cpp
 *
 * Author: sample
 *
 * Sample input adapter that generates synthetic rows ("ROW<n>COLUMN<m>")
 * and exposes hard-coded tables, columns and sample rows through the
 * schema-discovery entry points.
 */

#include "GenericAdapterInterface.h"

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

using namespace std;

/*
 * Per-connection adapter state. The host receives it as an opaque void*
 * from createAdapter() and passes it back to every other entry point.
 */
struct InputAdapter
{
    InputAdapter();

    void* connectionCallBackReference;  // handle passed to ::setAdapterState
    void* schemaInformation;            // row type handed in by setConnectionRowType
    void* parameters;                   // connection parameters handed in by setConnectionParams
    void* rowBuf;                       // connection row created in reset()
    void* errorObjIdentifier;           // error collection created in reset()

    int64_t _badRows;
    int64_t _goodRows;
    int64_t _totalRows;

    int getColumnCount();
    void setState(int st);
    bool discoverTables();
    bool discover(string tableName);

    vector<string> _discoveredTableNames;
    vector<string> _discoveredFieldNames;
    vector<string> _discoveredFieldTypes;
    vector<vector<string> > _discoveredRows;
    vector<string> _row1;
    vector<string> _row2;

    bool _discoveryMode;
    int64_t _rowCount;                  // number of rows to generate ("RowCount" parameter)
};

/*
 * Initializes every member. The handles start out NULL so that an entry
 * point called before the matching setter or reset() never reads an
 * indeterminate pointer.
 */
InputAdapter::InputAdapter()
    : connectionCallBackReference(NULL),
      schemaInformation(NULL),
      parameters(NULL),
      rowBuf(NULL),
      errorObjIdentifier(NULL),
      _badRows(0),
      _goodRows(0),
      _totalRows(0),
      _discoveryMode(false),
      _rowCount(0)
{
}

/* Returns the column count reported for the current row type. */
int InputAdapter::getColumnCount()
{
    return ::getColumnCount(schemaInformation);
}

/* Forwards a state value (for example RS_CONTINUOUS or RS_DONE) to the host. */
void InputAdapter::setState(int st)
{
    ::setAdapterState(connectionCallBackReference, st);
}

/*
 * Copies `values` into a newly allocated array of C strings and stores the
 * array in *out. Returns the number of entries written, or 0 if the array
 * could not be allocated.
 *
 * The array itself comes from malloc() and each string from new[], exactly
 * as in the original sample.
 * NOTE(review): the caller presumably releases this memory; confirm the
 * expected allocator pairing before changing either side.
 */
static int exportStringArray(const vector<string>& values, char*** out)
{
    const size_t count = values.size();
    char** entries = (char**) malloc(sizeof(char*) * count);
    *out = entries;
    if (entries == NULL) {
        // Either count is 0 (nothing to copy) or the allocation failed.
        return 0;
    }

    for (size_t index = 0; index < count; ++index) {
        const size_t length = values[index].size() + 1;  // include the terminator
        char* copy = new char[length];
        strncpy(copy, values[index].c_str(), length);
        entries[index] = copy;
    }
    return static_cast<int>(count);
}

/* Formats one counter and records it under `key` in the statistics object. */
static void addCounterStatistic(AdapterStatistics* adapterStatistics,
                                const char* key,
                                int64_t count)
{
    ostringstream value;
    value << count;
    addAdapterStatistics(adapterStatistics, key, value.str().c_str());
}

/* Treats a NULL table name as an empty one instead of constructing from NULL. */
static string toTableName(const char* tableName)
{
    return string(tableName ? tableName : "");
}

/* Creates the adapter object; released again by deleteAdapter(). */
extern "C" DLLEXPORT
void* createAdapter()
{
    return new InputAdapter();
}

/* Stores the callback handle later used by setState(). */
extern "C" DLLEXPORT
void setCallBackReference(void *adapter, void *connectionCallBackReference)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    inputAdapterObject->connectionCallBackReference = connectionCallBackReference;
}

/* Stores the row type used for the column count and for ::setStreamType. */
extern "C" DLLEXPORT
void setConnectionRowType(void *adapter, void *connectionRowType)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    inputAdapterObject->schemaInformation = connectionRowType;
}

/* Stores the connection parameters read by reset(). */
extern "C" DLLEXPORT
void setConnectionParams(void* adapter, void* connectionParams)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    inputAdapterObject->parameters = connectionParams;
}

/*
 * Produces the next synthetic row.
 *
 * While fewer than _rowCount rows have been generated, every column is set
 * to "ROW<row index>COLUMN<column index>", the row is converted with
 * ::toRow and counted as good (non-null result) or bad (null result).
 * Once _rowCount rows have been generated, the adapter state is set to
 * RS_DONE and NULL is returned.
 */
extern "C" DLLEXPORT
void* getNext(void *adapter)
{
    StreamRow streamRow = NULL;
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    const int columnCount = inputAdapterObject->getColumnCount();

    if (inputAdapterObject->_totalRows < inputAdapterObject->_rowCount) {
        for (int column = 0; column < columnCount; column++) {
            ostringstream cell;
            cell << "ROW" << inputAdapterObject->_totalRows
                 << "COLUMN" << column;
            const string text = cell.str();
            ::setFieldAsStringWithIndex(inputAdapterObject->rowBuf, column, text.c_str());
        }

        // The cell text uses the pre-increment index; ::toRow receives the
        // post-increment count, as in the original sample.
        inputAdapterObject->_totalRows++;
        streamRow = ::toRow(inputAdapterObject->rowBuf,
                            inputAdapterObject->_totalRows,
                            inputAdapterObject->errorObjIdentifier);
        if (streamRow) {
            inputAdapterObject->_goodRows++;
        } else {
            inputAdapterObject->_badRows++;
        }
    } else {
        inputAdapterObject->setState(RS_DONE);
    }
    return streamRow;
}

/*
 * Re-reads the "RowCount" parameter, recreates the row buffer and the error
 * collection, and sets the adapter state to RS_CONTINUOUS. Always returns true.
 *
 * NOTE(review): a previous errorObjIdentifier is overwritten without being
 * released, and the row counters are not reset; no release function for the
 * error collection is visible in this file, so this is left unchanged.
 */
extern "C" DLLEXPORT
bool reset(void *adapter)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    inputAdapterObject->_rowCount =
        ::getConnectionParamInt64_t(inputAdapterObject->parameters, "RowCount");

    if (inputAdapterObject->rowBuf) {
        deleteConnectionRow(inputAdapterObject->rowBuf);
    }

    const string type = "RowByOrder";
    inputAdapterObject->rowBuf = ::createConnectionRow(type.c_str());
    ::setStreamType(inputAdapterObject->rowBuf, inputAdapterObject->schemaInformation, false);
    inputAdapterObject->errorObjIdentifier = ::createConnectionErrors();
    inputAdapterObject->setState(RS_CONTINUOUS);
    return true;
}

/* Returns the number of rows generated so far. */
extern "C" DLLEXPORT
int64_t getTotalRowsProcessed(void *adapter)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    return inputAdapterObject->_totalRows;
}

/* Returns the number of rows for which ::toRow returned a null row. */
extern "C" DLLEXPORT
int64_t getNumberOfBadRows(void *adapter)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    return inputAdapterObject->_badRows;
}

/* Returns the number of rows for which ::toRow returned a non-null row. */
extern "C" DLLEXPORT
int64_t getNumberOfGoodRows(void *adapter)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    return inputAdapterObject->_goodRows;
}

/* Returns true when the error collection is reported as non-empty. */
extern "C" DLLEXPORT
bool hasError(void *adapter)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    return !(::empty(inputAdapterObject->errorObjIdentifier));
}

/* Retrieves the current error text from the error collection into *errorString. */
extern "C" DLLEXPORT
void getError(void *adapter, char** errorString)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    ::getAdapterError(inputAdapterObject->errorObjIdentifier, errorString);
}

/* Lifecycle hooks that this sample leaves empty. */
extern "C" DLLEXPORT
void start(void* adapter) {}

extern "C" DLLEXPORT
void stop(void* adapter) {}

extern "C" DLLEXPORT
void cleanup(void* adapter) {}

/* This sample always supports schema discovery. */
extern "C" DLLEXPORT
bool canDiscover(void* adapter) { return true; }

/*
 * Destroys the object returned by createAdapter().
 * NOTE(review): rowBuf and errorObjIdentifier are not released here, as in
 * the original sample; confirm who owns them at shutdown.
 */
extern "C" DLLEXPORT
void deleteAdapter(void* adapter)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    delete inputAdapterObject;
}

/* Transaction hook that this sample leaves empty. */
extern "C" DLLEXPORT
void commitTransaction(void *adapter) {}

/*
 * Discovers the table names and returns them through *tables.
 * Returns the number of names, or 0 if discovery or allocation fails.
 */
extern "C" DLLEXPORT
int getTableNames(void* adapter, char*** tables)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    if (!inputAdapterObject->discoverTables()) {
        return 0;
    }
    return exportStringArray(inputAdapterObject->_discoveredTableNames, tables);
}

/*
 * Discovers `tableName` and returns its column names through *names.
 * Returns the number of names, or 0 if discovery or allocation fails.
 */
extern "C" DLLEXPORT
int getFieldNames(void* adapter, char*** names, const char* tableName)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    if (!inputAdapterObject->discover(toTableName(tableName))) {
        return 0;
    }
    return exportStringArray(inputAdapterObject->_discoveredFieldNames, names);
}

/*
 * Discovers `tableName` and returns its column types through *types.
 * Returns the number of types, or 0 if discovery or allocation fails.
 */
extern "C" DLLEXPORT
int getFieldTypes(void* adapter, char*** types, const char* tableName)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    if (!inputAdapterObject->discover(toTableName(tableName))) {
        return 0;
    }
    return exportStringArray(inputAdapterObject->_discoveredFieldTypes, types);
}

/*
 * Discovers `tableName` and returns the sample row at index `pos` through
 * *row. Returns the number of values in that row, or 0 (leaving *row
 * untouched) when discovery fails or `pos` is outside the discovered rows.
 */
extern "C" DLLEXPORT
int getSampleRow(void* adapter, char*** row, const char* tableName, int pos)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    if (!inputAdapterObject->discover(toTableName(tableName))) {
        return 0;
    }

    const vector<vector<string> >& rows = inputAdapterObject->_discoveredRows;
    // Reject negative positions as well as positions past the end.
    if (pos < 0 || static_cast<size_t>(pos) >= rows.size()) {
        return 0;
    }
    return exportStringArray(rows[static_cast<size_t>(pos)], row);
}

/* Marks the adapter as running in discovery mode. */
extern "C" DLLEXPORT
void setDiscovery(void* adapter)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    inputAdapterObject->_discoveryMode = true;
}

/*
 * Fills _discoveredTableNames with the five sample table names.
 * The list is cleared first so that repeated calls do not accumulate
 * duplicates. Always returns true.
 */
bool InputAdapter::discoverTables()
{
    _discoveredTableNames.clear();
    _discoveredTableNames.push_back("Table1");
    _discoveredTableNames.push_back("Table2");
    _discoveredTableNames.push_back("Table3");
    _discoveredTableNames.push_back("Table4");
    _discoveredTableNames.push_back("Table5");
    return true;
}

/*
 * Fills the discovered column names, column types and two sample rows.
 * The same hard-coded schema is produced regardless of `tableName`.
 * Always returns true.
 */
bool InputAdapter::discover(string tableName)
{
    _discoveredFieldNames.clear();
    _discoveredFieldTypes.clear();
    _row1.clear();
    _row2.clear();
    _discoveredRows.clear();

    _discoveredFieldNames.push_back("Column1");
    _discoveredFieldNames.push_back("Column2");
    _discoveredFieldNames.push_back("Column3");
    _discoveredFieldNames.push_back("Column4");
    _discoveredFieldNames.push_back("Column5");

    _discoveredFieldTypes.push_back("integer");
    _discoveredFieldTypes.push_back("string");
    _discoveredFieldTypes.push_back("string");
    _discoveredFieldTypes.push_back("float");
    _discoveredFieldTypes.push_back("float");

    _row1.push_back("1");
    _row1.push_back("A");
    _row1.push_back("B");
    _row1.push_back("1.1");
    _row1.push_back("2.2");

    _row2.push_back("2");
    _row2.push_back("X");
    _row2.push_back("Y");
    _row2.push_back("3.3");
    _row2.push_back("4.4");

    _discoveredRows.push_back(_row1);
    _discoveredRows.push_back(_row2);
    return true;
}

/* Reports the total, good and bad row counters through adapterStatistics. */
extern "C" DLLEXPORT
void getStatistics(void* adapter, AdapterStatistics* adapterStatistics)
{
    InputAdapter *inputAdapterObject = static_cast<InputAdapter*>(adapter);
    addCounterStatistic(adapterStatistics, "Total number of rows",
                        inputAdapterObject->_totalRows);
    addCounterStatistic(adapterStatistics, "Total number of good rows",
                        inputAdapterObject->_goodRows);
    addCounterStatistic(adapterStatistics, "Total number of bad rows",
                        inputAdapterObject->_badRows);
}

/* Returns a fixed latency value of 100. */
extern "C" DLLEXPORT
int64_t getLatency(void* adapter)
{
    return 100;
}