From 93170c3b316e3a85aed2f10983467db7e9a55a8a Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Mon, 17 Feb 2020 19:52:05 -0500 Subject: [PATCH] MCOL-641 Basic support for multi-value inserts, and deletes. --- dbcon/ddlpackageproc/ddlpackageprocessor.cpp | 13 +- dbcon/dmlpackageproc/dmlpackageprocessor.h | 2 +- primitives/linux-port/column.cpp | 4 +- writeengine/server/we_ddlcommandproc.cpp | 117 +- writeengine/server/we_dmlcommandproc.cpp | 36 +- writeengine/server/we_dmlcommandproc.h | 4 +- writeengine/shared/we_blockop.cpp | 5 +- writeengine/shared/we_convertor.cpp | 2 +- writeengine/wrapper/we_colop.cpp | 32 +- writeengine/wrapper/writeengine.cpp | 1554 +----------------- writeengine/wrapper/writeengine.h | 48 +- 11 files changed, 266 insertions(+), 1551 deletions(-) diff --git a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp index b26392b0c..97712bf38 100644 --- a/dbcon/ddlpackageproc/ddlpackageprocessor.cpp +++ b/dbcon/ddlpackageproc/ddlpackageprocessor.cpp @@ -1550,17 +1550,20 @@ void DDLPackageProcessor::updateSyscolumns(execplan::CalpontSystemCatalog::SCN t if (result.result != NO_ERROR) return; - WriteEngine::ColStructList colStructs; + WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; //std::vector colStructs; WriteEngine::ColStruct colStruct; + execplan::CalpontSystemCatalog::ColType colType; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; //Build column structure for COLUMNPOS_COL - colStruct.dataOid = OID_SYSCOLUMN_COLUMNPOS; - colStruct.colWidth = 4; + colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_COLUMNPOS; + colType.colWidth = colStruct.colWidth = 4; colStruct.tokenFlag = false; - colStruct.colDataType = CalpontSystemCatalog::INT; + colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::INT; colStructs.push_back(colStruct); + cscColTypeList.push_back(colType); int error; std::string err; std::vector colOldValuesList1; @@ -1568,7 +1571,7 @@ void DDLPackageProcessor::updateSyscolumns(execplan::CalpontSystemCatalog::SCN t try { //@Bug 3051 use updateColumnRecs instead of updateColumnRec to use different value for diffrent rows. - if (NO_ERROR != (error = fWriteEngine.updateColumnRecs( txnID, colStructs, colValuesList, ridList ))) + if (NO_ERROR != (error = fWriteEngine.updateColumnRecs( txnID, cscColTypeList, colStructs, colValuesList, ridList ))) { // build the logging message WErrorCodes ec; diff --git a/dbcon/dmlpackageproc/dmlpackageprocessor.h b/dbcon/dmlpackageproc/dmlpackageprocessor.h index cc213c347..05c428781 100644 --- a/dbcon/dmlpackageproc/dmlpackageprocessor.h +++ b/dbcon/dmlpackageproc/dmlpackageprocessor.h @@ -478,7 +478,7 @@ protected: { if (((colType.colDataType == execplan::CalpontSystemCatalog::CHAR) && (colType.colWidth > 8)) || ((colType.colDataType == execplan::CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7)) - || ((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 18)) + || ((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 38)) || (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY) || (colType.colDataType == execplan::CalpontSystemCatalog::BLOB) || (colType.colDataType == execplan::CalpontSystemCatalog::TEXT)) diff --git a/primitives/linux-port/column.cpp b/primitives/linux-port/column.cpp index 4d19940e5..c8fd6673d 100644 --- a/primitives/linux-port/column.cpp +++ b/primitives/linux-port/column.cpp @@ -286,9 +286,7 @@ inline bool isEmptyVal<16>(uint8_t type, const uint8_t* ival) // For BINARY { const uint64_t* val = reinterpret_cast(ival); // WIP ugly speed hack - return (((val[0] == joblist::BINARYEMPTYROW) && (val[1] == joblist::BINARYEMPTYROW)) - || ((val[0] == joblist::BIGINTEMPTYROW) && (val[1] == joblist::BIGINTEMPTYROW))) -; + return ((val[0] == joblist::BINARYEMPTYROW) && (val[1] == joblist::BINARYEMPTYROW)); } diff --git a/writeengine/server/we_ddlcommandproc.cpp b/writeengine/server/we_ddlcommandproc.cpp index 988bb340a..8356ffc22 100644 --- a/writeengine/server/we_ddlcommandproc.cpp +++ b/writeengine/server/we_ddlcommandproc.cpp @@ -140,6 +140,7 @@ uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err) WriteEngine::ColTuple colTuple; WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColTupleList colTuples; WriteEngine::dictStr dctColTuples; WriteEngine::DctnryStruct dctnryStruct; @@ -255,6 +256,7 @@ uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err) } colStructs.push_back(colStruct); + cscColTypeList.push_back(column.colType); oids[colStruct.dataOid] = colStruct.dataOid; //oidsToFlush.push_back(colStruct.dataOid); @@ -293,7 +295,7 @@ uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err) // TODO: This may be redundant static boost::mutex dbrmMutex; boost::mutex::scoped_lock lk(dbrmMutex); - error = fWEWrapper.insertColumnRec_SYS(txnID, colStructs, colValuesList, + error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList, dctnryStructList, dctnryValueList, SYSCOLUMN_BASE); if (error != WriteEngine::NO_ERROR) @@ -392,6 +394,7 @@ uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string& err WriteEngine::ColTuple colTuple; WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColTupleList colTuples; WriteEngine::dictStr dctColTuples; WriteEngine::DctnryStruct dctnryStruct; @@ -702,6 +705,7 @@ uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string& err { colStructs.push_back(colStruct); dctnryStructList.push_back (dctnryStruct); + cscColTypeList.push_back(column.colType); } colList[i].push_back(colTuple); @@ -733,7 +737,7 @@ uint8_t WE_DDLCommandProc::writeCreateSyscolumn(ByteStream& bs, std::string& err } //fWEWrapper.setDebugLevel(WriteEngine::DEBUG_3); - error = fWEWrapper.insertColumnRec_SYS(txnID, colStructs, colValuesList, + error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList, dctnryStructList, dctnryValueList, SYSCOLUMN_BASE); if (idbdatafile::IDBPolicy::useHdfs()) @@ -819,6 +823,7 @@ uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string& err) WriteEngine::ColStruct colStruct; WriteEngine::ColTuple colTuple; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColTupleList colTuples; WriteEngine::DctColTupleList dctColTuples; WriteEngine::ColValueList colValuesList; @@ -1091,6 +1096,7 @@ uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string& err) colStructs.push_back(colStruct); dctnryStructList.push_back (dctnryStruct); + cscColTypeList.push_back(column.colType); colList[i].push_back(colTuple); //colList.push_back(WriteEngine::ColTupleList()); //colList.back().push_back(colTuple); @@ -1118,7 +1124,7 @@ uint8_t WE_DDLCommandProc::writeSyscolumn(ByteStream& bs, std::string& err) fWEWrapper.startTransaction(txnID); int rc1 = 0; - error = fWEWrapper.insertColumnRec_SYS(txnID, colStructs, colValuesList, + error = fWEWrapper.insertColumnRec_SYS(txnID, cscColTypeList, colStructs, colValuesList, dctnryStructList, dctnryValueList, SYSCOLUMN_BASE); if (idbdatafile::IDBPolicy::useHdfs()) @@ -1368,7 +1374,9 @@ uint8_t WE_DDLCommandProc::deleteSyscolumn(ByteStream& bs, std::string& err) WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector colValuesList; WriteEngine::RIDList ridList; std::vector ridLists; @@ -1403,17 +1411,19 @@ uint8_t WE_DDLCommandProc::deleteSyscolumn(ByteStream& bs, std::string& err) oids[colStruct.dataOid] = colStruct.dataOid; //oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); + cscColTypeList.push_back(column.colType); ++column_iterator; } colExtentsStruct.push_back(colStructs); + colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); if (0 != colStructs.size() && 0 != ridLists[0].size()) { - int error = fWEWrapper.deleteRow(txnID, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); + int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); int rc1 = 0; @@ -1508,7 +1518,9 @@ uint8_t WE_DDLCommandProc::deleteSyscolumnRow(ByteStream& bs, std::string& err) WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector colValuesList; WriteEngine::RIDList ridList; std::vector ridLists; @@ -1537,17 +1549,20 @@ uint8_t WE_DDLCommandProc::deleteSyscolumnRow(ByteStream& bs, std::string& err) oids[colStruct.dataOid] = colStruct.dataOid; //oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); + cscColTypeList.push_back(column.colType); ++column_iterator; } colExtentsStruct.push_back(colStructs); + colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); if (0 != colStructs.size() && 0 != ridLists[0].size()) { - int error = fWEWrapper.deleteRow(txnID, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); + int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); + int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) @@ -1644,7 +1659,9 @@ uint8_t WE_DDLCommandProc::deleteSystable(ByteStream& bs, std::string& err) WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector colValuesList; WriteEngine::RIDList ridList; std::vector ridLists; @@ -1672,17 +1689,20 @@ uint8_t WE_DDLCommandProc::deleteSystable(ByteStream& bs, std::string& err) oids[colStruct.dataOid] = colStruct.dataOid; //oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); + cscColTypeList.push_back(column.colType); ++column_iterator; } colExtentsStruct.push_back(colStructs); + colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); if (0 != colStructs.size() && 0 != ridLists[0].size()) { - int error = fWEWrapper.deleteRow(txnID, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); + int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); + int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) @@ -1755,7 +1775,9 @@ uint8_t WE_DDLCommandProc::deleteSystables(ByteStream& bs, std::string& err) systemCatalogPtr->identity(CalpontSystemCatalog::EC); WriteEngine::ColStruct colStruct; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector colValuesList; WriteEngine::RIDList ridList; std::vector ridLists; @@ -1804,16 +1826,19 @@ uint8_t WE_DDLCommandProc::deleteSystables(ByteStream& bs, std::string& err) oids[colStruct.dataOid] = colStruct.dataOid; //oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); + cscColTypeList.push_back(column.colType); ++column_iterator; } colExtentsStruct.push_back(colStructs); + colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); { - int error = fWEWrapper.deleteRow(txnID, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); + int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); + int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) @@ -1867,7 +1892,9 @@ uint8_t WE_DDLCommandProc::deleteSystables(ByteStream& bs, std::string& err) CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(userTableName); colStructs.clear(); + cscColTypeList.clear(); colExtentsStruct.clear(); + colExtentsColType.clear(); colValuesList.clear(); ridList.clear(); ridLists.clear(); @@ -1902,17 +1929,20 @@ uint8_t WE_DDLCommandProc::deleteSystables(ByteStream& bs, std::string& err) colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; + cscColTypeList.push_back(column.colType); //oidsToFlush.push_back(colStruct.dataOid); ++column_iterator; } colExtentsStruct.push_back(colStructs); + colExtentsColType.push_back(cscColTypeList); ridLists.push_back(ridList); if (0 != colStructs.size() && 0 != ridLists[0].size()) { - int error = fWEWrapper.deleteRow(txnID, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); + int error = fWEWrapper.deleteRow(txnID, colExtentsColType, colExtentsStruct, colValuesList, ridLists, SYSCOLUMN_BASE); + int rc1 = 0; if (idbdatafile::IDBPolicy::useHdfs()) @@ -2035,6 +2065,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnAuto(ByteStream& bs, std::string& err) WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; //std::vector oidsToFlush; @@ -2075,6 +2106,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnAuto(ByteStream& bs, std::string& err) oids[colStruct.dataOid] = colStruct.dataOid; //oidsToFlush.push_back(colStruct.dataOid); dctnryStructList.push_back(dctnryStruct); + cscColTypeList.push_back(column.colType); for (unsigned int i = 0; i < roList.size(); i++) { @@ -2083,6 +2115,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnAuto(ByteStream& bs, std::string& err) colValuesList.push_back(aColList); std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector dctnryExtentsStruct; std::vector extentsinfo; extentInfo aExtentinfo; @@ -2125,13 +2158,14 @@ uint8_t WE_DDLCommandProc::updateSyscolumnAuto(ByteStream& bs, std::string& err) colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); + colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row if (idbdatafile::IDBPolicy::useHdfs()) fWEWrapper.startTransaction(txnID); - rc = fWEWrapper.updateColumnRec(txnID, colExtentsStruct, colValuesList, colOldValuesList, + rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) @@ -2226,6 +2260,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; //std::vector oidsToFlush; @@ -2259,6 +2294,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string //oidsToFlush.push_back(colStruct.dataOid); colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); + cscColTypeList.push_back(column.colType); for (unsigned int i = 0; i < roList.size(); i++) { @@ -2296,6 +2332,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string std::vector ridLists; std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector dctnryExtentsStruct; ridLists.push_back(ridList); @@ -2314,13 +2351,14 @@ uint8_t WE_DDLCommandProc::updateSyscolumnNextvalCol(ByteStream& bs, std::string colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); + colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row fWEWrapper.setTransId(txnID); fWEWrapper.startTransaction(txnID); - rc = fWEWrapper.updateColumnRec(txnID, colExtentsStruct, colValuesList, colOldValuesList, + rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) @@ -2396,6 +2434,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; tableName.schema = CALPONT_SCHEMA; tableName.table = SYSCOLUMN_TABLE; @@ -2463,6 +2502,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); + cscColTypeList.push_back(column.colType); for (unsigned int i = 0; i < roList.size(); i++) { @@ -2485,6 +2525,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& std::vector extentsinfo; extentInfo aExtentinfo; std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector dctnryExtentsStruct; for (unsigned int i = 0; i < roList.size(); i++) @@ -2521,6 +2562,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); + colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row @@ -2529,7 +2571,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnTablename(ByteStream& bs, std::string& fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); - rc = fWEWrapper.updateColumnRec(txnID, colExtentsStruct, colValuesList, colOldValuesList, + rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) @@ -2625,6 +2667,7 @@ uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err) WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; //std::vector oidsToFlush; @@ -2658,12 +2701,14 @@ uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err) } colStructs.push_back(colStruct); + cscColTypeList.push_back(column.colType); oids[colStruct.dataOid] = colStruct.dataOid; //oidsToFlush.push_back(colStruct.dataOid); dctnryStructList.push_back(dctnryStruct); aColList.push_back(colTuple); colValuesList.push_back(aColList); std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector dctnryExtentsStruct; @@ -2701,6 +2746,7 @@ uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err) } colExtentsStruct.push_back(colStructs); + colExtentsColType.push_back(cscColTypeList); dctnryExtentsStruct.push_back(dctnryStructList); } @@ -2710,7 +2756,7 @@ uint8_t WE_DDLCommandProc::updateSystableAuto(ByteStream& bs, std::string& err) fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); - rc = fWEWrapper.updateColumnRec(txnID, colExtentsStruct, colValuesList, colOldValuesList, + rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) @@ -2804,6 +2850,7 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; //std::vector oidsToFlush; @@ -2853,6 +2900,7 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); oids[colStruct.dataOid] = colStruct.dataOid; + cscColTypeList.push_back(column.colType); //oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) @@ -2865,6 +2913,7 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& colValuesList.push_back(aColList); std::vector colExtentsStruct; std::vector dctnryExtentsStruct; + std::vector colExtentsColType; dctColList = dictTuple; dctRowList.push_back(dctColList); @@ -2900,6 +2949,7 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); + colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row @@ -2908,7 +2958,7 @@ uint8_t WE_DDLCommandProc::updateSystableTablename(ByteStream& bs, std::string& fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); - rc = fWEWrapper.updateColumnRec(txnID, colExtentsStruct, colValuesList, colOldValuesList, + rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) @@ -3032,6 +3082,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; //std::vector oidsToFlush; @@ -3090,6 +3141,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); oids[colStruct.dataOid] = colStruct.dataOid; + cscColTypeList.push_back(column.colType); //oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) @@ -3102,6 +3154,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& colValuesList.push_back(aColList); std::vector colExtentsStruct; std::vector dctnryExtentsStruct; + std::vector colExtentsColType; dctColList = dictTuple; dctRowList.push_back(dctColList); @@ -3137,6 +3190,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); + colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row @@ -3145,7 +3199,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& fWEWrapper.setBulkFlag(false); fWEWrapper.startTransaction(txnID); - rc = fWEWrapper.updateColumnRec(txnID, colExtentsStruct, colValuesList, colOldValuesList, + rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) @@ -3207,6 +3261,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& colValuesList.clear(); aColList.clear(); colStructs.clear(); + cscColTypeList.clear(); colOldValuesList.clear(); oids.clear(); tableName.schema = CALPONT_SCHEMA; @@ -3287,6 +3342,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); + cscColTypeList.push_back(column.colType); for (unsigned int i = 0; i < roList.size(); i++) { @@ -3307,6 +3363,7 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& dctnryValueList.push_back(dctRowList); extentsinfo.clear(); colExtentsStruct.clear(); + colExtentsColType.clear(); dctnryExtentsStruct.clear(); oid = 1021; @@ -3344,10 +3401,11 @@ uint8_t WE_DDLCommandProc::updateSystablesTablename(ByteStream& bs, std::string& colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); + colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row - rc = fWEWrapper.updateColumnRec(txnID, colExtentsStruct, colValuesList, colOldValuesList, + rc = fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE); if (rc != NO_ERROR) @@ -3481,11 +3539,13 @@ uint8_t WE_DDLCommandProc::updateSyscolumnColumnposCol(messageqcpp::ByteStream& WriteEngine::ColStruct colStruct; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; + WriteEngine::CSCTypesList cscColTypeList; + CalpontSystemCatalog::ColType colType; //Build column structure for COLUMNPOS_COL - colStruct.dataOid = OID_SYSCOLUMN_COLUMNPOS; - colStruct.colWidth = 4; + colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_COLUMNPOS; + colType.colWidth = colStruct.colWidth = 4; colStruct.tokenFlag = false; - colStruct.colDataType = CalpontSystemCatalog::INT; + colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::INT; colStruct.fColDbRoot = dbRoot; if (idbdatafile::IDBPolicy::useHdfs()) @@ -3494,9 +3554,10 @@ uint8_t WE_DDLCommandProc::updateSyscolumnColumnposCol(messageqcpp::ByteStream& } colStructs.push_back(colStruct); + cscColTypeList.push_back(colType); oids[colStruct.dataOid] = colStruct.dataOid; //oidsToFlush.push_back(colStruct.dataOid); - rc = fWEWrapper.updateColumnRecs( txnID, colStructs, colValuesList, ridList, SYSCOLUMN_BASE ); + rc = fWEWrapper.updateColumnRecs( txnID, cscColTypeList, colStructs, colValuesList, ridList, SYSCOLUMN_BASE ); } int rc1 = 0; @@ -3592,7 +3653,7 @@ uint8_t WE_DDLCommandProc::fillNewColumn(ByteStream& bs, std::string& err) std::map oids; oids[dataOid] = dataOid; oids[refColOID] = refColOID; - rc = fWEWrapper.fillColumn(txnID, dataOid, dataType, dataWidth, defaultVal, refColOID, refColDataType, + rc = fWEWrapper.fillColumn(txnID, dataOid, colType, defaultVal, refColOID, refColDataType, refColWidth, refCompressionType, isNULL, compressionType, defaultValStr, dictOid, autoincrement); if ( rc != 0 ) @@ -4163,6 +4224,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList1; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; @@ -4268,6 +4330,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; + cscColTypeList.push_back(column.colType); //oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) @@ -4299,6 +4362,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs std::vector colExtentsStruct; std::vector dctnryExtentsStruct; + std::vector colExtentsColType; std::vector ridLists; ridLists.push_back(ridList); @@ -4329,11 +4393,12 @@ uint8_t WE_DDLCommandProc::updateSyscolumnSetDefault(messageqcpp::ByteStream& bs colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); + colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row - if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsStruct, colValuesList, colOldValuesList, + if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE)) { err = "WE: Update failed on: " + atableName.table; @@ -4439,6 +4504,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& WriteEngine::ColValueList colValuesList; WriteEngine::ColTupleList aColList1; WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; std::vector colOldValuesList; std::map oids; //std::vector oidsToFlush; @@ -4557,6 +4623,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; + cscColTypeList.push_back(column1.colType); //oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) @@ -4594,6 +4661,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; + cscColTypeList.push_back(column2.colType); //oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) @@ -4627,6 +4695,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; + cscColTypeList.push_back(column3.colType); //oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) @@ -4661,6 +4730,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& colStructs.push_back(colStruct); oids[colStruct.dataOid] = colStruct.dataOid; + cscColTypeList.push_back(column4.colType); //oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) @@ -4770,6 +4840,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); oids[colStruct.dataOid] = colStruct.dataOid; + cscColTypeList.push_back(column5.colType); //oidsToFlush.push_back(colStruct.dataOid); if (dctnryStruct.dctnryOid > 0) @@ -4797,6 +4868,7 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& dctRowList.push_back(dctColList); dctnryValueList.push_back(dctRowList); std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector dctnryExtentsStruct; std::vector ridLists; ridLists.push_back(ridList); @@ -4828,10 +4900,11 @@ uint8_t WE_DDLCommandProc::updateSyscolumnRenameColumn(messageqcpp::ByteStream& colExtentsStruct.push_back(colStructs); dctnryExtentsStruct.push_back(dctnryStructList); + colExtentsColType.push_back(cscColTypeList); } // call the write engine to update the row - if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsStruct, colValuesList, colOldValuesList, + if (NO_ERROR != fWEWrapper.updateColumnRec(txnID, colExtentsColType, colExtentsStruct, colValuesList, colOldValuesList, ridLists, dctnryExtentsStruct, dctnryValueList, SYSCOLUMN_BASE)) { err = "WE: Update failed on: " + atableName.table; diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index b5b00f81b..bbef08857 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -116,7 +116,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: RowList rows = tablePtr->get_RowList(); WriteEngine::ColStructList colStructs; - WriteEngine::CSCTypesList cscColTypes; + WriteEngine::CSCTypesList cscColTypeList; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::ColValueList colValuesList; @@ -141,7 +141,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: Row* rowPtr = rows.at(0); ColumnList columns = rowPtr->get_ColumnList(); unsigned int numcols = rowPtr->get_NumberOfColumns(); - cscColTypes.reserve(numcols); + cscColTypeList.reserve(numcols); // WIP // We presume that DictCols number is low colStructs.reserve(numcols); @@ -159,7 +159,6 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: CalpontSystemCatalog::ColType colType; colType = systemCatalogPtr->colType(oid); - cscColTypes.push_back(colType); WriteEngine::ColStruct colStruct; colStruct.fColDbRoot = dbroot; WriteEngine::DctnryStruct dctnryStruct; @@ -169,7 +168,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: colStruct.fCompressionType = colType.compressionType; // Token - if ( isDictCol(colType) ) + if (isDictCol(colType) ) { // WIP Hardcoded value colStruct.colWidth = 8; @@ -199,6 +198,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); + cscColTypeList.push_back(colType); ++column_iterator; } @@ -538,7 +538,7 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: if (colValuesList[0].size() > 0) { if (NO_ERROR != - (error = fWEWrapper.insertColumnRec_Single(txnid.id, cscColTypes, colStructs, colValuesList, dctnryStructList, dicStringList, tableRoPair.objnum))) + (error = fWEWrapper.insertColumnRec_Single(txnid.id, cscColTypeList, colStructs, colValuesList, dctnryStructList, dicStringList, tableRoPair.objnum))) { if (error == ERR_BRM_DEAD_LOCK) { @@ -842,6 +842,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: bool isInsertSelect = insertPkg.get_isInsertSelect(); WriteEngine::ColStructList colStructs; + WriteEngine::CSCTypesList cscColTypeList; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryValueList dctnryValueList; WriteEngine::ColValueList colValuesList; @@ -1046,7 +1047,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: colStruct.fCompressionType = colType.compressionType; // Token - if ( isDictCol(colType) ) + if (isDictCol(colType) ) { colStruct.colWidth = 8; colStruct.tokenFlag = true; @@ -1075,6 +1076,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: colStructs.push_back(colStruct); dctnryStructList.push_back(dctnryStruct); + cscColTypeList.push_back(colType); ++column_iterator; } @@ -1324,7 +1326,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: */ if (NO_ERROR != - (error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, + (error = fWEWrapper.insertColumnRecs(txnid.id, cscColTypeList, colStructs, colValuesList, dctnryStructList, dicStringList, dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm))) { if (error == ERR_BRM_DEAD_LOCK) @@ -2691,6 +2693,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, WriteEngine::ColStruct colStruct; WriteEngine::ColValueList colValueList; WriteEngine::RIDList rowIDLists; + WriteEngine::CSCTypesList cscColTypeList; WriteEngine::DctnryStructList dctnryStructList; WriteEngine::DctnryStruct dctnryStruct; @@ -2858,7 +2861,7 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, colStruct.fCompressionType = colType.compressionType; tableColName.column = columnsUpdated[j]->get_Name(); - if ( !ridsFetched) + if (!ridsFetched) { // querystats uint64_t relativeRID = 0; @@ -3806,12 +3809,13 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, colStructList.push_back(colStruct); colValueList.push_back (colTupleList); + cscColTypeList.push_back(colType); } //end of bulding values and column structure. //timer.stop("fetch values"); if (rowIDLists.size() > 0) { - error = fWEWrapper.updateColumnRecs(txnId, colStructList, colValueList, rowIDLists, tableRO.objnum); + error = fWEWrapper.updateColumnRecs(txnId, cscColTypeList, colStructList, colValueList, rowIDLists, tableRO.objnum); } if (error != NO_ERROR) @@ -4116,13 +4120,14 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, for (uint32_t j = 0; j < row.getColumnCount(); j++) { preBlkNums[j] = -1; - colWidth[j] = (row.getColumnWidth(j) >= 8 ? 8 : row.getColumnWidth(j)); + colWidth[j] = (row.getColumnWidth(j) >= 16 ? 16 : row.getColumnWidth(j)); } //Get the file information from rowgroup dbRoot = rowGroups[txnId]->getDBRoot(); rowGroups[txnId]->getLocation(&partition, &segment, &extentNum, &blockNum); WriteEngine::ColStructList colStructList; + WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColStruct colStruct; colStruct.fColPartition = partition; colStruct.fColSegment = segment; @@ -4158,7 +4163,9 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, colStruct.tokenFlag = false; colStruct.fCompressionType = colType.compressionType; - if (colType.colWidth > 8) //token + if (colType.colWidth > 8 && + !(colType.colDataType == CalpontSystemCatalog::DECIMAL || + colType.colDataType == CalpontSystemCatalog::UDECIMAL)) //token { colStruct.colWidth = 8; colStruct.tokenFlag = true; @@ -4170,7 +4177,8 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, colStruct.colDataType = colType.colDataType; - colStructList.push_back( colStruct ); + colStructList.push_back(colStruct); + cscColTypeList.push_back(colType); } } catch (exception& ex) @@ -4181,13 +4189,15 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, } std::vector colExtentsStruct; + std::vector colExtentsColType; std::vector colOldValueList; std::vector ridLists; colExtentsStruct.push_back(colStructList); + colExtentsColType.push_back(cscColTypeList); ridLists.push_back(rowIDList); int error = 0; - error = fWEWrapper.deleteRow( txnId, colExtentsStruct, colOldValueList, ridLists, roPair.objnum ); + error = fWEWrapper.deleteRow(txnId, colExtentsColType, colExtentsStruct, colOldValueList, ridLists, roPair.objnum); if (error != NO_ERROR) { diff --git a/writeengine/server/we_dmlcommandproc.h b/writeengine/server/we_dmlcommandproc.h index 3786052b1..e80597d68 100644 --- a/writeengine/server/we_dmlcommandproc.h +++ b/writeengine/server/we_dmlcommandproc.h @@ -110,8 +110,8 @@ private: { if (((colType.colDataType == execplan::CalpontSystemCatalog::CHAR) && (colType.colWidth > 8)) || ((colType.colDataType == execplan::CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7)) - || ((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 65)) - || ((colType.colDataType == execplan::CalpontSystemCatalog::UDECIMAL) && (colType.precision > 65)) + || ((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 38)) + || ((colType.colDataType == execplan::CalpontSystemCatalog::UDECIMAL) && (colType.precision > 38)) || (colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY) || (colType.colDataType == execplan::CalpontSystemCatalog::BLOB) || (colType.colDataType == execplan::CalpontSystemCatalog::TEXT)) diff --git a/writeengine/shared/we_blockop.cpp b/writeengine/shared/we_blockop.cpp index da104ad9b..01a8cd6f1 100644 --- a/writeengine/shared/we_blockop.cpp +++ b/writeengine/shared/we_blockop.cpp @@ -82,6 +82,7 @@ bool BlockOp::calculateRowId( * RETURN: * emptyVal - the value of empty row ***********************************************************/ +// TODO MCOL-641 Add support here uint64_t BlockOp::getEmptyRowValue( const CalpontSystemCatalog::ColDataType colDataType, const int width ) const { @@ -138,8 +139,10 @@ uint64_t BlockOp::getEmptyRowValue( emptyVal = joblist::SMALLINTEMPTYROW; else if ( width <= 4 ) emptyVal = joblist::INTEMPTYROW; - else + else if ( width <= 8 ) emptyVal = joblist::BIGINTEMPTYROW; + else + emptyVal = joblist::BINARYEMPTYROW; break; diff --git a/writeengine/shared/we_convertor.cpp b/writeengine/shared/we_convertor.cpp index af6723f7d..9c5c63454 100644 --- a/writeengine/shared/we_convertor.cpp +++ b/writeengine/shared/we_convertor.cpp @@ -328,6 +328,7 @@ void Convertor::mapErrnoToString(int errNum, std::string& errString) * none ******************************************************************************/ /* static */ +// TODO MCOL-641 void Convertor::convertColType(CalpontSystemCatalog::ColDataType dataType, ColType& internalType, bool isToken) { @@ -778,7 +779,6 @@ int Convertor::getCorrectRowWidth(CalpontSystemCatalog::ColDataType dataType, in newWidth = 8; else newWidth = 16; - break; case CalpontSystemCatalog::DATE: diff --git a/writeengine/wrapper/we_colop.cpp b/writeengine/wrapper/we_colop.cpp index 24ce85d0f..ce091b0ea 100644 --- a/writeengine/wrapper/we_colop.cpp +++ b/writeengine/wrapper/we_colop.cpp @@ -85,6 +85,7 @@ ColumnOp::~ColumnOp() * NO_ERROR if success * rowIdArray - allocation of the row id left here ***********************************************************/ +// TODO MCOL-641 add support here int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent, Column& column, uint64_t totalRow, RID* rowIdArray, HWM& hwm, bool& newExtent, uint64_t& rowsLeft, HWM& newHwm, bool& newFile, ColStructList& newColStructList, DctnryStructList& newDctnryStructList, std::vector >& dbRootExtentTrackers, @@ -126,6 +127,9 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent, unsigned char buf[BYTE_PER_BLOCK]; unsigned char* curVal; int64_t emptyVal = getEmptyRowValue(column.colDataType, column.colWidth); // Seems is ok have it here and just once + // TODO MCOL-641 consolidate the emptyvalue logic + //__int128 bigEmptyVal; + //dataconvert::DataConvert::uint128Max(bigEmptyVal); if (useStartingExtent) { @@ -1455,6 +1459,7 @@ void ColumnOp::initColumn(Column& column) const // It is called at just 4 places on allocRowId() but all the time inside extend scanning loops // WIP Template this method +// TODO MCOL-641 Add support here. inline bool ColumnOp::isEmptyRow(uint64_t* curVal, uint64_t emptyVal, const int colWidth) { switch(colWidth){ @@ -1742,6 +1747,7 @@ int ColumnOp::writeRow(Column& curCol, uint64_t totalRow, const RID* rowIdArray, break; } + // TODO MCOL-641 do we need support here? if (bDelete) { emptyVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth); @@ -1793,6 +1799,13 @@ int ColumnOp::writeRows(Column& curCol, uint64_t totalRow, const RIDList& ridLis uint64_t emptyVal; int rc = NO_ERROR; + int w = 0, incr = 8; + + if (curCol.colType == WriteEngine::WR_BINARY) + w = incr = curCol.colWidth; + else + w = curCol.colWidth > 8 ? 8 : curCol.colWidth; + while (!bExit) { curRowId = ridList[i]; @@ -1922,12 +1935,25 @@ int ColumnOp::writeRows(Column& curCol, uint64_t totalRow, const RIDList& ridLis if (bDelete) { - emptyVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth); - pVal = &emptyVal; + if (curCol.colType != WriteEngine::WR_BINARY) + { + emptyVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth); + pVal = &emptyVal; + } + else + { + // fix this + uint128_t bigEmptyVal; + emptyVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth); + *(reinterpret_cast(&bigEmptyVal)) = emptyVal; + *(reinterpret_cast(&bigEmptyVal) + 1) = emptyVal; + //dataconvert::DataConvert::uint128Max(bigEmptyVal); + pVal = &bigEmptyVal; + } } // This is the write stuff - for(int b = 0, w = curCol.colWidth > 8 ? 8 : curCol.colWidth; b < curCol.colWidth; b += 8) //FIXME for no loop + for (int b = 0; b < curCol.colWidth; b += incr) //FIXME for no loop writeBufValue(dataBuf + dataBio + b, pVal, w); i++; diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index 29edee9d4..7d295b397 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -223,83 +223,7 @@ void WriteEngineWrapper::findSmallestColumn(uint32_t& colId, ColStructList colSt using int128_t = __int128; using uint128_t = unsigned __int128; -struct uint128_pod -{ - uint64_t lo; - uint64_t hi; -}; - - - -inline void toString(uint128_t i, char *p) -{ - uint64_t div = 10000000000000000000ULL; - size_t div_log = 19; - uint128_t high = i; - uint128_t low; - low = high % div; - high /= div; - uint128_t mid; - mid = high % div; - high /= div; - - uint128_pod *high_pod = reinterpret_cast(&high); - uint128_pod *mid_pod = reinterpret_cast(&mid); - uint128_pod *low_pod = reinterpret_cast(&low); - int printed_chars = 0; - - // WIP replace snprintf with streams - if (high_pod->lo != 0) { - printed_chars = snprintf(p, div_log+1, "%ld", high_pod->lo); - p += printed_chars; - printed_chars = snprintf(p, div_log+1, "%019lu", mid_pod->lo); - p += printed_chars; - } else if (mid_pod->lo != 0) { - printed_chars = snprintf(p, div_log+1, "%lu", mid_pod->lo); - p += printed_chars; - } - snprintf(p, div_log+1, "%019ld", low_pod->lo); -} - - - -/*@convertValArray - Convert interface values to internal values - */ -/*********************************************************** - * DESCRIPTION: - * Convert interface values to internal values - * PARAMETERS: - * colStructList - column struct list - * colValueList - column value list - * RETURN: - * none - * valArray - output value array - * nullArray - output null flag array - ***********************************************************/ -void WriteEngineWrapper::convertValArray(const size_t totalRow, const ColType colType, ColTupleList& curTupleList, void* valArray, bool bFromList) -{ - ColTuple curTuple; - ColTupleList::size_type i; - - if (bFromList) - { - for (i = 0; i < curTupleList.size(); i++) - { - curTuple = curTupleList[i]; - convertValue(colType, valArray, i, curTuple.data); - } - } - else - { - for (i = 0; i < totalRow; i++) - { - convertValue(colType, valArray, i, curTuple.data, false); - curTupleList.push_back(curTuple); - } - } -} - -/*@convertValArray - Convert interface values to internal values +/*@convertValArray - Convert interface values to internal values */ /*********************************************************** * DESCRIPTION: @@ -313,7 +237,7 @@ void WriteEngineWrapper::convertValArray(const size_t totalRow, const ColType co * valArray - output value array * nullArray - output null flag array ***********************************************************/ -void WriteEngineWrapper::convertValArray(const size_t totalRow, const CalpontSystemCatalog::ColType &cscColType, const ColType colType, ColTupleList& curTupleList, void* valArray, bool bFromList) +void WriteEngineWrapper::convertValArray(const size_t totalRow, const CalpontSystemCatalog::ColType& cscColType, const ColType colType, ColTupleList& curTupleList, void* valArray, bool bFromList) { ColTuple curTuple; ColTupleList::size_type i; @@ -336,11 +260,10 @@ void WriteEngineWrapper::convertValArray(const size_t totalRow, const CalpontSys } } - /* * @brief Convert column value to its internal representation */ -void WriteEngineWrapper::convertValue(const ColType colType, void* value, boost::any& data) +void WriteEngineWrapper::convertValue(const execplan::CalpontSystemCatalog::ColType& cscColType, ColType colType, void* value, boost::any& data) { string curStr; int size; @@ -476,169 +399,11 @@ void WriteEngineWrapper::convertValue(const ColType colType, void* value, boost: } break; - // Replace INT128 with WR_BINARY using CSC::ColType data - /*case WriteEngine::WR_INT128: - { - int128_t val = boost::any_cast(data); - size = 16; - // WIP Why do we use memcpy here? - memcpy(value, &val, size); - }*/ + // WIP MCOL-641 case WriteEngine::WR_BINARY: { - char val = boost::any_cast(data); - //TODO:FIXME how to determine size ? 16, 32,48 ? - // WIP - size = 16; - memcpy(value, &val, size); - } - break; - - } // end of switch (colType) -} /*@convertValue - The base for converting values */ - -// WIP this is ugly to have structs with the same name -void WriteEngineWrapper::convertValue(const execplan::CalpontSystemCatalog::ColType &fullColType, ColType colType, void* value, boost::any& data) -{ - string curStr; - int size; - - switch (colType) - { - case WriteEngine::WR_INT : - case WriteEngine::WR_MEDINT : - if (data.type() == typeid(int)) - { - int val = boost::any_cast(data); - size = sizeof(int); - memcpy(value, &val, size); - } - else - { - uint32_t val = boost::any_cast(data); - size = sizeof(uint32_t); - memcpy(value, &val, size); - } - - break; - - case WriteEngine::WR_UINT : - case WriteEngine::WR_UMEDINT : - { - uint32_t val = boost::any_cast(data); - size = sizeof(uint32_t); - memcpy(value, &val, size); - } - break; - - case WriteEngine::WR_VARBINARY : // treat same as char for now - case WriteEngine::WR_CHAR : - case WriteEngine::WR_BLOB : - case WriteEngine::WR_TEXT : - curStr = boost::any_cast(data); - - if ((int) curStr.length() > MAX_COLUMN_BOUNDARY) - curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY); - - memcpy(value, curStr.c_str(), curStr.length()); - break; - - case WriteEngine::WR_FLOAT: - { - float val = boost::any_cast(data); - -//N.B.There is a bug in boost::any or in gcc where, if you store a nan, you will get back a nan, -// but not necessarily the same bits that you put in. This only seems to be for float (double seems -// to work). - if (isnan(val)) - { - uint32_t ti = joblist::FLOATNULL; - float* tfp = (float*)&ti; - val = *tfp; - } - - size = sizeof(float); - memcpy(value, &val, size); - } - break; - - case WriteEngine::WR_DOUBLE: - { - double val = boost::any_cast(data); - size = sizeof(double); - memcpy(value, &val, size); - } - break; - - case WriteEngine::WR_SHORT: - { - short val = boost::any_cast(data); - size = sizeof(short); - memcpy(value, &val, size); - } - break; - - case WriteEngine::WR_USHORT: - { - uint16_t val = boost::any_cast(data); - size = sizeof(uint16_t); - memcpy(value, &val, size); - } - break; - - case WriteEngine::WR_BYTE: - { - char val = boost::any_cast(data); - size = sizeof(char); - memcpy(value, &val, size); - } - break; - - case WriteEngine::WR_UBYTE: - { - uint8_t val = boost::any_cast(data); - size = sizeof(uint8_t); - memcpy(value, &val, size); - } - break; - - case WriteEngine::WR_LONGLONG: - if (data.type() == typeid(long long)) - { - long long val = boost::any_cast(data); - size = sizeof(long long); - memcpy(value, &val, size); - } - else - { - uint64_t val = boost::any_cast(data); - size = sizeof(uint64_t); - memcpy(value, &val, size); - } - - break; - - case WriteEngine::WR_ULONGLONG: - { - uint64_t val = boost::any_cast(data); - size = sizeof(uint64_t); - memcpy(value, &val, size); - } - break; - - case WriteEngine::WR_TOKEN: - { - Token val = boost::any_cast(data); - size = sizeof(Token); - memcpy(value, &val, size); - } - break; - - // WIP - case WriteEngine::WR_BINARY: - { - size = fullColType.colWidth; - if (fullColType.colDataType == CalpontSystemCatalog::DECIMAL) + size = cscColType.colWidth; + if (cscColType.colDataType == CalpontSystemCatalog::DECIMAL) { int128_t val = boost::any_cast(data); memcpy(value, &val, size); @@ -648,192 +413,12 @@ void WriteEngineWrapper::convertValue(const execplan::CalpontSystemCatalog::ColT char val = boost::any_cast(data); memcpy(value, &val, size); } - } break; } // end of switch (colType) } /*@convertValue - The base for converting values */ -// WIP -// Legacy version -void WriteEngineWrapper::convertValue(const ColType colType, void* valArray, const size_t pos, boost::any& data, bool fromList) -{ - string curStr; - - if (fromList) - { - switch (colType) - { - case WriteEngine::WR_INT : - case WriteEngine::WR_MEDINT : - if (data.type() == typeid(long)) - ((int*)valArray)[pos] = static_cast(boost::any_cast(data)); - else if (data.type() == typeid(int)) - ((int*)valArray)[pos] = boost::any_cast(data); - else - ((int*)valArray)[pos] = boost::any_cast(data); - - break; - - case WriteEngine::WR_UINT : - case WriteEngine::WR_UMEDINT : - ((uint32_t*)valArray)[pos] = boost::any_cast(data); - break; - - case WriteEngine::WR_VARBINARY : // treat same as char for now - case WriteEngine::WR_CHAR : - case WriteEngine::WR_BLOB : - case WriteEngine::WR_TEXT : - curStr = boost::any_cast(data); - - if ((int) curStr.length() > MAX_COLUMN_BOUNDARY) - curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY); - - memcpy((char*)valArray + pos * MAX_COLUMN_BOUNDARY, curStr.c_str(), curStr.length()); - break; - -// case WriteEngine::WR_LONG : ((long*)valArray)[pos] = boost::any_cast(curTuple.data); -// break; - case WriteEngine::WR_FLOAT: - ((float*)valArray)[pos] = boost::any_cast(data); - - if (isnan(((float*)valArray)[pos])) - { - uint32_t ti = joblist::FLOATNULL; - float* tfp = (float*)&ti; - ((float*)valArray)[pos] = *tfp; - } - - break; - - case WriteEngine::WR_DOUBLE: - ((double*)valArray)[pos] = boost::any_cast(data); - break; - - case WriteEngine::WR_SHORT: - ((short*)valArray)[pos] = boost::any_cast(data); - break; - - case WriteEngine::WR_USHORT: - ((uint16_t*)valArray)[pos] = boost::any_cast(data); - break; - -// case WriteEngine::WR_BIT: ((bool*)valArray)[pos] = boost::any_cast(data); -// break; - case WriteEngine::WR_BYTE: - ((char*)valArray)[pos] = boost::any_cast(data); - break; - - case WriteEngine::WR_UBYTE: - ((uint8_t*)valArray)[pos] = boost::any_cast(data); - break; - - case WriteEngine::WR_LONGLONG: - if (data.type() == typeid(long long)) - ((long long*)valArray)[pos] = boost::any_cast(data); - else if (data.type() == typeid(long)) - ((long long*)valArray)[pos] = (long long)boost::any_cast(data); - else - ((long long*)valArray)[pos] = boost::any_cast(data); - - break; - - case WriteEngine::WR_ULONGLONG: - ((uint64_t*)valArray)[pos] = boost::any_cast(data); - break; - - case WriteEngine::WR_TOKEN: - ((Token*)valArray)[pos] = boost::any_cast(data); - break; - - case WriteEngine::WR_BINARY: - { - curStr = boost::any_cast(data); - // String length or column width? - memcpy((char*)valArray + pos * curStr.length(), - curStr.c_str(), curStr.length()); - break; - } - } // end of switch (colType) - } - else - { - switch (colType) - { - case WriteEngine::WR_INT : - case WriteEngine::WR_MEDINT : - data = ((int*)valArray)[pos]; - break; - - case WriteEngine::WR_UINT : - case WriteEngine::WR_UMEDINT : - data = ((uint64_t*)valArray)[pos]; - break; - - case WriteEngine::WR_VARBINARY : // treat same as char for now - case WriteEngine::WR_CHAR : - case WriteEngine::WR_BLOB : - case WriteEngine::WR_TEXT : - char tmp[10]; - memcpy(tmp, (char*)valArray + pos * 8, 8); - curStr = tmp; - data = curStr; - break; - -// case WriteEngine::WR_LONG : ((long*)valArray)[pos] = boost::any_cast(curTuple.data); -// break; - case WriteEngine::WR_FLOAT: - data = ((float*)valArray)[pos]; - break; - - case WriteEngine::WR_DOUBLE: - data = ((double*)valArray)[pos]; - break; - - case WriteEngine::WR_SHORT: - data = ((short*)valArray)[pos]; - break; - - case WriteEngine::WR_USHORT: - data = ((uint16_t*)valArray)[pos]; - break; - -// case WriteEngine::WR_BIT: data = ((bool*)valArray)[pos]; -// break; - case WriteEngine::WR_BYTE: - data = ((char*)valArray)[pos]; - break; - - case WriteEngine::WR_UBYTE: - data = ((uint8_t*)valArray)[pos]; - break; - - case WriteEngine::WR_LONGLONG: - data = ((long long*)valArray)[pos]; - break; - - case WriteEngine::WR_ULONGLONG: - data = ((uint64_t*)valArray)[pos]; - break; - - case WriteEngine::WR_TOKEN: - data = ((Token*)valArray)[pos]; - break; - // WIP - case WriteEngine::WR_BINARY : - { - // WIP do we need tmp here? - char *tmp = (char*)alloca (sizeof(char) * 16); - memcpy(tmp, (char*)valArray + pos * 16, 16); - curStr = tmp; - data = curStr; - } - break; - } // end of switch (colType) - } // end of if -} - /*********************************************************** * DESCRIPTION: @@ -845,7 +430,7 @@ void WriteEngineWrapper::convertValue(const ColType colType, void* valArray, con * RETURN: * none ***********************************************************/ -void WriteEngineWrapper::convertValue(const CalpontSystemCatalog::ColType &fullColType, const ColType colType, void* valArray, const size_t pos, boost::any& data, bool fromList) +void WriteEngineWrapper::convertValue(const CalpontSystemCatalog::ColType& cscColType, const ColType colType, void* valArray, const size_t pos, boost::any& data, bool fromList) { string curStr; @@ -936,7 +521,7 @@ void WriteEngineWrapper::convertValue(const CalpontSystemCatalog::ColType &fullC break; case WriteEngine::WR_BINARY: - if (fullColType.colDataType != CalpontSystemCatalog::DECIMAL) + if (cscColType.colDataType != CalpontSystemCatalog::DECIMAL) { curStr = boost::any_cast(data); // String length or column width? @@ -945,7 +530,7 @@ void WriteEngineWrapper::convertValue(const CalpontSystemCatalog::ColType &fullC else { int128_t val = boost::any_cast(data); - size_t size = fullColType.colWidth; + size_t size = cscColType.colWidth; // WIP Why do we use memcpy here? memcpy((uint8_t*)valArray+pos*size, &val, size); } @@ -1018,15 +603,15 @@ void WriteEngineWrapper::convertValue(const CalpontSystemCatalog::ColType &fullC break; // WIP case WriteEngine::WR_BINARY : - if (fullColType.colDataType == CalpontSystemCatalog::DECIMAL) + if (cscColType.colDataType == CalpontSystemCatalog::DECIMAL) { data = ((int128_t*)valArray)[pos]; } else { // WIP do we need tmp here? - char *tmp = (char*) alloca (sizeof(char) * fullColType.colWidth); - memcpy(tmp, (char*)valArray + pos * fullColType.colWidth, fullColType.colWidth); + char *tmp = (char*) alloca (sizeof(char) * cscColType.colWidth); + memcpy(tmp, (char*)valArray + pos * cscColType.colWidth, cscColType.colWidth); curStr = tmp; data = curStr; } @@ -1093,7 +678,7 @@ int WriteEngineWrapper::createColumn( * @brief Fill column with default values */ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid, - const CalpontSystemCatalog::ColDataType dataType, int dataWidth, + const CalpontSystemCatalog::ColType& colType, ColTuple defaultVal, const OID& refColOID, const CalpontSystemCatalog::ColDataType refColDataType, int refColWidth, int refCompressionType, @@ -1113,36 +698,36 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid, colOpNewCol->initColumn(newCol); refColOp->initColumn(refCol); uint16_t dbRoot = 1; //not to be used - int newDataWidth = dataWidth; + int newDataWidth = colType.colWidth; //Convert HWM of the reference column for the new column //Bug 1703,1705 bool isToken = false; - if (((dataType == CalpontSystemCatalog::VARCHAR) && (dataWidth > 7)) || - ((dataType == CalpontSystemCatalog::CHAR) && (dataWidth > 8)) || - (dataType == CalpontSystemCatalog::VARBINARY) || - (dataType == CalpontSystemCatalog::BLOB) || - (dataType == CalpontSystemCatalog::TEXT)) + if (((colType.colDataType == CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7)) || + ((colType.colDataType == CalpontSystemCatalog::CHAR) && (colType.colWidth > 8)) || + (colType.colDataType == CalpontSystemCatalog::VARBINARY) || + (colType.colDataType == CalpontSystemCatalog::BLOB) || + (colType.colDataType == CalpontSystemCatalog::TEXT)) { isToken = true; } - Convertor::convertColType(dataType, newColType, isToken); + Convertor::convertColType(colType.colDataType, newColType, isToken); // WIP // replace with isDictCol if (((refColDataType == CalpontSystemCatalog::VARCHAR) && (refColWidth > 7)) || ((refColDataType == CalpontSystemCatalog::CHAR) && (refColWidth > 8)) || (refColDataType == CalpontSystemCatalog::VARBINARY) || - (dataType == CalpontSystemCatalog::BLOB) || - (dataType == CalpontSystemCatalog::TEXT)) + (colType.colDataType == CalpontSystemCatalog::BLOB) || + (colType.colDataType == CalpontSystemCatalog::TEXT)) { isToken = true; } - newDataWidth = colOpNewCol->getCorrectRowWidth(dataType, dataWidth); + newDataWidth = colOpNewCol->getCorrectRowWidth(colType.colDataType, colType.colWidth); // MCOL-1347 CS doubles the width for ALTER TABLE..ADD COLUMN - if ( dataWidth < 4 && dataType == CalpontSystemCatalog::VARCHAR ) + if ( colType.colWidth < 4 && colType.colDataType == CalpontSystemCatalog::VARCHAR ) { newDataWidth >>= 1; } @@ -1151,7 +736,7 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid, refColOp->setColParam(refCol, 0, refColOp->getCorrectRowWidth(refColDataType, refColWidth), refColDataType, refColType, (FID)refColOID, refCompressionType, dbRoot); colOpNewCol->setColParam(newCol, 0, newDataWidth, - dataType, newColType, (FID)dataOid, compressionType, dbRoot); + colType.colDataType, newColType, (FID)dataOid, compressionType, dbRoot); int size = sizeof(Token); @@ -1167,28 +752,31 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid, else { // WIP - convertValue(newColType, defVal.get(), defaultVal.data); + convertValue(colType, newColType, defVal.get(), defaultVal.data); } if (rc == NO_ERROR) - rc = colOpNewCol->fillColumn(txnid, newCol, refCol, defVal.get(), dctnry, refColOp, dictOid, dataWidth, defaultValStr, autoincrement); + rc = colOpNewCol->fillColumn(txnid, newCol, refCol, defVal.get(), dctnry, refColOp, dictOid, colType.colWidth, defaultValStr, autoincrement); // flushing files is in colOp->fillColumn() return rc; } -int WriteEngineWrapper::deleteRow(const TxnID& txnid, vector& colExtentsStruct, vector& colOldValueList, +int WriteEngineWrapper::deleteRow(const TxnID& txnid, const vector& colExtentsColType, + vector& colExtentsStruct, vector& colOldValueList, vector& ridLists, const int32_t tableOid) { ColTuple curTuple; ColStruct curColStruct; + CalpontSystemCatalog::ColType cscColType; DctnryStruct dctnryStruct; ColValueList colValueList; ColTupleList curTupleList; DctnryStructList dctnryStructList; DctnryValueList dctnryValueList; ColStructList colStructList; + CSCTypesList cscColTypeList; uint64_t emptyVal; int rc; string tmpStr(""); @@ -1204,17 +792,33 @@ int WriteEngineWrapper::deleteRow(const TxnID& txnid, vector& col for (unsigned extent = 0; extent < numExtents; extent++) { colStructList = colExtentsStruct[extent]; + cscColTypeList = colExtentsColType[extent]; for (ColStructList::size_type i = 0; i < colStructList.size(); i++) { curTupleList.clear(); curColStruct = colStructList[i]; - emptyVal = m_colOp[op(curColStruct.fCompressionType)]-> - getEmptyRowValue(curColStruct.colDataType, curColStruct.colWidth); + cscColType = cscColTypeList[i]; + Convertor::convertColType(&curColStruct); + + if (curColStruct.colType == WriteEngine::WR_BINARY) + { + uint128_t bigEmptyVal; + emptyVal = m_colOp[op(curColStruct.fCompressionType)]-> + getEmptyRowValue(curColStruct.colDataType, curColStruct.colWidth); + *(reinterpret_cast(&bigEmptyVal)) = emptyVal; + *(reinterpret_cast(&bigEmptyVal) + 1) = emptyVal; + //dataconvert::DataConvert::uint128Max(bigEmptyVal); + curTuple.data = bigEmptyVal; + } + else + { + emptyVal = m_colOp[op(curColStruct.fCompressionType)]-> + getEmptyRowValue(curColStruct.colDataType, curColStruct.colWidth); + + curTuple.data = emptyVal; + } - curTuple.data = emptyVal; - //for (RIDList::size_type j = 0; j < ridLists[extent].size(); j++) - // curTupleList.push_back(curTuple); curTupleList.push_back(curTuple); colValueList.push_back(curTupleList); @@ -1239,7 +843,7 @@ int WriteEngineWrapper::deleteRow(const TxnID& txnid, vector& col // unfortunately I don't have a better way to instruct without passing too many parameters m_opType = DELETE; - rc = updateColumnRec(txnid, colExtentsStruct, colValueList, colOldValueList, ridLists, dctnryExtentsStruct, dctnryValueList, tableOid); + rc = updateColumnRec(txnid, colExtentsColType, colExtentsStruct, colValueList, colOldValueList, ridLists, dctnryExtentsStruct, dctnryValueList, tableOid); m_opType = NOOP; return rc; @@ -1417,6 +1021,7 @@ void WriteEngineWrapper::flushVMCache() const ***********************************************************/ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, + const CSCTypesList& cscColTypeList, ColStructList& colStructList, ColValueList& colValueList, DctnryStructList& dctnryStructList, @@ -1746,7 +1351,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, //-------------------------------------------------------------------------- // allocate row id(s) //-------------------------------------------------------------------------- - curColStruct = colStructList[colId]; + curColStruct = colStructList[colId]; colOp = m_colOp[op(curColStruct.fCompressionType)]; colOp->initColumn(curCol); @@ -1757,12 +1362,12 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, vector fileInfo; dbRoot = curColStruct.fColDbRoot; //use the first column to calculate row id - ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid); + ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid); ColExtsInfo::iterator it = aColExtsInfo.begin(); while (it != aColExtsInfo.end()) { - if ((it->dbRoot == colStructList[colId].fColDbRoot) && + if ((it->dbRoot == colStructList[colId].fColDbRoot) && (it->partNum == colStructList[colId].fColPartition) && (it->segNum == colStructList[colId].fColSegment) && it->current ) { @@ -2179,7 +1784,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid, //---------------------------------------------------------------------- // Write row(s) to database file(s) //---------------------------------------------------------------------- - rc = writeColumnRec(txnid, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix); // @bug 5572 HDFS tmp file + rc = writeColumnRec(txnid, cscColTypeList, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix); // @bug 5572 HDFS tmp file } return rc; @@ -2963,6 +2568,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid, + const CSCTypesList& cscColTypeList, ColStructList& colStructList, ColValueList& colValueList, DctnryStructList& dctnryStructList, @@ -3544,11 +3150,11 @@ int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid, if (newExtent) { - rc = writeColumnRec(txnid, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file + rc = writeColumnRec(txnid, cscColTypeList, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file } else { - rc = writeColumnRec(txnid, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file + rc = writeColumnRec(txnid, cscColTypeList, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file } } @@ -3625,7 +3231,7 @@ int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid, } int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, - CSCTypesList& cscColTypesList, + const CSCTypesList& cscColTypeList, ColStructList& colStructList, ColValueList& colValueList, DctnryStructList& dctnryStructList, @@ -4219,13 +3825,13 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, { if (newExtent) { - rc = writeColumnRec(txnid, cscColTypesList, colStructList, colOldValueList, + rc = writeColumnRec(txnid, cscColTypeList, colStructList, colOldValueList, rowIdArray, newColStructList, colNewValueList, tableOid, false); // @bug 5572 HDFS tmp file } else { - rc = writeColumnRec(txnid, cscColTypesList, colStructList, colValueList, + rc = writeColumnRec(txnid, cscColTypeList, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, true); // @bug 5572 HDFS tmp file } @@ -4635,6 +4241,7 @@ void WriteEngineWrapper::writeVBEnd(const TxnID& txnid, std::vector& colExtentsColType, vector& colExtentsStruct, ColValueList& colValueList, vector& colOldValueList, @@ -4644,40 +4251,20 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, const int32_t tableOid) { int rc = 0; - //RID* rowIdArray = NULL; - //RIDList::size_type i; unsigned numExtents = colExtentsStruct.size(); - // ColValueList tmpColValueList; - RIDList::const_iterator ridsIter; ColStructList colStructList; DctnryStructList dctnryStructList; + WriteEngine::CSCTypesList cscColTypeList; ColumnOp* colOp = NULL; for (unsigned extent = 0; extent < numExtents; extent++) { - ridsIter = ridLists[extent].begin(); - - //rowIdArray = (RID*)calloc(sizeof(RID), ridLists[extent].size()); - colStructList = colExtentsStruct[extent]; dctnryStructList = dctnryExtentsStruct[extent]; + cscColTypeList = colExtentsColType[extent]; if (m_opType != DELETE) { - - /* ColTuple colTuple; - ColTupleList colTupleList; - for (i=0; i < colValueList.size(); i++) - { - colTupleList = colValueList[i]; - colTuple = colTupleList[0]; - for (unsigned i = 1; i < ridLists[extent].size(); i++) - { - colTupleList.push_back(colTuple); - } - tmpColValueList.push_back(colTupleList); - } - */ //Tokenize data if needed vector tokenList; @@ -4690,7 +4277,6 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, { // only need to tokenize once dctCol_iter = dctnryValueList[i].begin(); - //col_iter = colValueList[i].begin(); Token token; if (!dctCol_iter->isNull) @@ -4703,14 +4289,7 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, //timer.stop("tokenize"); #endif } - else - { - //if (dctnryStructList[i].dctnryOid == 2001) - // std::cout << " got null token for string " << dctCol_iter->sigValue <sigValue << " op:fbo = " << token.op <<":"< lbids; vector colDataTypes; bool successFlag = true; unsigned width = 0; - int curFbo = 0, curBio, lastFbo = -1; + int curFbo = 0, curBio, lastFbo = -1; rid_iter = ridLists[extent].begin(); RID aRid = *rid_iter; @@ -4777,24 +4347,18 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, } } - //cout << "lbids size = " << lbids.size()<< endl; //#ifdef PROFILE //timer.start("markExtentsInvalid"); //#endif if (lbids.size() > 0) rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes); - //} - - if ( m_opType != DELETE) + if (m_opType != DELETE) m_opType = UPDATE; - rc = writeColumnRec(txnid, colStructList, colValueList, colOldValueList, + rc = writeColumnRec(txnid, cscColTypeList, colStructList, colValueList, colOldValueList, ridLists[extent], tableOid, true, ridLists[extent].size()); -// if (rowIdArray) -// free(rowIdArray); - m_opType = NOOP; if (rc != NO_ERROR) @@ -4805,6 +4369,7 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, } int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid, + const CSCTypesList& cscColTypeList, vector& colExtentsStruct, ColValueList& colValueList, const RIDList& ridLists, @@ -4857,12 +4422,13 @@ int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid, if ( m_opType != DELETE) m_opType = UPDATE; - rc = writeColumnRecords (txnid, colExtentsStruct, colValueList, ridLists, tableOid); + rc = writeColumnRecords (txnid, cscColTypeList, colExtentsStruct, colValueList, ridLists, tableOid); m_opType = NOOP; return rc; } int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid, + const CSCTypesList& cscColTypeList, vector& colStructList, ColValueList& colValueList, const RIDList& ridLists, const int32_t tableOid, bool versioning) @@ -5016,7 +4582,7 @@ int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid, try { - convertValArray(totalRow, curColStruct.colType, curTupleList, valArray); + convertValArray(totalRow, cscColTypeList[i], curColStruct.colType, curTupleList, valArray); } catch (...) { @@ -5383,6 +4949,7 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, } // have to init the size here + // TODO MCOL-641 is commenting out the switch statement below correct? valArray = calloc(totalRow2, newColStructList[i].colWidth); /*switch (newColStructList[i].colType) { @@ -5454,7 +5021,7 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, try { - convertValArray(totalRow2, newColStructList[i].colType, newColValueList[i], valArray); + convertValArray(totalRow2, cscColTypeList[i], newColStructList[i].colType, newColValueList[i], valArray); } catch (...) { @@ -5564,6 +5131,7 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, // have to init the size here // shared pointers or memory in a stack + // TODO MCOL-641 is commenting out the switch statement below correct? valArray = calloc(totalRow1, colStructList[i].colWidth); // WIP /*switch (colStructList[i].colType) @@ -5688,609 +5256,6 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, return rc; } -int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, - const ColStructList& colStructList, - ColValueList& colValueList, - RID* rowIdArray, - const ColStructList& newColStructList, - ColValueList& newColValueList, - const int32_t tableOid, - bool useTmpSuffix, - bool versioning) -{ - bool bExcp; - int rc = 0; - void* valArray; - string segFile; - Column curCol; - ColTupleList oldTupleList; - ColStructList::size_type totalColumn; - ColStructList::size_type i; - ColTupleList::size_type totalRow1, totalRow2; - - setTransId(txnid); - - totalColumn = colStructList.size(); -#ifdef PROFILE - StopWatch timer; -#endif - - if (newColValueList.size() > 0) - { - totalRow1 = colValueList[0].size(); - totalRow2 = newColValueList[0].size(); - } - else - { - totalRow1 = colValueList[0].size(); - totalRow2 = 0; - } - - TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); - - for (i = 0; i < totalColumn; i++) - { - if (totalRow2 > 0) - { - RID* secondPart = rowIdArray + totalRow1; - - //@Bug 2205 Check if all rows go to the new extent - if (totalRow1 > 0) - { - //Write the first batch - valArray = NULL; - RID* firstPart = rowIdArray; - ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; - - // set params - colOp->initColumn(curCol); - // need to pass real dbRoot, partition, and segment to setColParam - colOp->setColParam(curCol, 0, colStructList[i].colWidth, - colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, - colStructList[i].fCompressionType, colStructList[i].fColDbRoot, - colStructList[i].fColPartition, colStructList[i].fColSegment); - - ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); - ColExtsInfo::iterator it = aColExtsInfo.begin(); - - while (it != aColExtsInfo.end()) - { - if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) - break; - - it++; - } - - if (it == aColExtsInfo.end()) //add this one to the list - { - ColExtInfo aExt; - aExt.dbRoot = colStructList[i].fColDbRoot; - aExt.partNum = colStructList[i].fColPartition; - aExt.segNum = colStructList[i].fColSegment; - aExt.compType = colStructList[i].fCompressionType; - aColExtsInfo.push_back(aExt); - aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); - } - - rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file - - if (rc != NO_ERROR) - break; - - // handling versioning - vector rangeList; - - if (versioning) - { - rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], - colStructList[i].colWidth, totalRow1, firstPart, rangeList); - - if (rc != NO_ERROR) - { - if (colStructList[i].fCompressionType == 0) - { - curCol.dataFile.pFile->flush(); - } - - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - break; - } - } - - // WIP We can allocate based on column size and not colType - // have to init the size here - valArray = calloc(totalRow1, colStructList[i].colWidth); -#if 0 - switch (colStructList[i].colType) - { - // WIP we don't need type cast here only size - case WriteEngine::WR_INT: - case WriteEngine::WR_MEDINT: - valArray = (int*) calloc(sizeof(int), totalRow1); - break; - - case WriteEngine::WR_UINT: - case WriteEngine::WR_UMEDINT: - valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow1); - break; - - case WriteEngine::WR_VARBINARY : // treat same as char for now - case WriteEngine::WR_CHAR: - case WriteEngine::WR_BLOB: - case WriteEngine::WR_TEXT: - valArray = (char*) calloc(sizeof(char), totalRow1 * MAX_COLUMN_BOUNDARY); - break; - - case WriteEngine::WR_FLOAT: - valArray = (float*) calloc(sizeof(float), totalRow1); - break; - - case WriteEngine::WR_DOUBLE: - valArray = (double*) calloc(sizeof(double), totalRow1); - break; - - case WriteEngine::WR_BYTE: - valArray = (char*) calloc(sizeof(char), totalRow1); - break; - - case WriteEngine::WR_UBYTE: - valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow1); - break; - - case WriteEngine::WR_SHORT: - valArray = (short*) calloc(sizeof(short), totalRow1); - break; - - case WriteEngine::WR_USHORT: - valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow1); - break; - - case WriteEngine::WR_LONGLONG: - valArray = (long long*) calloc(sizeof(long long), totalRow1); - break; - - case WriteEngine::WR_ULONGLONG: - valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow1); - break; - - case WriteEngine::WR_TOKEN: - valArray = (Token*) calloc(sizeof(Token), totalRow1); - break; - - // WIP - case WriteEngine::WR_BINARY: - valArray = calloc(totalRow1, colStructList[i].colWidth); - break; - } -#endif - - // convert values to valArray - // WIP - // Is this m_opType ever set to DELETE? - if (m_opType != DELETE) - { - bExcp = false; - - try - { - // WIP We convert values twice!? - // dmlcommandproc converts strings to boost::any and this converts - // into actual type value masked by *void - // It is not clear why we need to convert to boost::any b/c we can convert from the original string here - convertValArray(totalRow1, colStructList[i].colType, colValueList[i], valArray); - } - catch (...) - { - bExcp = true; - } - - if (bExcp) - { - if (versioning) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - - return ERR_PARSING; - } - -#ifdef PROFILE - iimer.start("writeRow "); -#endif - rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray); -#ifdef PROFILE - timer.stop("writeRow "); -#endif - } - else - { -#ifdef PROFILE - timer.start("writeRow "); -#endif - rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, true); -#ifdef PROFILE - timer.stop("writeRow "); -#endif - } - - colOp->clearColumn(curCol); - - if (versioning) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - - if (valArray != NULL) - free(valArray); - - // check error - if (rc != NO_ERROR) - break; - } - - //Process the second batch - valArray = NULL; - - ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)]; - - // set params - colOp->initColumn(curCol); - colOp->setColParam(curCol, 0, newColStructList[i].colWidth, - newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid, - newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot, - newColStructList[i].fColPartition, newColStructList[i].fColSegment); - - ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid); - ColExtsInfo::iterator it = aColExtsInfo.begin(); - - while (it != aColExtsInfo.end()) - { - if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == newColStructList[i].fColSegment)) - break; - - it++; - } - - if (it == aColExtsInfo.end()) //add this one to the list - { - ColExtInfo aExt; - aExt.dbRoot = newColStructList[i].fColDbRoot; - aExt.partNum = newColStructList[i].fColPartition; - aExt.segNum = newColStructList[i].fColSegment; - aExt.compType = newColStructList[i].fCompressionType; - aColExtsInfo.push_back(aExt); - aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo); - } - - // Pass "false" for hdfs tmp file flag. Since we only allow 1 - // extent per segment file (with HDFS), we can assume a second - // extent is going to a new file (and won't need tmp file). - rc = colOp->openColumnFile(curCol, segFile, false, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file - - if (rc != NO_ERROR) - break; - - // handling versioning - vector rangeList; - - if (versioning) - { - rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i], - newColStructList[i].colWidth, totalRow2, secondPart, rangeList); - - if (rc != NO_ERROR) - { - if (newColStructList[i].fCompressionType == 0) - { - curCol.dataFile.pFile->flush(); - } - - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - break; - } - } - - // have to init the size here - switch (newColStructList[i].colType) - { - case WriteEngine::WR_INT: - case WriteEngine::WR_MEDINT: - valArray = (int*) calloc(sizeof(int), totalRow2); - break; - - case WriteEngine::WR_UINT: - case WriteEngine::WR_UMEDINT: - valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow2); - break; - - case WriteEngine::WR_VARBINARY : // treat same as char for now - case WriteEngine::WR_CHAR: - case WriteEngine::WR_BLOB: - case WriteEngine::WR_TEXT: - valArray = (char*) calloc(sizeof(char), totalRow2 * MAX_COLUMN_BOUNDARY); - break; - - case WriteEngine::WR_FLOAT: - valArray = (float*) calloc(sizeof(float), totalRow2); - break; - - case WriteEngine::WR_DOUBLE: - valArray = (double*) calloc(sizeof(double), totalRow2); - break; - - case WriteEngine::WR_BYTE: - valArray = (char*) calloc(sizeof(char), totalRow2); - break; - - case WriteEngine::WR_UBYTE: - valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow2); - break; - - case WriteEngine::WR_SHORT: - valArray = (short*) calloc(sizeof(short), totalRow2); - break; - - case WriteEngine::WR_USHORT: - valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow2); - break; - - case WriteEngine::WR_LONGLONG: - valArray = (long long*) calloc(sizeof(long long), totalRow2); - break; - - case WriteEngine::WR_ULONGLONG: - valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow2); - break; - - case WriteEngine::WR_TOKEN: - valArray = (Token*) calloc(sizeof(Token), totalRow2); - break; - - case WriteEngine::WR_BINARY: - //case WriteEngine::WR_INT128: - // WIP - valArray = calloc(totalRow2, 16); - break; - - } - - // convert values to valArray - if (m_opType != DELETE) - { - bExcp = false; - - try - { - convertValArray(totalRow2, newColStructList[i].colType, newColValueList[i], valArray); - } - catch (...) - { - bExcp = true; - } - - if (bExcp) - { - if (versioning) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - - return ERR_PARSING; - } - -#ifdef PROFILE - timer.start("writeRow "); -#endif - rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray); -#ifdef PROFILE - timer.stop("writeRow "); -#endif - } - else - { -#ifdef PROFILE - timer.start("writeRow "); -#endif - rc = colOp->writeRow(curCol, totalRow2, rowIdArray, valArray, true); -#ifdef PROFILE - timer.stop("writeRow "); -#endif - } - - - colOp->clearColumn(curCol); - - if (versioning) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - - if (valArray != NULL) - free(valArray); - - // check error - if (rc != NO_ERROR) - break; - } - else - { - valArray = NULL; - - ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; - - // set params - colOp->initColumn(curCol); - colOp->setColParam(curCol, 0, colStructList[i].colWidth, - colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, - colStructList[i].fCompressionType, colStructList[i].fColDbRoot, - colStructList[i].fColPartition, colStructList[i].fColSegment); - - rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file - - //cout << " Opened file oid " << curCol.dataFile.pFile << endl; - if (rc != NO_ERROR) - break; - - ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); - ColExtsInfo::iterator it = aColExtsInfo.begin(); - - while (it != aColExtsInfo.end()) - { - if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) - break; - - it++; - } - - if (it == aColExtsInfo.end()) //add this one to the list - { - ColExtInfo aExt; - aExt.dbRoot = colStructList[i].fColDbRoot; - aExt.partNum = colStructList[i].fColPartition; - aExt.segNum = colStructList[i].fColSegment; - aExt.compType = colStructList[i].fCompressionType; - aColExtsInfo.push_back(aExt); - aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); - } - - // handling versioning - vector rangeList; - - if (versioning) - { - rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], - colStructList[i].colWidth, totalRow1, rowIdArray, rangeList); - - if (rc != NO_ERROR) - { - if (colStructList[i].fCompressionType == 0) - { - curCol.dataFile.pFile->flush(); - } - - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - break; - } - } - - // have to init the size here -// nullArray = (bool*) malloc(sizeof(bool) * totalRow); - switch (colStructList[i].colType) - { - case WriteEngine::WR_INT: - case WriteEngine::WR_MEDINT: - valArray = (int*) calloc(sizeof(int), totalRow1); - break; - - case WriteEngine::WR_UINT: - case WriteEngine::WR_UMEDINT: - valArray = (uint32_t*) calloc(sizeof(uint32_t), totalRow1); - break; - - case WriteEngine::WR_VARBINARY : // treat same as char for now - case WriteEngine::WR_CHAR: - case WriteEngine::WR_BLOB: - case WriteEngine::WR_TEXT: - valArray = (char*) calloc(sizeof(char), totalRow1 * MAX_COLUMN_BOUNDARY); - break; - - case WriteEngine::WR_FLOAT: - valArray = (float*) calloc(sizeof(float), totalRow1); - break; - - case WriteEngine::WR_DOUBLE: - valArray = (double*) calloc(sizeof(double), totalRow1); - break; - - case WriteEngine::WR_BYTE: - valArray = (char*) calloc(sizeof(char), totalRow1); - break; - - case WriteEngine::WR_UBYTE: - valArray = (uint8_t*) calloc(sizeof(uint8_t), totalRow1); - break; - - case WriteEngine::WR_SHORT: - valArray = (short*) calloc(sizeof(short), totalRow1); - break; - - case WriteEngine::WR_USHORT: - valArray = (uint16_t*) calloc(sizeof(uint16_t), totalRow1); - break; - - case WriteEngine::WR_LONGLONG: - valArray = (long long*) calloc(sizeof(long long), totalRow1); - break; - - case WriteEngine::WR_ULONGLONG: - valArray = (uint64_t*) calloc(sizeof(uint64_t), totalRow1); - break; - - case WriteEngine::WR_TOKEN: - valArray = (Token*) calloc(sizeof(Token), totalRow1); - break; - - case WriteEngine::WR_BINARY: - //case WriteEngine::WR_INT128: - valArray = calloc(colStructList[i].colWidth, totalRow1); - break; - } - - // convert values to valArray - if (m_opType != DELETE) - { - bExcp = false; - - try - { - convertValArray(totalRow1, colStructList[i].colType, colValueList[i], valArray); - } - catch (...) - { - bExcp = true; - } - - if (bExcp) - { - if (versioning) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - - return ERR_PARSING; - } - -#ifdef PROFILE - timer.start("writeRow "); -#endif - rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray); -#ifdef PROFILE - timer.stop("writeRow "); -#endif - } - else - { -#ifdef PROFILE - timer.start("writeRow "); -#endif - rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, true); -#ifdef PROFILE - timer.stop("writeRow "); -#endif - } - - colOp->clearColumn(curCol); - - if (versioning) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); - - if (valArray != NULL) - free(valArray); - - // check error - if (rc != NO_ERROR) - break; - } - } // end of for (i = 0 - -#ifdef PROFILE - timer.finish(); -#endif - return rc; -} - - - int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, const ColStructList& colStructList, std::vector& colValueList, @@ -6637,6 +5602,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, } int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, + const CSCTypesList& cscColTypeList, const ColStructList& colStructList, const ColValueList& colValueList, vector& colOldValueList, @@ -6759,11 +5725,6 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, } // handling versioning - //cout << " pass to processVersionBuffer rid " << rowIdArray[0] << endl; - //cout << "dataOid:fColPartition = " << curColStruct.dataOid << ":" << curColStruct.fColPartition << endl; -//timer.start("processVersionBuffers"); - //vector rangeList; - // rc = processVersionBuffers(curCol.dataFile.pFile, txnid, curColStruct, curColStruct.colWidth, totalRow, ridList, rangeList); std::vector curFreeList; uint32_t blockUsed = 0; @@ -6777,7 +5738,6 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, aRange.vbFBO = freeList[0].vbFBO + blocksProcessed; aRange.size = rangeLists[i].size(); curFreeList.push_back(aRange); - //cout << "range size = " << aRange.size <<" and blocksProcessed = " << blocksProcessed<< endl; } else { @@ -6800,14 +5760,10 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, rc = 1; break; } - - //cout << "curFreeList size = " << curFreeList.size() << endl; - } blocksProcessed += rangeLists[i].size(); - //timer.start("Delete:writeVB"); rc = BRMWrapper::getInstance()-> writeVB(curCol.dataFile.pFile, (BRM::VER_t)txnid, curColStruct.dataOid, fboLists[i], rangeLists[i], @@ -6815,9 +5771,6 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, } } - //timer.stop("Delete:writeVB"); -//timer.stop("processVersionBuffers"); - // cout << " rc for processVersionBuffer is " << rc << endl; if (rc != NO_ERROR) { if (curColStruct.fCompressionType == 0) @@ -6886,8 +5839,7 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, valArray = (Token*) calloc(sizeof(Token), 1); break; case WriteEngine::WR_BINARY: - //case WriteEngine::WR_INT128: - valArray = calloc(sizeof(char), curColStruct.colWidth); //FIXME maybe + valArray = calloc(1, curColStruct.colWidth); break; } @@ -6900,7 +5852,7 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, try { - convertValue(curColStruct.colType, valArray, curTuple.data); + convertValue(cscColTypeList[i], curColStruct.colType, valArray, curTuple.data); } catch (...) { @@ -6934,7 +5886,6 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, #endif } -// colOldValueList.push_back(oldValArray); //timer.start("Delete:closefile"); colOp->clearColumn(curCol); @@ -6952,340 +5903,6 @@ int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0)) cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); -//if (idbdatafile::IDBPolicy::useHdfs()) -// cacheutils::dropPrimProcFdCache(); -//timer.stop("Delete:purgePrimProcFdCache"); - if (rangeListTot.size() > 0) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); - -//timer.stop("Delete:writecolrec"); -//#ifdef PROFILE -//timer.finish(); -//#endif - return rc; -} - - - -int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, - const CSCTypesList& cscColTypes, - const ColStructList& colStructList, - const ColValueList& colValueList, - vector& colOldValueList, - const RIDList& ridList, - const int32_t tableOid, - bool convertStructFlag, - ColTupleList::size_type nRows) -{ - bool bExcp; - int rc = 0; - void* valArray = NULL; - Column curCol; - ColStruct curColStruct; - ColTupleList curTupleList, oldTupleList; - ColStructList::size_type totalColumn; - ColStructList::size_type i; - ColTupleList::size_type totalRow; - - setTransId(txnid); - colOldValueList.clear(); - totalColumn = colStructList.size(); - totalRow = nRows; - -#ifdef PROFILE - StopWatch timer; -#endif - - vector rangeListTot; - std::vector freeList; - vector > fboLists; - vector > rangeLists; - rc = processBeginVBCopy(txnid, colStructList, ridList, freeList, fboLists, rangeLists, rangeListTot); - - if (rc != NO_ERROR) - { - if (rangeListTot.size() > 0) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); - - switch (rc) - { - case BRM::ERR_DEADLOCK: - return ERR_BRM_DEAD_LOCK; - - case BRM::ERR_VBBM_OVERFLOW: - return ERR_BRM_VB_OVERFLOW; - - case BRM::ERR_NETWORK: - return ERR_BRM_NETWORK; - - case BRM::ERR_READONLY: - return ERR_BRM_READONLY; - - default: - return ERR_BRM_BEGIN_COPY; - } - } - - VBRange aRange; - uint32_t blocksProcessedThisOid = 0; - uint32_t blocksProcessed = 0; - std::vector files; - TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); - - for (i = 0; i < totalColumn; i++) - { - valArray = NULL; - curColStruct = colStructList[i]; - curTupleList = colValueList[i]; //same value for all rows - ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)]; - - // convert column data type - if (convertStructFlag) - Convertor::convertColType(&curColStruct); - - // set params - colOp->initColumn(curCol); - colOp->setColParam(curCol, 0, curColStruct.colWidth, - curColStruct.colDataType, curColStruct.colType, curColStruct.dataOid, - curColStruct.fCompressionType, curColStruct.fColDbRoot, - curColStruct.fColPartition, curColStruct.fColSegment); - - - ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid); - ColExtsInfo::iterator it = aColExtsInfo.begin(); - - while (it != aColExtsInfo.end()) - { - if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) && (it->segNum == curColStruct.fColSegment)) - break; - - it++; - } - - if (it == aColExtsInfo.end()) //add this one to the list - { - ColExtInfo aExt; - aExt.dbRoot = curColStruct.fColDbRoot; - aExt.partNum = curColStruct.fColPartition; - aExt.segNum = curColStruct.fColSegment; - aExt.compType = curColStruct.fCompressionType; - aColExtsInfo.push_back(aExt); - aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); - } - - string segFile; - rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file - - if (rc != NO_ERROR) - break; - - if (curColStruct.fCompressionType == 0) - { - BRM::FileInfo aFile; - aFile.oid = curColStruct.dataOid; - aFile.partitionNum = curColStruct.fColPartition; - aFile.dbRoot = curColStruct.fColDbRoot;; - aFile.segmentNum = curColStruct.fColSegment; - aFile.compType = curColStruct.fCompressionType; - files.push_back(aFile); - } - - // handling versioning - //cout << " pass to processVersionBuffer rid " << rowIdArray[0] << endl; - //cout << "dataOid:fColPartition = " << curColStruct.dataOid << ":" << curColStruct.fColPartition << endl; -//timer.start("processVersionBuffers"); - //vector rangeList; - // rc = processVersionBuffers(curCol.dataFile.pFile, txnid, curColStruct, curColStruct.colWidth, totalRow, ridList, rangeList); - std::vector curFreeList; - uint32_t blockUsed = 0; - - if (!idbdatafile::IDBPolicy::useHdfs()) - { - if (rangeListTot.size() > 0) - { - if (freeList[0].size >= (blocksProcessed + rangeLists[i].size())) - { - aRange.vbOID = freeList[0].vbOID; - aRange.vbFBO = freeList[0].vbFBO + blocksProcessed; - aRange.size = rangeLists[i].size(); - curFreeList.push_back(aRange); - //cout << "range size = " << aRange.size <<" and blocksProcessed = " << blocksProcessed<< endl; - } - else - { - aRange.vbOID = freeList[0].vbOID; - aRange.vbFBO = freeList[0].vbFBO + blocksProcessed; - aRange.size = freeList[0].size - blocksProcessed; - blockUsed = aRange.size; - curFreeList.push_back(aRange); - - if (freeList.size() > 1) - { - aRange.vbOID = freeList[1].vbOID; - aRange.vbFBO = freeList[1].vbFBO + blocksProcessedThisOid; - aRange.size = rangeLists[i].size() - blockUsed; - curFreeList.push_back(aRange); - blocksProcessedThisOid += aRange.size; - } - else - { - rc = 1; - break; - } - - //cout << "curFreeList size = " << curFreeList.size() << endl; - - } - - blocksProcessed += rangeLists[i].size(); - - //timer.start("Delete:writeVB"); - rc = BRMWrapper::getInstance()-> - writeVB(curCol.dataFile.pFile, (BRM::VER_t)txnid, - curColStruct.dataOid, fboLists[i], rangeLists[i], - colOp, curFreeList, curColStruct.fColDbRoot, true); - } - } - - //timer.stop("Delete:writeVB"); -//timer.stop("processVersionBuffers"); - // cout << " rc for processVersionBuffer is " << rc << endl; - if (rc != NO_ERROR) - { - if (curColStruct.fCompressionType == 0) - { - curCol.dataFile.pFile->flush(); - } - - if (rangeListTot.size() > 0) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); - - break; - } - - switch (curColStruct.colType) - { - case WriteEngine::WR_INT: - case WriteEngine::WR_MEDINT: - valArray = (int*) calloc(sizeof(int), 1); - break; - - case WriteEngine::WR_UINT: - case WriteEngine::WR_UMEDINT: - valArray = (uint32_t*) calloc(sizeof(uint32_t), 1); - break; - - case WriteEngine::WR_VARBINARY : // treat same as char for now - case WriteEngine::WR_CHAR: - case WriteEngine::WR_BLOB: - case WriteEngine::WR_TEXT: - valArray = (char*) calloc(sizeof(char), 1 * MAX_COLUMN_BOUNDARY); - break; - - case WriteEngine::WR_FLOAT: - valArray = (float*) calloc(sizeof(float), 1); - break; - - case WriteEngine::WR_DOUBLE: - valArray = (double*) calloc(sizeof(double), 1); - break; - - case WriteEngine::WR_BYTE: - valArray = (char*) calloc(sizeof(char), 1); - break; - - case WriteEngine::WR_UBYTE: - valArray = (uint8_t*) calloc(sizeof(uint8_t), 1); - break; - - case WriteEngine::WR_SHORT: - valArray = (short*) calloc(sizeof(short), 1); - break; - - case WriteEngine::WR_USHORT: - valArray = (uint16_t*) calloc(sizeof(uint16_t), 1); - break; - - case WriteEngine::WR_LONGLONG: - valArray = (long long*) calloc(sizeof(long long), 1); - break; - - case WriteEngine::WR_ULONGLONG: - valArray = (uint64_t*) calloc(sizeof(uint64_t), 1); - break; - - case WriteEngine::WR_TOKEN: - valArray = (Token*) calloc(sizeof(Token), 1); - break; - case WriteEngine::WR_BINARY: - //case WriteEngine::WR_INT128: - valArray = calloc(sizeof(char), curColStruct.colWidth); //FIXME maybe - break; - } - - // convert values to valArray - if (m_opType != DELETE) - { - bExcp = false; - ColTuple curTuple; - curTuple = curTupleList[0]; - - try - { - convertValue(curColStruct.colType, valArray, curTuple.data); - } - catch (...) - { - bExcp = true; - } - - if (bExcp) - { - if (rangeListTot.size() > 0) - BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); - - return ERR_PARSING; - } - -#ifdef PROFILE - timer.start("writeRow "); -#endif - rc = colOp->writeRows(curCol, totalRow, ridList, valArray); -#ifdef PROFILE - timer.stop("writeRow "); -#endif - } - else - { -#ifdef PROFILE - timer.start("writeRows "); -#endif - rc = colOp->writeRows(curCol, totalRow, ridList, valArray, 0, true); -#ifdef PROFILE - timer.stop("writeRows "); -#endif - } - -// colOldValueList.push_back(oldValArray); -//timer.start("Delete:closefile"); - colOp->clearColumn(curCol); - -//timer.stop("Delete:closefile"); - if (valArray != NULL) - free(valArray); - - // check error - if (rc != NO_ERROR) - break; - - } // end of for (i = 0) - -// timer.start("Delete:purgePrimProcFdCache"); - if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0)) - cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID()); - -//if (idbdatafile::IDBPolicy::useHdfs()) -// cacheutils::dropPrimProcFdCache(); //timer.stop("Delete:purgePrimProcFdCache"); if (rangeListTot.size() > 0) BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot); @@ -7750,17 +6367,20 @@ int WriteEngineWrapper::updateNextValue(const TxnID txnId, const OID& columnoid, ColValueList colValueList; WriteEngine::ColTupleList colTuples; ColStructList colStructList; + WriteEngine::CSCTypesList cscColTypeList; WriteEngine::ColStruct colStruct; - colStruct.dataOid = OID_SYSCOLUMN_NEXTVALUE; - colStruct.colWidth = 8; + CalpontSystemCatalog::ColType colType; + colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_NEXTVALUE; + colType.colWidth = colStruct.colWidth = 8; colStruct.tokenFlag = false; - colStruct.colDataType = CalpontSystemCatalog::UBIGINT; + colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::UBIGINT; colStruct.fColDbRoot = dbRoot; if (idbdatafile::IDBPolicy::useHdfs()) colStruct.fCompressionType = 2; colStructList.push_back(colStruct); + cscColTypeList.push_back(colType); ColTuple colTuple; systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); systemCatalogPtr->identity(CalpontSystemCatalog::EC); @@ -7782,7 +6402,7 @@ int WriteEngineWrapper::updateNextValue(const TxnID txnId, const OID& columnoid, colTuple.data = nextVal; colTuples.push_back(colTuple); colValueList.push_back(colTuples); - rc = writeColumnRecords(txnId, colStructList, colValueList, ridList, SYSCOLUMN_BASE, false); + rc = writeColumnRecords(txnId, cscColTypeList, colStructList, colValueList, ridList, SYSCOLUMN_BASE, false); if (rc != NO_ERROR) return rc; diff --git a/writeengine/wrapper/writeengine.h b/writeengine/wrapper/writeengine.h index f3ca7ec4e..95de97e8d 100644 --- a/writeengine/wrapper/writeengine.h +++ b/writeengine/wrapper/writeengine.h @@ -163,12 +163,6 @@ public: ColTupleList& curTupleList, void* valArray, bool bFromList = true) ; - // WIP legacy - EXPORT void convertValArray(const size_t totalRow, - const ColType colType, - ColTupleList& curTupleList, void* valArray, - bool bFromList = true) ; - /** * @brief Create a column, include object ids for column data and bitmap files * @param dataOid column datafile object id @@ -195,8 +189,8 @@ public: * @param refColDataType Data-type of the referecne column * @param refColWidth Width of the reference column */ - EXPORT int fillColumn(const TxnID& txnid, const OID& dataOid, execplan::CalpontSystemCatalog::ColDataType dataType, - int dataWidth, ColTuple defaultVal, + EXPORT int fillColumn(const TxnID& txnid, const OID& dataOid, const execplan::CalpontSystemCatalog::ColType& colType, + ColTuple defaultVal, const OID& refColOID, execplan::CalpontSystemCatalog::ColDataType refColDataType, int refColWidth, int refCompressionType, bool isNULL, int compressionType, const std::string& defaultValStr, const OID& dictOid = 0, bool autoincrement = false); @@ -230,7 +224,7 @@ public: * @param colOldValueList column old values list (return value) * @param rowIdList row id list */ - EXPORT int deleteRow(const TxnID& txnid, std::vector& colExtentsStruct, + EXPORT int deleteRow(const TxnID& txnid, const std::vector& colExtentsColType, std::vector& colExtentsStruct, std::vector& colOldValueList, std::vector& ridLists, const int32_t tableOid); /** @@ -326,6 +320,7 @@ public: * @param isFirstBatchPm to track if this batch is first batch for this PM. */ EXPORT int insertColumnRecs(const TxnID& txnid, + const CSCTypesList& cscColTypeList, ColStructList& colStructList, ColValueList& colValueList, DctnryStructList& dctnryStructList, @@ -359,6 +354,7 @@ public: * @param dicStringListt dictionary values list */ EXPORT int insertColumnRec_SYS(const TxnID& txnid, + const CSCTypesList& cscColTypeList, ColStructList& colStructList, ColValueList& colValueList, DctnryStructList& dctnryStructList, @@ -372,7 +368,7 @@ public: * @param dicStringListt dictionary values list */ EXPORT int insertColumnRec_Single(const TxnID& txnid, - CSCTypesList& cscColTypesList, + const CSCTypesList& cscColTypeList, ColStructList& colStructList, ColValueList& colValueList, DctnryStructList& dctnryStructList, @@ -550,6 +546,7 @@ public: * @param ridList row id list */ EXPORT int updateColumnRec(const TxnID& txnid, + const std::vector& colExtentsColType, std::vector& colExtentsStruct, ColValueList& colValueList, std::vector& colOldValueList, @@ -566,6 +563,7 @@ public: */ EXPORT int updateColumnRecs(const TxnID& txnid, + const CSCTypesList& cscColTypeList, std::vector& colStructList, ColValueList& colValueList, const RIDList& ridLists, @@ -657,10 +655,10 @@ private: void findSmallestColumn(uint32_t &colId, ColStructList colStructList); /** - * @brief Convert interface column type to a internal column type + * @brief Convert interface column type to an internal column type */ - void convertValue(const execplan::CalpontSystemCatalog::ColType &fullColType, ColType colType, void* valArray, size_t pos, boost::any& data, bool fromList = true); - void convertValue(const ColType colType, void* valArray, size_t pos, boost::any& data, bool fromList = true); + void convertValue(const execplan::CalpontSystemCatalog::ColType& cscColType, ColType colType, void* valArray, size_t pos, boost::any& data, bool fromList = true); + /** * @brief Convert column value to its internal representation * @@ -668,8 +666,7 @@ private: * @param value Memory pointer for storing output value. Should be pre-allocated * @param data Column data */ - void convertValue(const execplan::CalpontSystemCatalog::ColType &fullColType, const ColType colType, void* value, boost::any& data); - void convertValue(const ColType colType, void* value, boost::any& data); + void convertValue(const execplan::CalpontSystemCatalog::ColType& cscColType, const ColType colType, void* value, boost::any& data); /** * @brief Print input value from DDL/DML processors @@ -705,14 +702,6 @@ private: RID* rowIdArray, const ColStructList& newColStructList, ColValueList& newColValueList, const int32_t tableOid, bool useTmpSuffix, bool versioning = true); - // WIP - int writeColumnRec(const TxnID& txnid, - const ColStructList& colStructList, - ColValueList& colValueList, - RID* rowIdArray, const ColStructList& newColStructList, - ColValueList& newColValueList, const int32_t tableOid, - bool useTmpSuffix, bool versioning = true); - int writeColumnRecBinary(const TxnID& txnid, const ColStructList& colStructList, std::vector& colValueList, @@ -721,24 +710,17 @@ private: const int32_t tableOid, bool useTmpSuffix, bool versioning = true); - //@Bug 1886,2870 pass the address of ridList vector int writeColumnRec(const TxnID& txnid, - const CSCTypesList& cscColTypes, + const CSCTypesList& cscColTypeList, const ColStructList& colStructList, const ColValueList& colValueList, std::vector& colOldValueList, const RIDList& ridList, const int32_t tableOid, bool convertStructFlag = true, ColTupleList::size_type nRows = 0); - // WIP legacy - int writeColumnRec(const TxnID& txnid, - const ColStructList& colStructList, - const ColValueList& colValueList, std::vector& colOldValueList, - const RIDList& ridList, const int32_t tableOid, - bool convertStructFlag = true, ColTupleList::size_type nRows = 0); - //For update column from column to use - int writeColumnRecords(const TxnID& txnid, std::vector& colStructList, + int writeColumnRecords(const TxnID& txnid, const CSCTypesList& cscColTypeList, + std::vector& colStructList, ColValueList& colValueList, const RIDList& ridLists, const int32_t tableOid, bool versioning = true);