diff --git a/dbcon/joblist/jlf_execplantojoblist.cpp b/dbcon/joblist/jlf_execplantojoblist.cpp index 7027e3213..0a226c38d 100644 --- a/dbcon/joblist/jlf_execplantojoblist.cpp +++ b/dbcon/joblist/jlf_execplantojoblist.cpp @@ -1603,12 +1603,13 @@ bool optimizeIdbPatitionSimpleFilter(SimpleFilter* sf, JobStepVector& jsv, JobIn // WIP MCOL-641 put this in dataconvert -void atoi_(const string &arg, unsigned __int128 &res) +void atoi128(const string& arg, unsigned __int128& res) { res = 0; for (size_t j = 0; j < arg.size(); j++) { - res = res*10 + arg[j] - '0'; + if (LIKELY(arg[j]-'0' >= 0)) + res = res*10 + arg[j] - '0'; } } @@ -1902,7 +1903,7 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo) if (ct.colDataType == CalpontSystemCatalog::DECIMAL && ct.colWidth == 16) { - atoi_(constval, val128); + atoi128(constval, val128); } else { diff --git a/utils/dataconvert/dataconvert.cpp b/utils/dataconvert/dataconvert.cpp index a28a5b394..e90471b97 100644 --- a/utils/dataconvert/dataconvert.cpp +++ b/utils/dataconvert/dataconvert.cpp @@ -36,6 +36,7 @@ using namespace boost::algorithm; #include "calpontsystemcatalog.h" #include "calpontselectexecutionplan.h" #include "columnresult.h" +#include "common/branchpred.h" using namespace execplan; #include "joblisttypes.h" @@ -1207,7 +1208,9 @@ void DataConvert::toString(unsigned __int128 i, char *p) // WIP MCOL-641 // Template this // result must be calloc-ed -void atoi_(const string &arg, int128_t &res, size_t &size) +//template +//void atoi_(const string &arg, T &res) +void atoi128(const string& arg, int128_t& res) { // WIP //char buf[40]; @@ -1215,13 +1218,13 @@ void atoi_(const string &arg, int128_t &res, size_t &size) res = 0; for (size_t j = 0; j < arg.size(); j++) { - // WIP - res = res*10 + arg[j] - '0'; + // WIP Optimize this + if (LIKELY(arg[j]-'0' >= 0)) + res = res*10 + arg[j] - '0'; } //toString(res, buf); //std::cerr << "atoi_ " << buf <(data, bigint); + atoi128(data, bigint); value = bigint; } else if (colType.colWidth == 1) @@ -2651,216 +2656,6 @@ std::string DataConvert::timeToString1( long long datetimevalue ) return buf; } -#if 0 -bool DataConvert::isNullData(ColumnResult* cr, int rownum, CalpontSystemCatalog::ColType colType) -{ - switch (colType.colDataType) - { - case CalpontSystemCatalog::TINYINT: - if (cr->GetData(rownum) == joblist::TINYINTNULL) - return true; - - return false; - - case CalpontSystemCatalog::SMALLINT: - if (cr->GetData(rownum) == joblist::SMALLINTNULL) - return true; - - return false; - - case CalpontSystemCatalog::MEDINT: - case CalpontSystemCatalog::INT: - if (cr->GetData(rownum) == joblist::INTNULL) - return true; - - return false; - - case CalpontSystemCatalog::BIGINT: - if (cr->GetData(rownum) == static_cast(joblist::BIGINTNULL)) - return true; - - return false; - - case CalpontSystemCatalog::DECIMAL: - case CalpontSystemCatalog::UDECIMAL: - { - if (colType.colWidth <= CalpontSystemCatalog::FOUR_BYTE) - { - if (cr->GetData(rownum) == joblist::SMALLINTNULL) - return true; - - return false; - } - else if (colType.colWidth <= 9) - { - if (cr->GetData(rownum) == joblist::INTNULL) - return true; - else return false; - } - else if (colType.colWidth <= 18) - { - if (cr->GetData(rownum) == static_cast(joblist::BIGINTNULL)) - return true; - - return false; - } - else - { - if (cr->GetStringData(rownum) == "\376\377\377\377\377\377\377\377") - return true; - - return false; - } - } - - case CalpontSystemCatalog::FLOAT: - case CalpontSystemCatalog::UFLOAT: - - //if (cr->GetStringData(rownum) == joblist::FLOATNULL) - if (cr->GetStringData(rownum).compare("null") == 0 ) - return true; - - return false; - - case CalpontSystemCatalog::DOUBLE: - case CalpontSystemCatalog::UDOUBLE: - - //if (cr->GetStringData(rownum) == joblist::DOUBLENULL) - if (cr->GetStringData(rownum).compare("null") == 0 ) - return true; - - return false; - - case CalpontSystemCatalog::DATE: - if (cr->GetData(rownum) == joblist::DATENULL) - return true; - - return false; - - case CalpontSystemCatalog::DATETIME: - if (cr->GetData(rownum) == static_cast(joblist::DATETIMENULL)) - return true; - - return false; - - case CalpontSystemCatalog::CHAR: - { - std::string charnull; - - if ( cr->GetStringData(rownum) == "") - { - return true; - } - - if (colType.colWidth == 1) - { - if (cr->GetStringData(rownum) == "\376") - return true; - - return false; - } - else if (colType.colWidth == 2) - { - if (cr->GetStringData(rownum) == "\377\376") - return true; - - return false; - } - else if (( colType.colWidth < 5 ) && ( colType.colWidth > 2 )) - { - if (cr->GetStringData(rownum) == "\377\377\377\376") - return true; - - return false; - } - else if (( colType.colWidth < 9 ) && ( colType.colWidth > 4 )) - { - if (cr->GetStringData(rownum) == "\377\377\377\377\377\377\377\376") - return true; - - return false; - } - else - { - if (cr->GetStringData(rownum) == "\376\377\377\377\377\377\377\377") - return true; - - return false; - } - } - - case CalpontSystemCatalog::VARCHAR: - { - std::string charnull; - - if ( cr->GetStringData(rownum) == "") - { - return true; - } - - if (colType.colWidth == 1) - { - if (cr->GetStringData(rownum) == "\377\376") - return true; - - return false; - } - else if ((colType.colWidth < 4) && (colType.colWidth > 1)) - { - if (cr->GetStringData(rownum) == "\377\377\377\376") - return true; - - return false; - } - else if ((colType.colWidth < 8) && (colType.colWidth > 3)) - { - if (cr->GetStringData(rownum) == "\377\377\377\377\377\377\377\376") - return true; - - return false; - } - else - { - WriteEngine::Token nullToken; - - // bytes reversed - if (cr->GetStringData(rownum) == "\376\377\377\377\377\377\377\377") - return true; - - return false; - } - } - - case CalpontSystemCatalog::UTINYINT: - if (cr->GetData(rownum) == joblist::UTINYINTNULL) - return true; - - return false; - - case CalpontSystemCatalog::USMALLINT: - if (cr->GetData(rownum) == joblist::USMALLINTNULL) - return true; - - return false; - - case CalpontSystemCatalog::UMEDINT: - case CalpontSystemCatalog::UINT: - if (cr->GetData(rownum) == joblist::UINTNULL) - return true; - - return false; - - case CalpontSystemCatalog::UBIGINT: - if (cr->GetData(rownum) == joblist::UBIGINTNULL) - return true; - - return false; - - default: - throw QueryDataExcept("convertColumnData: unknown column data type.", dataTypeErr); - } -} -#endif int64_t DataConvert::dateToInt(const string& date) { return stringToDate(date); diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index 15c898bc3..b5b00f81b 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2016 MariaDB Corporation + Copyright (C) 2016-2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -116,6 +116,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: RowList rows = tablePtr->get_RowList(); WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypes; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::ColValueList colValuesList; @@ -139,12 +140,18 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: { Row* rowPtr = rows.at(0); ColumnList columns = rowPtr->get_ColumnList(); + unsigned int numcols = rowPtr->get_NumberOfColumns(); + cscColTypes.reserve(numcols); + // WIP + // We presume that DictCols number is low + colStructs.reserve(numcols); ColumnList::const_iterator column_iterator = columns.begin(); while (column_iterator != columns.end()) { DMLColumn* columnPtr = *column_iterator; tableColName.column = columnPtr->get_Name(); + // WIP MCOL-641 replace with getColRidsOidsTypes() CalpontSystemCatalog::ROPair roPair = systemCatalogPtr->columnRID(tableColName); CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName); @@ -152,6 +159,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(oid); + cscColTypes.push_back(colType); WriteEngine::ColStruct colStruct; colStruct.fColDbRoot = dbroot; WriteEngine::DctnryStruct dctnryStruct; @@ -163,6 +171,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: // Token if ( isDictCol(colType) ) { + // WIP Hardcoded value colStruct.colWidth = 8; colStruct.tokenFlag = true; } @@ -194,7 +203,6 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: ++column_iterator; } - unsigned int numcols = rowPtr->get_NumberOfColumns(); std::string tmpStr(""); for (unsigned int i = 0; i < numcols; i++) @@ -210,6 +218,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: const DMLColumn* columnPtr = rowPtr->get_ColumnAt(i); tableColName.column = columnPtr->get_Name(); + // WIP MCOL-641 remove these calls CalpontSystemCatalog::OID oid = systemCatalogPtr->lookupOID(tableColName); CalpontSystemCatalog::ColType colType; @@ -303,6 +312,8 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: { try { + // WIP What if we combine this and previous loop and fail + // after get nextAIValue ? nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType); } @@ -359,6 +370,8 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: try { + // WIP + // make convertColumnData a template datavalue = DataConvert::convertColumnData(colType, indata, pushWarning, insertPkg.get_TimeZone(), isNULL, false, false); } catch (exception&) @@ -412,6 +425,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: // call the write engine to write the rows int error = NO_ERROR; + // WIP fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3); cout << "inserting a row with transaction id " << txnid.id << endl; fWEWrapper.setIsInsert(true); @@ -420,6 +434,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: //For hdfs use only uint32_t tblOid = tableRoPair.objnum; + // WIP are we saving HDFS? if (idbdatafile::IDBPolicy::useHdfs()) { @@ -523,7 +538,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: if (colValuesList[0].size() > 0) { if (NO_ERROR != - (error = fWEWrapper.insertColumnRec_Single(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, tableRoPair.objnum))) + (error = fWEWrapper.insertColumnRec_Single(txnid.id, cscColTypes, colStructs, colValuesList, dctnryStructList, dicStringList, tableRoPair.objnum))) { if (error == ERR_BRM_DEAD_LOCK) { diff --git a/writeengine/shared/we_convertor.cpp b/writeengine/shared/we_convertor.cpp index 536972637..af6723f7d 100644 --- a/writeengine/shared/we_convertor.cpp +++ b/writeengine/shared/we_convertor.cpp @@ -639,7 +639,8 @@ void Convertor::convertColType(ColStruct* curStruct) default: // WIP replace with BINARY - *internalType = WriteEngine::WR_INT128; + //*internalType = WriteEngine::WR_INT128; + *internalType = WriteEngine::WR_BINARY; break; } diff --git a/writeengine/shared/we_type.h b/writeengine/shared/we_type.h index fcbed9161..c2b20a1af 100644 --- a/writeengine/shared/we_type.h +++ b/writeengine/shared/we_type.h @@ -111,9 +111,9 @@ enum ColType /** @brief Column type enumeration*/ WR_TEXT = 17, /** @brief TEXT */ WR_MEDINT = 18, /** @brief Medium Int */ WR_UMEDINT = 19, /** @brief Unsigned Medium Int */ - WR_BINARY = 20, /** @brief BINARY */ -// WIP We might be good using WR_BINARY - WR_INT128 = 21 /** @brief __int128 */ + WR_BINARY = 20 /** @brief BINARY */ + // WIP + //WR_INT128 }; // Describes relation of field to column for a bulk load @@ -302,6 +302,7 @@ struct ColStruct /** @brief Column Interface Struct*/ typedef std::vector ColStructList; /** @brief column struct list */ typedef std::vector ColValueList; /** @brief column value list */ typedef std::vector RIDList; /** @brief RID list */ +typedef std::vector CSCTypesList; /** @brief CSC column types list */ typedef std::vector dictStr; typedef std::vector DictStrList; diff --git a/writeengine/wrapper/we_colop.cpp b/writeengine/wrapper/we_colop.cpp index a2b15b511..8a92b9d61 100644 --- a/writeengine/wrapper/we_colop.cpp +++ b/writeengine/wrapper/we_colop.cpp @@ -1684,12 +1684,13 @@ int ColumnOp::writeRow(Column& curCol, uint64_t totalRow, const RID* rowIdArray, if (!bDelete) pVal = &((uint64_t*) valArray)[i]; break; - case WriteEngine::WR_INT128: - pVal = &((uint128_t*) valArray)[i]; - break; + // WIP + //case WriteEngine::WR_INT128: case WriteEngine::WR_BINARY: - if (!bDelete) pVal = (uint8_t*) valArray + i * curCol.colWidth; + // WIP CSCCol type + pVal = &((uint128_t*) valArray)[i]; + //pVal = (uint8_t*) valArray + i * curCol.colWidth; break; default : diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index 871174d8b..7296654b6 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2016 MariaDB Corporation + Copyright (C) 2016-2019 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -281,19 +281,61 @@ void WriteEngineWrapper::convertValArray(const size_t totalRow, const ColType co ColTupleList::size_type i; if (bFromList) + { for (i = 0; i < curTupleList.size(); i++) { curTuple = curTupleList[i]; convertValue(colType, valArray, i, curTuple.data); - } // end of for (int i = 0 + } + } else + { for (i = 0; i < totalRow; i++) { convertValue(colType, valArray, i, curTuple.data, false); curTupleList.push_back(curTuple); } + } } +/*@convertValArray - Convert interface values to internal values + */ +/*********************************************************** + * DESCRIPTION: + * Convert interface values to internal values + * PARAMETERS: + * cscColType - CSC ColType struct list + * colStructList - column struct list + * colValueList - column value list + * RETURN: + * none + * valArray - output value array + * nullArray - output null flag array + ***********************************************************/ +void WriteEngineWrapper::convertValArray(const size_t totalRow, const CalpontSystemCatalog::ColType &cscColType, const ColType colType, ColTupleList& curTupleList, void* valArray, bool bFromList) +{ + ColTuple curTuple; + ColTupleList::size_type i; + + if (bFromList) + { + for (i = 0; i < curTupleList.size(); i++) + { + curTuple = curTupleList[i]; + convertValue(cscColType, colType, valArray, i, curTuple.data); + } + } + else + { + for (i = 0; i < totalRow; i++) + { + convertValue(cscColType, colType, valArray, i, curTuple.data, false); + curTupleList.push_back(curTuple); + } + } +} + + /* * @brief Convert column value to its internal representation */ @@ -433,13 +475,14 @@ void WriteEngineWrapper::convertValue(const ColType colType, void* value, boost: } break; - case WriteEngine::WR_INT128: + // Replace INT128 with WR_BINARY using CSC::ColType data + /*case WriteEngine::WR_INT128: { int128_t val = boost::any_cast(data); size = 16; // WIP Why do we use memcpy here? memcpy(value, &val, size); - } + }*/ case WriteEngine::WR_BINARY: { char val = boost::any_cast(data); @@ -453,16 +496,166 @@ void WriteEngineWrapper::convertValue(const ColType colType, void* value, boost: } // end of switch (colType) } /*@convertValue - The base for converting values */ -/*********************************************************** - * DESCRIPTION: - * The base for converting values - * PARAMETERS: - * colType - data type - * pos - array position - * data - value - * RETURN: - * none - ***********************************************************/ +// WIP this is ugly to have structs with the same name +void WriteEngineWrapper::convertValue(const execplan::CalpontSystemCatalog::ColType &fullColType, ColType colType, void* value, boost::any& data) +{ + string curStr; + int size; + + switch (colType) + { + case WriteEngine::WR_INT : + case WriteEngine::WR_MEDINT : + if (data.type() == typeid(int)) + { + int val = boost::any_cast(data); + size = sizeof(int); + memcpy(value, &val, size); + } + else + { + uint32_t val = boost::any_cast(data); + size = sizeof(uint32_t); + memcpy(value, &val, size); + } + + break; + + case WriteEngine::WR_UINT : + case WriteEngine::WR_UMEDINT : + { + uint32_t val = boost::any_cast(data); + size = sizeof(uint32_t); + memcpy(value, &val, size); + } + break; + + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR : + case WriteEngine::WR_BLOB : + case WriteEngine::WR_TEXT : + curStr = boost::any_cast(data); + + if ((int) curStr.length() > MAX_COLUMN_BOUNDARY) + curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY); + + memcpy(value, curStr.c_str(), curStr.length()); + break; + + case WriteEngine::WR_FLOAT: + { + float val = boost::any_cast(data); + +//N.B.There is a bug in boost::any or in gcc where, if you store a nan, you will get back a nan, +// but not necessarily the same bits that you put in. This only seems to be for float (double seems +// to work). + if (isnan(val)) + { + uint32_t ti = joblist::FLOATNULL; + float* tfp = (float*)&ti; + val = *tfp; + } + + size = sizeof(float); + memcpy(value, &val, size); + } + break; + + case WriteEngine::WR_DOUBLE: + { + double val = boost::any_cast(data); + size = sizeof(double); + memcpy(value, &val, size); + } + break; + + case WriteEngine::WR_SHORT: + { + short val = boost::any_cast(data); + size = sizeof(short); + memcpy(value, &val, size); + } + break; + + case WriteEngine::WR_USHORT: + { + uint16_t val = boost::any_cast(data); + size = sizeof(uint16_t); + memcpy(value, &val, size); + } + break; + + case WriteEngine::WR_BYTE: + { + char val = boost::any_cast(data); + size = sizeof(char); + memcpy(value, &val, size); + } + break; + + case WriteEngine::WR_UBYTE: + { + uint8_t val = boost::any_cast(data); + size = sizeof(uint8_t); + memcpy(value, &val, size); + } + break; + + case WriteEngine::WR_LONGLONG: + if (data.type() == typeid(long long)) + { + long long val = boost::any_cast(data); + size = sizeof(long long); + memcpy(value, &val, size); + } + else + { + uint64_t val = boost::any_cast(data); + size = sizeof(uint64_t); + memcpy(value, &val, size); + } + + break; + + case WriteEngine::WR_ULONGLONG: + { + uint64_t val = boost::any_cast(data); + size = sizeof(uint64_t); + memcpy(value, &val, size); + } + break; + + case WriteEngine::WR_TOKEN: + { + Token val = boost::any_cast(data); + size = sizeof(Token); + memcpy(value, &val, size); + } + break; + + // WIP + case WriteEngine::WR_BINARY: + { + size = fullColType.colWidth; + if (fullColType.colDataType == CalpontSystemCatalog::DECIMAL) + { + int128_t val = boost::any_cast(data); + memcpy(value, &val, size); + } + else + { + char val = boost::any_cast(data); + memcpy(value, &val, size); + } + + } + break; + + } // end of switch (colType) +} /*@convertValue - The base for converting values */ + +// WIP +// Legacy version void WriteEngineWrapper::convertValue(const ColType colType, void* valArray, const size_t pos, boost::any& data, bool fromList) { string curStr; @@ -554,14 +747,208 @@ void WriteEngineWrapper::convertValue(const ColType colType, void* valArray, con break; case WriteEngine::WR_BINARY: + { + curStr = boost::any_cast(data); + // String length or column width? + memcpy((char*)valArray + pos * curStr.length(), + curStr.c_str(), curStr.length()); + break; + } + } // end of switch (colType) + } + else + { + switch (colType) + { + case WriteEngine::WR_INT : + case WriteEngine::WR_MEDINT : + data = ((int*)valArray)[pos]; + break; + + case WriteEngine::WR_UINT : + case WriteEngine::WR_UMEDINT : + data = ((uint64_t*)valArray)[pos]; + break; + + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR : + case WriteEngine::WR_BLOB : + case WriteEngine::WR_TEXT : + char tmp[10]; + memcpy(tmp, (char*)valArray + pos * 8, 8); + curStr = tmp; + data = curStr; + break; + +// case WriteEngine::WR_LONG : ((long*)valArray)[pos] = boost::any_cast(curTuple.data); +// break; + case WriteEngine::WR_FLOAT: + data = ((float*)valArray)[pos]; + break; + + case WriteEngine::WR_DOUBLE: + data = ((double*)valArray)[pos]; + break; + + case WriteEngine::WR_SHORT: + data = ((short*)valArray)[pos]; + break; + + case WriteEngine::WR_USHORT: + data = ((uint16_t*)valArray)[pos]; + break; + +// case WriteEngine::WR_BIT: data = ((bool*)valArray)[pos]; +// break; + case WriteEngine::WR_BYTE: + data = ((char*)valArray)[pos]; + break; + + case WriteEngine::WR_UBYTE: + data = ((uint8_t*)valArray)[pos]; + break; + + case WriteEngine::WR_LONGLONG: + data = ((long long*)valArray)[pos]; + break; + + case WriteEngine::WR_ULONGLONG: + data = ((uint64_t*)valArray)[pos]; + break; + + case WriteEngine::WR_TOKEN: + data = ((Token*)valArray)[pos]; + break; + // WIP + case WriteEngine::WR_BINARY : + { + // WIP do we need tmp here? + char *tmp = (char*)alloca (sizeof(char) * 16); + memcpy(tmp, (char*)valArray + pos * 16, 16); + curStr = tmp; + data = curStr; + } + break; + } // end of switch (colType) + } // end of if +} + + +/*********************************************************** + * DESCRIPTION: + * The base for converting values + * PARAMETERS: + * colType - data type + * pos - array position + * data - value + * RETURN: + * none + ***********************************************************/ +void WriteEngineWrapper::convertValue(const CalpontSystemCatalog::ColType &fullColType, const ColType colType, void* valArray, const size_t pos, boost::any& data, bool fromList) +{ + string curStr; + + if (fromList) + { + switch (colType) + { + case WriteEngine::WR_INT : + case WriteEngine::WR_MEDINT : + if (data.type() == typeid(long)) + ((int*)valArray)[pos] = static_cast(boost::any_cast(data)); + else if (data.type() == typeid(int)) + ((int*)valArray)[pos] = boost::any_cast(data); + else + ((int*)valArray)[pos] = boost::any_cast(data); + + break; + + case WriteEngine::WR_UINT : + case WriteEngine::WR_UMEDINT : + ((uint32_t*)valArray)[pos] = boost::any_cast(data); + break; + + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR : + case WriteEngine::WR_BLOB : + case WriteEngine::WR_TEXT : curStr = boost::any_cast(data); - memcpy((char*)valArray + pos * curStr.length(), curStr.c_str(), curStr.length()); - break; - case WriteEngine::WR_INT128: - int128_t val = boost::any_cast(data); - size_t size = 16; - // WIP Why do we use memcpy here? - memcpy((uint8_t*)valArray+pos*size, &val, size); + + if ((int) curStr.length() > MAX_COLUMN_BOUNDARY) + curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY); + + memcpy((char*)valArray + pos * MAX_COLUMN_BOUNDARY, curStr.c_str(), curStr.length()); + break; + +// case WriteEngine::WR_LONG : ((long*)valArray)[pos] = boost::any_cast(curTuple.data); +// break; + case WriteEngine::WR_FLOAT: + ((float*)valArray)[pos] = boost::any_cast(data); + + if (isnan(((float*)valArray)[pos])) + { + uint32_t ti = joblist::FLOATNULL; + float* tfp = (float*)&ti; + ((float*)valArray)[pos] = *tfp; + } + + break; + + case WriteEngine::WR_DOUBLE: + ((double*)valArray)[pos] = boost::any_cast(data); + break; + + case WriteEngine::WR_SHORT: + ((short*)valArray)[pos] = boost::any_cast(data); + break; + + case WriteEngine::WR_USHORT: + ((uint16_t*)valArray)[pos] = boost::any_cast(data); + break; + +// case WriteEngine::WR_BIT: ((bool*)valArray)[pos] = boost::any_cast(data); +// break; + case WriteEngine::WR_BYTE: + ((char*)valArray)[pos] = boost::any_cast(data); + break; + + case WriteEngine::WR_UBYTE: + ((uint8_t*)valArray)[pos] = boost::any_cast(data); + break; + + case WriteEngine::WR_LONGLONG: + if (data.type() == typeid(long long)) + ((long long*)valArray)[pos] = boost::any_cast(data); + else if (data.type() == typeid(long)) + ((long long*)valArray)[pos] = (long long)boost::any_cast(data); + else + ((long long*)valArray)[pos] = boost::any_cast(data); + + break; + + case WriteEngine::WR_ULONGLONG: + ((uint64_t*)valArray)[pos] = boost::any_cast(data); + break; + + case WriteEngine::WR_TOKEN: + ((Token*)valArray)[pos] = boost::any_cast(data); + break; + + case WriteEngine::WR_BINARY: + if (fullColType.colDataType != CalpontSystemCatalog::DECIMAL) + { + curStr = boost::any_cast(data); + // String length or column width? + memcpy((char*)valArray + pos * curStr.length(), curStr.c_str(), curStr.length()); + } + else + { + int128_t val = boost::any_cast(data); + size_t size = fullColType.colWidth; + // WIP Why do we use memcpy here? + memcpy((uint8_t*)valArray+pos*size, &val, size); + } + break; } // end of switch (colType) } @@ -628,22 +1015,21 @@ void WriteEngineWrapper::convertValue(const ColType colType, void* valArray, con case WriteEngine::WR_TOKEN: data = ((Token*)valArray)[pos]; break; - - case WriteEngine::WR_INT128: - { - data = ((int128_t*)valArray)[pos]; - break; - } + // WIP case WriteEngine::WR_BINARY : - { - char tmp[16]; - //TODO:FIXME how to determine size ? 16, 32,48 ? - // WIP - memcpy(tmp, (char*)valArray + pos * 16, 16); - curStr = tmp; - data = curStr; - } - break; + if (fullColType.colDataType == CalpontSystemCatalog::DECIMAL) + { + data = ((int128_t*)valArray)[pos]; + } + else + { + // WIP do we need tmp here? + char *tmp = (char*) alloca (sizeof(char) * fullColType.colWidth); + memcpy(tmp, (char*)valArray + pos * fullColType.colWidth, fullColType.colWidth); + curStr = tmp; + data = curStr; + } + break; } // end of switch (colType) } // end of if } @@ -725,10 +1111,6 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid, Dctnry* dctnry = m_dctnry[op(compressionType)]; colOpNewCol->initColumn(newCol); refColOp->initColumn(refCol); - //boost::shared_ptr dctnry; - // boost::shared_ptr refColOp; - // refColOp.reset(colOpRefCol); - // dctnry.reset(dctOp); uint16_t dbRoot = 1; //not to be used int newDataWidth = dataWidth; //Convert HWM of the reference column for the new column @@ -746,6 +1128,8 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid, Convertor::convertColType(dataType, newColType, isToken); + // WIP + // replace with isDictCol if (((refColDataType == CalpontSystemCatalog::VARCHAR) && (refColWidth > 7)) || ((refColDataType == CalpontSystemCatalog::CHAR) && (refColWidth > 8)) || (refColDataType == CalpontSystemCatalog::VARBINARY) || @@ -777,23 +1161,18 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid, Token nullToken; memcpy(defVal.get(), &nullToken, size); } - //Tokenization is done when we create dictionary file } else + { + // WIP convertValue(newColType, defVal.get(), defaultVal.data); + } if (rc == NO_ERROR) rc = colOpNewCol->fillColumn(txnid, newCol, refCol, defVal.get(), dctnry, refColOp, dictOid, dataWidth, defaultValStr, autoincrement); -// colOpNewCol->clearColumn(newCol); -// colOpRefCol->clearColumn(refCol); - -// free(defVal); - // flushing files is in colOp->fillColumn() -// if (rc == NO_ERROR) -// rc = flushDataFiles(); return rc; } @@ -960,7 +1339,7 @@ int WriteEngineWrapper::deleteBadRows(const TxnID& txnid, ColStructList& colStru break; case WriteEngine::WR_BINARY: - case WriteEngine::WR_INT128: + //case WriteEngine::WR_INT128: // WIP use column width here // remove all C-casts from above valArray = calloc(1, 16); @@ -3209,6 +3588,7 @@ int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid, } int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, + CSCTypesList& cscColTypesList, ColStructList& colStructList, ColValueList& colValueList, DctnryStructList& dctnryStructList, @@ -3329,6 +3709,7 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, } bool newFile; + // WIP cout << "Datafile " << curCol.dataFile.fSegFileName << endl; #ifdef PROFILE timer.start("allocRowId"); @@ -3429,7 +3810,7 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, // Expand initial abbreviated extent if any RID in 1st extent is > 256K. // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it. //-------------------------------------------------------------------------- -// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? + // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? if ((colStructList[colId].fColPartition == 0) && (colStructList[colId].fColSegment == 0) && ((totalRow - rowsLeft) > 0) && @@ -3756,33 +4137,13 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, //-------------------------------------------------------------------------- //Mark extents invalid //-------------------------------------------------------------------------- + // WIP + // Set min/max in dmlprocprocessor if aplicable vector lbids; vector colDataTypes; bool successFlag = true; unsigned width = 0; - //BRM::LBID_t lbid; int curFbo = 0, curBio, lastFbo = -1; - /* if (totalRow-rowsLeft > 0) - { - for (unsigned i = 0; i < colStructList.size(); i++) - { - colOp = m_colOp[op(colStructList[i].fCompressionType)]; - width = colStructList[i].colWidth; - successFlag = colOp->calculateRowId(lastRid , - BYTE_PER_BLOCK/width, width, curFbo, curBio); - if (successFlag) { - if (curFbo != lastFbo) { - RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo( - colStructList[i].dataOid, - colStructList[i].fColPartition, - colStructList[i].fColSegment, curFbo, lbid)); - lbids.push_back((BRM::LBID_t)lbid); - colDataTypes.push_back(colStructList[i].colDataType); - } - } - } - } - */ lastRid = rowIdArray[totalRow - 1]; for (unsigned i = 0; i < colStructList.size(); i++) @@ -3821,13 +4182,13 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, { if (newExtent) { - rc = writeColumnRec(txnid, colStructList, colOldValueList, + rc = writeColumnRec(txnid, cscColTypesList, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file } else { - rc = writeColumnRec(txnid, colStructList, colValueList, + rc = writeColumnRec(txnid, cscColTypesList, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, true); // @bug 5572 HDFS tmp file } @@ -3836,18 +4197,12 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, #ifdef PROFILE timer.stop("writeColumnRec"); #endif -// for (ColTupleList::size_type i = 0; i < totalRow; i++) -// ridList.push_back((RID) rowIdArray[i]); - -// if (rc == NO_ERROR) -// rc = flushDataFiles(NO_ERROR); //-------------------------------------------------------------------------- // Update BRM //-------------------------------------------------------------------------- if ( !newExtent ) { - //flushVMCache(); bool succFlag = false; unsigned colWidth = 0; int extState; @@ -3857,7 +4212,6 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, for (unsigned i = 0; i < totalColumns; i++) { - //colOp = m_colOp[op(colStructList[i].fCompressionType)]; //Set all columns hwm together BulkSetHWMArg aHwmEntry; RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot( @@ -3908,7 +4262,6 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP( hwmVecOldext, mergeCPDataArgs)); - //flushVMCache(); #ifdef PROFILE timer.stop("flushVMCache"); #endif @@ -3917,33 +4270,6 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, #ifdef PROFILE timer.finish(); #endif - //flush PrimProc FD cache moved to we_dmlcommandproc.cpp - /* ColsExtsInfoMap colsExtsInfoMap = aTableMetaData->getColsExtsInfoMap(); - ColsExtsInfoMap::iterator it = colsExtsInfoMap.begin(); - ColExtsInfo::iterator aIt; - std::vector files; - BRM::FileInfo aFile; - while (it != colsExtsInfoMap.end()) - { - aIt = (it->second).begin(); - aFile.oid = it->first; - //cout << "OID:" << aArg.oid; - while (aIt != (it->second).end()) - { - aFile.partitionNum = aIt->partNum; - aFile.dbRoot =aIt->dbRoot; - aFile.segmentNum = aIt->segNum; - aFile.compType = aIt->compType; - files.push_back(aFile); - //cout <<"Added to files oid:dbroot:part:seg:compType = " << aFile.oid<<":"< 0)) - cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); - TableMetaData::removeTableMetaData(tableOid); */ return rc; } @@ -4641,7 +4967,7 @@ int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid, // WIP case WriteEngine::WR_BINARY: - case WriteEngine::WR_INT128: + //case WriteEngine::WR_INT128: // Use column width and remove all C-casts from above valArray = calloc(totalRow, 16); break; @@ -4709,6 +5035,7 @@ int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid, * Write values to a column * PARAMETERS: * tableOid - table object id + * cscColTypesList - CSC ColType list * colStructList - column struct list * colValueList - column value list * colNewStructList - the new extent struct list @@ -4719,6 +5046,611 @@ int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid, * NO_ERROR if success * others if something wrong in inserting the value ***********************************************************/ +int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, + const CSCTypesList& cscColTypeList, + const ColStructList& colStructList, + ColValueList& colValueList, + RID* rowIdArray, + const ColStructList& newColStructList, + ColValueList& newColValueList, + const int32_t tableOid, + bool useTmpSuffix, + bool versioning) +{ + bool bExcp; + int rc = 0; + void* valArray; + string segFile; + Column curCol; + ColTupleList oldTupleList; + ColStructList::size_type totalColumn; + ColStructList::size_type i; + ColTupleList::size_type totalRow1, totalRow2; + + setTransId(txnid); + + totalColumn = colStructList.size(); +#ifdef PROFILE + StopWatch timer; +#endif + + if (newColValueList.size() > 0) + { + totalRow1 = colValueList[0].size(); + totalRow2 = newColValueList[0].size(); + } + else + { + totalRow1 = colValueList[0].size(); + totalRow2 = 0; + } + + TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); + + for (i = 0; i < totalColumn; i++) + { + if (totalRow2 > 0) + { + RID* secondPart = rowIdArray + totalRow1; + + //@Bug 2205 Check if all rows go to the new extent + if (totalRow1 > 0) + { + //Write the first batch + valArray = NULL; + RID* firstPart = rowIdArray; + ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; + + // set params + colOp->initColumn(curCol); + // need to pass real dbRoot, partition, and segment to setColParam + colOp->setColParam(curCol, 0, colStructList[i].colWidth, + colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, + colStructList[i].fCompressionType, colStructList[i].fColDbRoot, + colStructList[i].fColPartition, colStructList[i].fColSegment); + + ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) + break; + + it++; + } + + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot = colStructList[i].fColDbRoot; + aExt.partNum = colStructList[i].fColPartition; + aExt.segNum = colStructList[i].fColSegment; + aExt.compType = colStructList[i].fCompressionType; + aColExtsInfo.push_back(aExt); + aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); + } + + rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file + + if (rc != NO_ERROR) + break; + + // handling versioning + vector rangeList; + + if (versioning) + { + rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], + colStructList[i].colWidth, totalRow1, firstPart, rangeList); + + if (rc != NO_ERROR) + { + if (colStructList[i].fCompressionType == 0) + { + curCol.dataFile.pFile->flush(); + } + + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + break; + } + } + + // WIP We can allocate based on column size and not colType + // have to init the size here + valArray = calloc(totalRow1, colStructList[i].colWidth); +#if 0 + switch (colStructList[i].colType) + { + // WIP we don't need type cast here only size + case WriteEngine::WR_INT: + case WriteEngine::WR_MEDINT: + valArray = (int*) calloc(sizeof(int), totalRow1); + break; + + case WriteEngine::WR_UINT: + case WriteEngine::WR_UMEDINT: + valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow1); + break; + + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR: + case WriteEngine::WR_BLOB: + case WriteEngine::WR_TEXT: + valArray = (char*) calloc(sizeof(char), totalRow1 * MAX_COLUMN_BOUNDARY); + break; + + case WriteEngine::WR_FLOAT: + valArray = (float*) calloc(sizeof(float), totalRow1); + break; + + case WriteEngine::WR_DOUBLE: + valArray = (double*) calloc(sizeof(double), totalRow1); + break; + + case WriteEngine::WR_BYTE: + valArray = (char*) calloc(sizeof(char), totalRow1); + break; + + case WriteEngine::WR_UBYTE: + valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow1); + break; + + case WriteEngine::WR_SHORT: + valArray = (short*) calloc(sizeof(short), totalRow1); + break; + + case WriteEngine::WR_USHORT: + valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow1); + break; + + case WriteEngine::WR_LONGLONG: + valArray = (long long*) calloc(sizeof(long long), totalRow1); + break; + + case WriteEngine::WR_ULONGLONG: + valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow1); + break; + + case WriteEngine::WR_TOKEN: + valArray = (Token*) calloc(sizeof(Token), totalRow1); + break; + + // WIP + case WriteEngine::WR_BINARY: + valArray = calloc(totalRow1, colStructList[i].colWidth); + break; + } +#endif + + // convert values to valArray + // WIP + // Is this m_opType ever set to DELETE? + if (m_opType != DELETE) + { + bExcp = false; + + try + { + // WIP We convert values twice!? + // dmlcommandproc converts strings to boost::any and this converts + // into actual type value masked by *void + // It is not clear why we need to convert to boost::any b/c we can convert from the original string here + convertValArray(totalRow1, cscColTypeList[i], colStructList[i].colType, colValueList[i], valArray); + } + catch (...) + { + bExcp = true; + } + + if (bExcp) + { + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + return ERR_PARSING; + } + +#ifdef PROFILE + iimer.start("writeRow "); +#endif + rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray); +#ifdef PROFILE + timer.stop("writeRow "); +#endif + } + else + { +#ifdef PROFILE + timer.start("writeRow "); +#endif + rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, true); +#ifdef PROFILE + timer.stop("writeRow "); +#endif + } + + colOp->clearColumn(curCol); + + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + if (valArray != NULL) + free(valArray); + + // check error + if (rc != NO_ERROR) + break; + } + + //Process the second batch + valArray = NULL; + + ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)]; + + // set params + colOp->initColumn(curCol); + colOp->setColParam(curCol, 0, newColStructList[i].colWidth, + newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid, + newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot, + newColStructList[i].fColPartition, newColStructList[i].fColSegment); + + ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == newColStructList[i].fColSegment)) + break; + + it++; + } + + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot = newColStructList[i].fColDbRoot; + aExt.partNum = newColStructList[i].fColPartition; + aExt.segNum = newColStructList[i].fColSegment; + aExt.compType = newColStructList[i].fCompressionType; + aColExtsInfo.push_back(aExt); + aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo); + } + + // Pass "false" for hdfs tmp file flag. Since we only allow 1 + // extent per segment file (with HDFS), we can assume a second + // extent is going to a new file (and won't need tmp file). + rc = colOp->openColumnFile(curCol, segFile, false, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file + + if (rc != NO_ERROR) + break; + + // handling versioning + vector rangeList; + + if (versioning) + { + rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i], + newColStructList[i].colWidth, totalRow2, secondPart, rangeList); + + if (rc != NO_ERROR) + { + if (newColStructList[i].fCompressionType == 0) + { + curCol.dataFile.pFile->flush(); + } + + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + break; + } + } + + // have to init the size here + valArray = calloc(totalRow2, newColStructList[i].colWidth); + /*switch (newColStructList[i].colType) + { + case WriteEngine::WR_INT: + case WriteEngine::WR_MEDINT: + valArray = (int*) calloc(sizeof(int), totalRow2); + break; + + case WriteEngine::WR_UINT: + case WriteEngine::WR_UMEDINT: + valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow2); + break; + + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR: + case WriteEngine::WR_BLOB: + case WriteEngine::WR_TEXT: + valArray = (char*) calloc(sizeof(char), totalRow2 * MAX_COLUMN_BOUNDARY); + break; + + case WriteEngine::WR_FLOAT: + valArray = (float*) calloc(sizeof(float), totalRow2); + break; + + case WriteEngine::WR_DOUBLE: + valArray = (double*) calloc(sizeof(double), totalRow2); + break; + + case WriteEngine::WR_BYTE: + valArray = (char*) calloc(sizeof(char), totalRow2); + break; + + case WriteEngine::WR_UBYTE: + valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow2); + break; + + case WriteEngine::WR_SHORT: + valArray = (short*) calloc(sizeof(short), totalRow2); + break; + + case WriteEngine::WR_USHORT: + valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow2); + break; + + case WriteEngine::WR_LONGLONG: + valArray = (long long*) calloc(sizeof(long long), totalRow2); + break; + + case WriteEngine::WR_ULONGLONG: + valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow2); + break; + + case WriteEngine::WR_TOKEN: + valArray = (Token*) calloc(sizeof(Token), totalRow2); + break; + + case WriteEngine::WR_BINARY: + //case WriteEngine::WR_INT128: + // WIP + valArray = calloc(totalRow2, 16); + break; + + }*/ + + // convert values to valArray + if (m_opType != DELETE) + { + bExcp = false; + + try + { + convertValArray(totalRow2, newColStructList[i].colType, newColValueList[i], valArray); + } + catch (...) + { + bExcp = true; + } + + if (bExcp) + { + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + return ERR_PARSING; + } + +#ifdef PROFILE + timer.start("writeRow "); +#endif + rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray); +#ifdef PROFILE + timer.stop("writeRow "); +#endif + } + else + { +#ifdef PROFILE + timer.start("writeRow "); +#endif + rc = colOp->writeRow(curCol, totalRow2, rowIdArray, valArray, true); +#ifdef PROFILE + timer.stop("writeRow "); +#endif + } + + + colOp->clearColumn(curCol); + + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + if (valArray != NULL) + free(valArray); + + // check error + if (rc != NO_ERROR) + break; + } + else + { + valArray = NULL; + + ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; + + // set params + colOp->initColumn(curCol); + colOp->setColParam(curCol, 0, colStructList[i].colWidth, + colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, + colStructList[i].fCompressionType, colStructList[i].fColDbRoot, + colStructList[i].fColPartition, colStructList[i].fColSegment); + + rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file + + //cout << " Opened file oid " << curCol.dataFile.pFile << endl; + if (rc != NO_ERROR) + break; + + ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) + break; + + it++; + } + + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot = colStructList[i].fColDbRoot; + aExt.partNum = colStructList[i].fColPartition; + aExt.segNum = colStructList[i].fColSegment; + aExt.compType = colStructList[i].fCompressionType; + aColExtsInfo.push_back(aExt); + aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); + } + + // handling versioning + vector rangeList; + + if (versioning) + { + rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], + colStructList[i].colWidth, totalRow1, rowIdArray, rangeList); + + if (rc != NO_ERROR) + { + if (colStructList[i].fCompressionType == 0) + { + curCol.dataFile.pFile->flush(); + } + + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + break; + } + } + + // have to init the size here + // shared pointers or memory in a stack + valArray = calloc(totalRow1, colStructList[i].colWidth); + // WIP + /*switch (colStructList[i].colType) + { + case WriteEngine::WR_INT: + case WriteEngine::WR_MEDINT: + valArray = (int*) calloc(sizeof(int), totalRow1); + break; + + case WriteEngine::WR_UINT: + case WriteEngine::WR_UMEDINT: + valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow1); + break; + + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR: + case WriteEngine::WR_BLOB: + case WriteEngine::WR_TEXT: + valArray = (char*) calloc(sizeof(char), totalRow1 * MAX_COLUMN_BOUNDARY); + break; + + case WriteEngine::WR_FLOAT: + valArray = (float*) calloc(sizeof(float), totalRow1); + break; + + case WriteEngine::WR_DOUBLE: + valArray = (double*) calloc(sizeof(double), totalRow1); + break; + + case WriteEngine::WR_BYTE: + valArray = (char*) calloc(sizeof(char), totalRow1); + break; + + case WriteEngine::WR_UBYTE: + valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow1); + break; + + case WriteEngine::WR_SHORT: + valArray = (short*) calloc(sizeof(short), totalRow1); + break; + + case WriteEngine::WR_USHORT: + valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow1); + break; + + case WriteEngine::WR_LONGLONG: + valArray = (long long*) calloc(sizeof(long long), totalRow1); + break; + + case WriteEngine::WR_ULONGLONG: + valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow1); + break; + + case WriteEngine::WR_TOKEN: + valArray = (Token*) calloc(sizeof(Token), totalRow1); + break; + + case WriteEngine::WR_BINARY: + //case WriteEngine::WR_INT128: + valArray = calloc(colStructList[i].colWidth, totalRow1); + break; + }*/ + + // convert values to valArray + if (m_opType != DELETE) + { + bExcp = false; + + try + { + convertValArray(totalRow1, cscColTypeList[i], colStructList[i].colType, colValueList[i], valArray); + } + catch (...) + { + bExcp = true; + } + + if (bExcp) + { + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + return ERR_PARSING; + } + +#ifdef PROFILE + timer.start("writeRow "); +#endif + rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray); +#ifdef PROFILE + timer.stop("writeRow "); +#endif + } + else + { +#ifdef PROFILE + timer.start("writeRow "); +#endif + rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, true); +#ifdef PROFILE + timer.stop("writeRow "); +#endif + } + + colOp->clearColumn(curCol); + + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + if (valArray != NULL) + free(valArray); + + // check error + if (rc != NO_ERROR) + break; + } + } // end of for (i = 0 + +#ifdef PROFILE + timer.finish(); +#endif + return rc; +} + int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, const ColStructList& colStructList, ColValueList& colValueList, @@ -4828,11 +5760,13 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, } } - //totalRow1 -= totalRow2; + // WIP We can allocate based on column size and not colType // have to init the size here - // nullArray = (bool*) malloc(sizeof(bool) * totalRow); + valArray = calloc(totalRow1, colStructList[i].colWidth); +#if 0 switch (colStructList[i].colType) { + // WIP we don't need type cast here only size case WriteEngine::WR_INT: case WriteEngine::WR_MEDINT: valArray = (int*) calloc(sizeof(int), totalRow1); @@ -4888,18 +5822,24 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, // WIP case WriteEngine::WR_BINARY: - case WriteEngine::WR_INT128: - valArray = calloc(totalRow1, 16); + valArray = calloc(totalRow1, colStructList[i].colWidth); break; } +#endif // convert values to valArray + // WIP + // Is this m_opType ever set to DELETE? if (m_opType != DELETE) { bExcp = false; try { + // WIP We convert values twice!? + // dmlcommandproc converts strings to boost::any and this converts + // into actual type value masked by *void + // It is not clear why we need to convert to boost::any b/c we can convert from the original string here convertValArray(totalRow1, colStructList[i].colType, colValueList[i], valArray); } catch (...) @@ -4916,7 +5856,7 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, } #ifdef PROFILE - timer.start("writeRow "); + iimer.start("writeRow "); #endif rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray); #ifdef PROFILE @@ -5009,9 +5949,7 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, } } - //totalRow1 -= totalRow2; // have to init the size here -// nullArray = (bool*) malloc(sizeof(bool) * totalRow); switch (newColStructList[i].colType) { case WriteEngine::WR_INT: @@ -5068,7 +6006,7 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, break; case WriteEngine::WR_BINARY: - case WriteEngine::WR_INT128: + //case WriteEngine::WR_INT128: // WIP valArray = calloc(totalRow2, 16); break; @@ -5248,7 +6186,7 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, break; case WriteEngine::WR_BINARY: - case WriteEngine::WR_INT128: + //case WriteEngine::WR_INT128: valArray = calloc(colStructList[i].colWidth, totalRow1); break; } @@ -5314,6 +6252,8 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, return rc; } + + int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, const ColStructList& colStructList, std::vector& colValueList, @@ -5472,7 +6412,8 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, break; case WriteEngine::WR_BINARY: - case WriteEngine::WR_INT128: + // WIP + //case WriteEngine::WR_INT128: ((uint64_t*)valArray)[j] = curValue; //FIXME maybe break; @@ -5621,7 +6562,8 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, break; case WriteEngine::WR_BINARY: - case WriteEngine::WR_INT128: + // WIP + //case WriteEngine::WR_INT128: ((uint64_t*)valArray)[j] = curValue; // FIXME maybe break; } @@ -5657,7 +6599,6 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, return rc; } - int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, const ColStructList& colStructList, const ColValueList& colValueList, @@ -5908,8 +6849,340 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, valArray = (Token*) calloc(sizeof(Token), 1); break; case WriteEngine::WR_BINARY: - case WriteEngine::WR_INT128: - valArray = (char*) calloc(sizeof(char), curColStruct.colWidth); //FIXME maybe + //case WriteEngine::WR_INT128: + valArray = calloc(sizeof(char), curColStruct.colWidth); //FIXME maybe + break; + } + + // convert values to valArray + if (m_opType != DELETE) + { + bExcp = false; + ColTuple curTuple; + curTuple = curTupleList[0]; + + try + { + convertValue(curColStruct.colType, valArray, curTuple.data); + } + catch (...) + { + bExcp = true; + } + + if (bExcp) + { + if (rangeListTot.size() > 0) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); + + return ERR_PARSING; + } + +#ifdef PROFILE + timer.start("writeRow "); +#endif + rc = colOp->writeRows(curCol, totalRow, ridList, valArray); +#ifdef PROFILE + timer.stop("writeRow "); +#endif + } + else + { +#ifdef PROFILE + timer.start("writeRows "); +#endif + rc = colOp->writeRows(curCol, totalRow, ridList, valArray, 0, true); +#ifdef PROFILE + timer.stop("writeRows "); +#endif + } + +// colOldValueList.push_back(oldValArray); +//timer.start("Delete:closefile"); + colOp->clearColumn(curCol); + +//timer.stop("Delete:closefile"); + if (valArray != NULL) + free(valArray); + + // check error + if (rc != NO_ERROR) + break; + + } // end of for (i = 0) + +// timer.start("Delete:purgePrimProcFdCache"); + if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0)) + cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); + +//if (idbdatafile::IDBPolicy::useHdfs()) +// cacheutils::dropPrimProcFdCache(); +//timer.stop("Delete:purgePrimProcFdCache"); + if (rangeListTot.size() > 0) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); + +//timer.stop("Delete:writecolrec"); +//#ifdef PROFILE +//timer.finish(); +//#endif + return rc; +} + + + +int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, + const CSCTypesList& cscColTypes, + const ColStructList& colStructList, + const ColValueList& colValueList, + vector& colOldValueList, + const RIDList& ridList, + const int32_t tableOid, + bool convertStructFlag, + ColTupleList::size_type nRows) +{ + bool bExcp; + int rc = 0; + void* valArray = NULL; + Column curCol; + ColStruct curColStruct; + ColTupleList curTupleList, oldTupleList; + ColStructList::size_type totalColumn; + ColStructList::size_type i; + ColTupleList::size_type totalRow; + + setTransId(txnid); + colOldValueList.clear(); + totalColumn = colStructList.size(); + totalRow = nRows; + +#ifdef PROFILE + StopWatch timer; +#endif + + vector rangeListTot; + std::vector freeList; + vector > fboLists; + vector > rangeLists; + rc = processBeginVBCopy(txnid, colStructList, ridList, freeList, fboLists, rangeLists, rangeListTot); + + if (rc != NO_ERROR) + { + if (rangeListTot.size() > 0) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); + + switch (rc) + { + case BRM::ERR_DEADLOCK: + return ERR_BRM_DEAD_LOCK; + + case BRM::ERR_VBBM_OVERFLOW: + return ERR_BRM_VB_OVERFLOW; + + case BRM::ERR_NETWORK: + return ERR_BRM_NETWORK; + + case BRM::ERR_READONLY: + return ERR_BRM_READONLY; + + default: + return ERR_BRM_BEGIN_COPY; + } + } + + VBRange aRange; + uint32_t blocksProcessedThisOid = 0; + uint32_t blocksProcessed = 0; + std::vector files; + TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); + + for (i = 0; i < totalColumn; i++) + { + valArray = NULL; + curColStruct = colStructList[i]; + curTupleList = colValueList[i]; //same value for all rows + ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)]; + + // convert column data type + if (convertStructFlag) + Convertor::convertColType(&curColStruct); + + // set params + colOp->initColumn(curCol); + colOp->setColParam(curCol, 0, curColStruct.colWidth, + curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid, + curColStruct.fCompressionType, curColStruct.fColDbRoot, + curColStruct.fColPartition, curColStruct.fColSegment); + + + ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) && (it->segNum == curColStruct.fColSegment)) + break; + + it++; + } + + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot = curColStruct.fColDbRoot; + aExt.partNum = curColStruct.fColPartition; + aExt.segNum = curColStruct.fColSegment; + aExt.compType = curColStruct.fCompressionType; + aColExtsInfo.push_back(aExt); + aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); + } + + string segFile; + rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file + + if (rc != NO_ERROR) + break; + + if (curColStruct.fCompressionType == 0) + { + BRM::FileInfo aFile; + aFile.oid = curColStruct.dataOid; + aFile.partitionNum = curColStruct.fColPartition; + aFile.dbRoot = curColStruct.fColDbRoot;; + aFile.segmentNum = curColStruct.fColSegment; + aFile.compType = curColStruct.fCompressionType; + files.push_back(aFile); + } + + // handling versioning + //cout << " pass to processVersionBuffer rid " << rowIdArray[0] << endl; + //cout << "dataOid:fColPartition = " << curColStruct.dataOid << ":" << curColStruct.fColPartition << endl; +//timer.start("processVersionBuffers"); + //vector rangeList; + // rc = processVersionBuffers(curCol.dataFile.pFile, txnid, curColStruct, curColStruct.colWidth, totalRow, ridList, rangeList); + std::vector curFreeList; + uint32_t blockUsed = 0; + + if (!idbdatafile::IDBPolicy::useHdfs()) + { + if (rangeListTot.size() > 0) + { + if (freeList[0].size >= (blocksProcessed + rangeLists[i].size())) + { + aRange.vbOID = freeList[0].vbOID; + aRange.vbFBO = freeList[0].vbFBO + blocksProcessed; + aRange.size = rangeLists[i].size(); + curFreeList.push_back(aRange); + //cout << "range size = " << aRange.size <<" and blocksProcessed = " << blocksProcessed<< endl; + } + else + { + aRange.vbOID = freeList[0].vbOID; + aRange.vbFBO = freeList[0].vbFBO + blocksProcessed; + aRange.size = freeList[0].size - blocksProcessed; + blockUsed = aRange.size; + curFreeList.push_back(aRange); + + if (freeList.size() > 1) + { + aRange.vbOID = freeList[1].vbOID; + aRange.vbFBO = freeList[1].vbFBO + blocksProcessedThisOid; + aRange.size = rangeLists[i].size() - blockUsed; + curFreeList.push_back(aRange); + blocksProcessedThisOid += aRange.size; + } + else + { + rc = 1; + break; + } + + //cout << "curFreeList size = " << curFreeList.size() << endl; + + } + + blocksProcessed += rangeLists[i].size(); + + //timer.start("Delete:writeVB"); + rc = BRMWrapper::getInstance()-> + writeVB(curCol.dataFile.pFile, (BRM::VER_t)txnid, + curColStruct.dataOid, fboLists[i], rangeLists[i], + colOp, curFreeList, curColStruct.fColDbRoot, true); + } + } + + //timer.stop("Delete:writeVB"); +//timer.stop("processVersionBuffers"); + // cout << " rc for processVersionBuffer is " << rc << endl; + if (rc != NO_ERROR) + { + if (curColStruct.fCompressionType == 0) + { + curCol.dataFile.pFile->flush(); + } + + if (rangeListTot.size() > 0) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); + + break; + } + + switch (curColStruct.colType) + { + case WriteEngine::WR_INT: + case WriteEngine::WR_MEDINT: + valArray = (int*) calloc(sizeof(int), 1); + break; + + case WriteEngine::WR_UINT: + case WriteEngine::WR_UMEDINT: + valArray = (uint32_t*) calloc(sizeof(uint32_t), 1); + break; + + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR: + case WriteEngine::WR_BLOB: + case WriteEngine::WR_TEXT: + valArray = (char*) calloc(sizeof(char), 1 * MAX_COLUMN_BOUNDARY); + break; + + case WriteEngine::WR_FLOAT: + valArray = (float*) calloc(sizeof(float), 1); + break; + + case WriteEngine::WR_DOUBLE: + valArray = (double*) calloc(sizeof(double), 1); + break; + + case WriteEngine::WR_BYTE: + valArray = (char*) calloc(sizeof(char), 1); + break; + + case WriteEngine::WR_UBYTE: + valArray = (uint8_t*) calloc(sizeof(uint8_t), 1); + break; + + case WriteEngine::WR_SHORT: + valArray = (short*) calloc(sizeof(short), 1); + break; + + case WriteEngine::WR_USHORT: + valArray = (uint16_t*) calloc(sizeof(uint16_t), 1); + break; + + case WriteEngine::WR_LONGLONG: + valArray = (long long*) calloc(sizeof(long long), 1); + break; + + case WriteEngine::WR_ULONGLONG: + valArray = (uint64_t*) calloc(sizeof(uint64_t), 1); + break; + + case WriteEngine::WR_TOKEN: + valArray = (Token*) calloc(sizeof(Token), 1); + break; + case WriteEngine::WR_BINARY: + //case WriteEngine::WR_INT128: + valArray = calloc(sizeof(char), curColStruct.colWidth); //FIXME maybe break; } @@ -6472,7 +7745,6 @@ int WriteEngineWrapper::updateNextValue(const TxnID txnId, const OID& columnoid, colTuple.data = nextVal; colTuples.push_back(colTuple); colValueList.push_back(colTuples); - //TxnID txnid; rc = writeColumnRecords(txnId, colStructList, colValueList, ridList, SYSCOLUMN_BASE, false); if (rc != NO_ERROR) diff --git a/writeengine/wrapper/writeengine.h b/writeengine/wrapper/writeengine.h index 229d8c76e..f3ca7ec4e 100644 --- a/writeengine/wrapper/writeengine.h +++ b/writeengine/wrapper/writeengine.h @@ -157,7 +157,15 @@ public: /** * @brief Convert interface value list to internal value array */ - EXPORT void convertValArray(size_t totalRow, const ColType colType, + EXPORT void convertValArray(const size_t totalRow, + const execplan::CalpontSystemCatalog::ColType& cscColType, + const ColType colType, + ColTupleList& curTupleList, void* valArray, + bool bFromList = true) ; + + // WIP legacy + EXPORT void convertValArray(const size_t totalRow, + const ColType colType, ColTupleList& curTupleList, void* valArray, bool bFromList = true) ; @@ -364,6 +372,7 @@ public: * @param dicStringListt dictionary values list */ EXPORT int insertColumnRec_Single(const TxnID& txnid, + CSCTypesList& cscColTypesList, ColStructList& colStructList, ColValueList& colValueList, DctnryStructList& dctnryStructList, @@ -650,10 +659,8 @@ private: /** * @brief Convert interface column type to a internal column type */ - // void convertColType(void* curStruct, const FuncType curType = FUNC_WRITE_ENGINE) const; - + void convertValue(const execplan::CalpontSystemCatalog::ColType &fullColType, ColType colType, void* valArray, size_t pos, boost::any& data, bool fromList = true); void convertValue(const ColType colType, void* valArray, size_t pos, boost::any& data, bool fromList = true); - /** * @brief Convert column value to its internal representation * @@ -661,6 +668,7 @@ private: * @param value Memory pointer for storing output value. Should be pre-allocated * @param data Column data */ + void convertValue(const execplan::CalpontSystemCatalog::ColType &fullColType, const ColType colType, void* value, boost::any& data); void convertValue(const ColType colType, void* value, boost::any& data); /** @@ -690,11 +698,21 @@ private: /** * @brief Common methods to write values to a column */ - int writeColumnRec(const TxnID& txnid, const ColStructList& colStructList, + int writeColumnRec(const TxnID& txnid, + const CSCTypesList& cscColTypes, + const ColStructList& colStructList, ColValueList& colValueList, RID* rowIdArray, const ColStructList& newColStructList, ColValueList& newColValueList, const int32_t tableOid, bool useTmpSuffix, bool versioning = true); + // WIP + int writeColumnRec(const TxnID& txnid, + const ColStructList& colStructList, + ColValueList& colValueList, + RID* rowIdArray, const ColStructList& newColStructList, + ColValueList& newColValueList, const int32_t tableOid, + bool useTmpSuffix, bool versioning = true); + int writeColumnRecBinary(const TxnID& txnid, const ColStructList& colStructList, std::vector& colValueList, @@ -705,10 +723,19 @@ private: //@Bug 1886,2870 pass the address of ridList vector - int writeColumnRec(const TxnID& txnid, const ColStructList& colStructList, + int writeColumnRec(const TxnID& txnid, + const CSCTypesList& cscColTypes, + const ColStructList& colStructList, const ColValueList& colValueList, std::vector& colOldValueList, const RIDList& ridList, const int32_t tableOid, bool convertStructFlag = true, ColTupleList::size_type nRows = 0); + // WIP legacy + int writeColumnRec(const TxnID& txnid, + const ColStructList& colStructList, + const ColValueList& colValueList, std::vector& colOldValueList, + const RIDList& ridList, const int32_t tableOid, + bool convertStructFlag = true, ColTupleList::size_type nRows = 0); + //For update column from column to use int writeColumnRecords(const TxnID& txnid, std::vector& colStructList,