diff --git a/dbcon/ddlpackageproc/altertableprocessor.cpp b/dbcon/ddlpackageproc/altertableprocessor.cpp index 86457fb57..0bc6b26b9 100644 --- a/dbcon/ddlpackageproc/altertableprocessor.cpp +++ b/dbcon/ddlpackageproc/altertableprocessor.cpp @@ -682,7 +682,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC throw std::runtime_error(err); } - if (columnOid > 0) // Column exists already + if ((columnOid > 0) && (columnOid != OID_SYSTABLE_AUXCOLUMNOID)) // Column exists already { err = err + "Internal add column error for " + tableColName.schema + "." + tableColName.table + "." + tableColName.column + ". Column exists already. Your table is probably out-of-sync"; @@ -747,6 +747,22 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC { fStartingColOID = OID_SYSTABLE_AUTOINCREMENT; } + else if ((inTableName.fName == SYSTABLE_TABLE) && (columnDefPtr->fName == AUXCOLUMNOID_COL)) + { + fStartingColOID = OID_SYSTABLE_AUXCOLUMNOID; + + if (!columnDefPtr->fDefaultValue || columnDefPtr->fDefaultValue->fValue != "0" || + columnDefPtr->fConstraints.size() != 1 || !columnDefPtr->fConstraints[0] || + columnDefPtr->fConstraints[0]->fConstraintType != ddlpackage::DDL_NOT_NULL || + columnDefPtr->fType->fType != CalpontSystemCatalog::INT || + columnDefPtr->fType->fLength != 4) + { + std::ostringstream oss; + oss << "Error adding " << AUXCOLUMNOID_COL << " column to calpontsys table."; + oss << " Column should be of type integer, added with NOT NULL constraint and default value of 0"; + throw std::runtime_error(oss.str()); + } + } else if ((inTableName.fName == SYSCOLUMN_TABLE) && (columnDefPtr->fName == COMPRESSIONTYPE_COL)) { fStartingColOID = OID_SYSCOLUMN_COMPRESSIONTYPE; @@ -1003,12 +1019,6 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC else bs << (uint32_t)0; - // new column info - bs << (ByteStream::byte)dataType; - bs << (ByteStream::byte)autoincrement; - bs << (uint32_t)columnDefPtr->fType->fLength; - bs << (uint32_t)columnDefPtr->fType->fScale; - bs << (uint32_t)columnDefPtr->fType->fPrecision; std::string tmpStr(""); if (columnDefPtr->fDefaultValue) @@ -1016,6 +1026,13 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC tmpStr = columnDefPtr->fDefaultValue->fValue; } + // new column info + bs << (ByteStream::byte)dataType; + bs << (ByteStream::byte)autoincrement; + bs << (uint32_t)columnDefPtr->fType->fLength; + bs << (uint32_t)columnDefPtr->fType->fScale; + bs << (uint32_t)columnDefPtr->fType->fPrecision; + bs << tmpStr; bs << (ByteStream::byte)columnDefPtr->fType->fCompressiontype; // ref column info diff --git a/dbcon/ddlpackageproc/createtableprocessor.cpp b/dbcon/ddlpackageproc/createtableprocessor.cpp index f0ff32be7..5677cb569 100644 --- a/dbcon/ddlpackageproc/createtableprocessor.cpp +++ b/dbcon/ddlpackageproc/createtableprocessor.cpp @@ -267,12 +267,17 @@ keepGoing: numDictCols++; } - fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols + - 1); // include column, oids,dictionary oids and tableoid + // Include column oids, dictionary oids, tableoid, and + // also include AUX oid as of MCOL-5021 + fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols + 2); #ifdef IDB_DDL_DEBUG cout << fTxnid.id << " Create table allocOIDs got the starting oid " << fStartingColOID << endl; #endif + uint32_t size = numColumns + numDictCols; + idbassert(size > 0); + size += 1; // MCOL-5021 + if (fStartingColOID < 0) { result.result = CREATE_ERROR; @@ -295,6 +300,7 @@ keepGoing: bytestream << (uint32_t)createTableStmt.fSessionID; bytestream << (uint32_t)txnID.id; bytestream << (uint32_t)fStartingColOID; + bytestream << (uint32_t)(fStartingColOID + size); bytestream << (uint32_t)createTableStmt.fTableWithAutoi; uint16_t dbRoot; BRM::OID_t sysOid = 1001; @@ -537,7 +543,7 @@ keepGoing: bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES; bytestream << uniqueId; bytestream << (uint32_t)txnID.id; - bytestream << (numColumns + numDictCols); + bytestream << size; unsigned colNum = 0; unsigned dictNum = 0; @@ -601,6 +607,15 @@ keepGoing: ++iter; } + // MCOL-5021 + // TODO compressionType is hardcoded to 2 (SNAPPY) + bytestream << (fStartingColOID + size); + bytestream << (uint8_t)datatypes::SystemCatalog::UTINYINT; + bytestream << (uint8_t) false; + bytestream << (uint32_t)1; + bytestream << (uint16_t)useDBRoot; + bytestream << (uint32_t)2; + //@Bug 4176. save oids to a log file for cleanup after fail over. std::vector oidList; @@ -616,6 +631,9 @@ keepGoing: oidList.push_back(fStartingColOID + numColumns + i + 1); } + // MCOL-5021 + oidList.push_back(fStartingColOID + size); + try { createWriteDropLogFile(fStartingColOID, uniqueId, oidList); @@ -683,9 +701,9 @@ keepGoing: bytestream.restart(); bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES; bytestream << uniqueId; - bytestream << (uint32_t)(numColumns + numDictCols); + bytestream << (uint32_t)size; - for (unsigned i = 0; i < (numColumns + numDictCols); i++) + for (unsigned i = 0; i < size; i++) { bytestream << (uint32_t)(fStartingColOID + i + 1); } diff --git a/dbcon/ddlpackageproc/droptableprocessor.cpp b/dbcon/ddlpackageproc/droptableprocessor.cpp index 4cfc79467..9de3389b3 100644 --- a/dbcon/ddlpackageproc/droptableprocessor.cpp +++ b/dbcon/ddlpackageproc/droptableprocessor.cpp @@ -89,6 +89,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( CalpontSystemCatalog::RIDList tableColRidList; CalpontSystemCatalog::DictOIDList dictOIDList; execplan::CalpontSystemCatalog::ROPair roPair; + CalpontSystemCatalog::OID tableAUXColOid; std::string errorMsg; ByteStream bytestream; uint64_t uniqueId = 0; @@ -145,6 +146,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( try { roPair = systemCatalogPtr->tableRID(tableName); + tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName); } catch (IDBExcept& ie) { @@ -580,6 +582,18 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage( return result; } + // MCOL-5021 Valid AUX column OID for a table is > 3000 + // Tables that were created before this feature was added will have + // tableAUXColOid = 0 + if (tableAUXColOid > 3000) + { + oidList.push_back(tableAUXColOid); + CalpontSystemCatalog::ROPair auxRoPair; + auxRoPair.rid = 0; + auxRoPair.objnum = tableAUXColOid; + tableColRidList.push_back(auxRoPair); + } + // Save the oids to a file try { diff --git a/dbcon/execplan/calpontsystemcatalog.cpp b/dbcon/execplan/calpontsystemcatalog.cpp index 2be3c6c7b..8304a7fdc 100644 --- a/dbcon/execplan/calpontsystemcatalog.cpp +++ b/dbcon/execplan/calpontsystemcatalog.cpp @@ -3607,6 +3607,140 @@ const CalpontSystemCatalog::ROPair CalpontSystemCatalog::tableRID(const TableNam throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args); } +// This function is similar to CalpontSystemCatalog::tableRID, except that +// instead of returning a ROPair for the table, it returns the OID for the +// AUX column for the table +CalpontSystemCatalog::OID CalpontSystemCatalog::tableAUXColumnOID(const TableName& tableName, + int lower_case_table_names) +{ + TableName aTableName; + aTableName.schema = tableName.schema; + aTableName.table = tableName.table; + if (lower_case_table_names) + { + boost::algorithm::to_lower(aTableName.schema); + boost::algorithm::to_lower(aTableName.table); + } + + if (aTableName.schema.compare(CALPONT_SCHEMA) == 0) + { + std::ostringstream oss; + oss << "tableAUXColumnOID() cannot be called on a "; + oss << CALPONT_SCHEMA << " schema table"; + throw runtime_error(oss.str()); + } + + DEBUG << "Enter tableAUXColumnOID: " << tableName.schema << "|" << tableName.table << endl; + + // look up the OID in the cache first + OID auxColOid; + + checkSysCatVer(); + + boost::mutex::scoped_lock lk1(fTableAUXColumnOIDMapLock); + TableOIDmap::const_iterator iter = fTableAUXColumnOIDMap.find(aTableName); + + if (iter != fTableAUXColumnOIDMap.end()) + { + auxColOid = iter->second; + return auxColOid; + } + + lk1.unlock(); + + // select auxcolumnoid from systable where schema = tableName.schema and tablename = tableName.table; + CalpontSelectExecutionPlan csep; + CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList; + CalpontSelectExecutionPlan::FilterTokenList filterTokenList; + CalpontSelectExecutionPlan::ColumnMap colMap; + + SimpleColumn* c1 = + new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + AUXCOLUMNOID_COL, fSessionID); + SimpleColumn* c2 = + new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + SCHEMA_COL, fSessionID); + SimpleColumn* c3 = + new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + TABLENAME_COL, fSessionID); + + SRCP srcp; + srcp.reset(c1); + colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + AUXCOLUMNOID_COL, srcp)); + srcp.reset(c2); + colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + SCHEMA_COL, srcp)); + srcp.reset(c3); + colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + TABLENAME_COL, srcp)); + csep.columnMapNonStatic(colMap); + + srcp.reset(c1->clone()); + returnedColumnList.push_back(srcp); + csep.returnedCols(returnedColumnList); + OID oid = c1->oid(); + + // Filters + SimpleFilter* f1 = + new SimpleFilter(opeq, c2->clone(), new ConstantColumn(aTableName.schema, ConstantColumn::LITERAL)); + filterTokenList.push_back(f1); + filterTokenList.push_back(new Operator("and")); + + SimpleFilter* f2 = + new SimpleFilter(opeq, c3->clone(), new ConstantColumn(aTableName.table, ConstantColumn::LITERAL)); + filterTokenList.push_back(f2); + csep.filterTokenList(filterTokenList); + + ostringstream oss; + oss << "select auxcolumnoid from systable where schema='" << aTableName.schema << "' and tablename='" + << aTableName.table << "' --tableAUXColumnOID/"; + + csep.data(oss.str()); //@bug 6078. Log the statement + + if (fIdentity == EC) + oss << "EC"; + else + oss << "FE"; + + NJLSysDataList sysDataList; + + try + { + getSysData(csep, sysDataList, SYSTABLE_TABLE); + } + catch (IDBExcept&) + { + throw; + } + catch (runtime_error& e) + { + throw runtime_error(e.what()); + } + + vector::const_iterator it; + + for (it = sysDataList.begin(); it != sysDataList.end(); it++) + { + if ((*it)->dataCount() == 0) + { + Message::Args args; + args.add("'" + tableName.schema + "." + tableName.table + "'"); + // throw logging::NoTableExcept(msg); + throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args); + } + + if ((*it)->ColumnOID() == oid) + { + auxColOid = (OID)((*it)->GetData(0)); + + // populate cache + lk1.lock(); + fTableAUXColumnOIDMap[aTableName] = auxColOid; + return auxColOid; + } + } + + Message::Args args; + args.add("'" + tableName.schema + "." + tableName.table + "'"); + // throw logging::NoTableExcept(msg); + throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args); +} + #if 0 const CalpontSystemCatalog::IndexNameList CalpontSystemCatalog::indexNames(const TableName& tableName) { @@ -5640,6 +5774,10 @@ void CalpontSystemCatalog::flushCache() buildSysTablemap(); lk3.unlock(); + boost::mutex::scoped_lock auxlk(fTableAUXColumnOIDMapLock); + fTableAUXColumnOIDMap.clear(); + auxlk.unlock(); + boost::recursive_mutex::scoped_lock lk4(fDctTokenMapLock); fDctTokenMap.clear(); buildSysDctmap(); @@ -5774,6 +5912,14 @@ void CalpontSystemCatalog::buildSysColinfomap() aCol.columnOID = OID_SYSTABLE_AUTOINCREMENT; fColinfomap[aCol.columnOID] = aCol; + aCol.colWidth = 4; + aCol.constraintType = NOTNULL_CONSTRAINT; + aCol.colDataType = INT; + aCol.ddn = notDict; + aCol.colPosition++; + aCol.columnOID = OID_SYSTABLE_AUXCOLUMNOID; + fColinfomap[aCol.columnOID] = aCol; + fTablemap[make_table(CALPONT_SCHEMA, SYSCOLUMN_TABLE)] = SYSCOLUMN_BASE; aCol.colWidth = 129; // @bug 4433 @@ -5984,6 +6130,7 @@ void CalpontSystemCatalog::buildSysOIDmap() fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, AVGROWLEN_COL)] = OID_SYSTABLE_AVGROWLEN; fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, NUMOFBLOCKS_COL)] = OID_SYSTABLE_NUMOFBLOCKS; fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, AUTOINC_COL)] = OID_SYSTABLE_AUTOINCREMENT; + fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, AUXCOLUMNOID_COL)] = OID_SYSTABLE_AUXCOLUMNOID; fOIDmap[make_tcn(CALPONT_SCHEMA, SYSCOLUMN_TABLE, SCHEMA_COL)] = OID_SYSCOLUMN_SCHEMA; fOIDmap[make_tcn(CALPONT_SCHEMA, SYSCOLUMN_TABLE, TABLENAME_COL)] = OID_SYSCOLUMN_TABLENAME; fOIDmap[make_tcn(CALPONT_SCHEMA, SYSCOLUMN_TABLE, COLNAME_COL)] = OID_SYSCOLUMN_COLNAME; diff --git a/dbcon/execplan/calpontsystemcatalog.h b/dbcon/execplan/calpontsystemcatalog.h index 6c2c19773..eabce30a4 100644 --- a/dbcon/execplan/calpontsystemcatalog.h +++ b/dbcon/execplan/calpontsystemcatalog.h @@ -659,6 +659,12 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog */ const ROPair tableRID(const TableName& tableName, int lower_case_table_names = 0); + /** return the OID of the table's AUX column + * + * returns the OID of the table's AUX column + */ + OID tableAUXColumnOID(const TableName& tableName, int lower_case_table_names = 0); + /** return the RID of the index for a table * * returns the RID of the indexes for a table @@ -863,6 +869,10 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog typedef std::map Tablemap; Tablemap fTablemap; + typedef std::map TableOIDmap; + TableOIDmap fTableAUXColumnOIDMap; + boost::mutex fTableAUXColumnOIDMapLock; + typedef std::map Colinfomap; Colinfomap fColinfomap; boost::mutex fColinfomapLock; @@ -1159,6 +1169,7 @@ const std::string MINVALUE_COL = "minvalue"; const std::string MAXVALUE_COL = "maxvalue"; const std::string COMPRESSIONTYPE_COL = "compressiontype"; const std::string NEXTVALUE_COL = "nextvalue"; +const std::string AUXCOLUMNOID_COL = "auxcolumnoid"; /***************************************************** * System tables OID definition @@ -1182,7 +1193,8 @@ const int OID_SYSTABLE_NUMOFROWS = SYSTABLE_BASE + 8; /** @brief total num const int OID_SYSTABLE_AVGROWLEN = SYSTABLE_BASE + 9; /** @brief avg. row length column */ const int OID_SYSTABLE_NUMOFBLOCKS = SYSTABLE_BASE + 10; /** @brief num. of blocks column */ const int OID_SYSTABLE_AUTOINCREMENT = SYSTABLE_BASE + 11; /** @brief AUTOINCREMENT column */ -const int SYSTABLE_MAX = SYSTABLE_BASE + 12; // be sure this is one more than the highest # +const int OID_SYSTABLE_AUXCOLUMNOID = SYSTABLE_BASE + 12; /** @brief AUXCOLUMNOID column */ +const int SYSTABLE_MAX = SYSTABLE_BASE + 13; // be sure this is one more than the highest # /***************************************************** * SYSCOLUMN columns OID definition diff --git a/dbcon/joblist/joblist.cpp b/dbcon/joblist/joblist.cpp index 7adc6a980..2fa74adbb 100644 --- a/dbcon/joblist/joblist.cpp +++ b/dbcon/joblist/joblist.cpp @@ -1198,9 +1198,9 @@ string JobList::toString() const for (i = 0; i < fQuery.size(); i++) ret += fQuery[i]->toString(); - // ret += "\nProjection Steps:\n"; - // for (i = 0; i < fProject.size(); i++) - // ret += fProject[i]->toString(); + ret += "\nProjection Steps:\n"; + for (i = 0; i < fProject.size(); i++) + ret += fProject[i]->toString(); ret += "\n"; return ret; } diff --git a/dbcon/joblist/joblistfactory.cpp b/dbcon/joblist/joblistfactory.cpp index 01c6fa29c..36f263dfc 100644 --- a/dbcon/joblist/joblistfactory.cpp +++ b/dbcon/joblist/joblistfactory.cpp @@ -2313,6 +2313,7 @@ SJLP JobListFactory::makeJobList(CalpontExecutionPlan* cplan, ResourceManager* r ret->errorInfo(errorInfo); } + std::cout<toString()<(fJobInfo.getJob()); string logFile, errlogFile; logFile = std::string(MCSLOGDIR) + "/cpimport/" + "Job_" + Convertor::int2Str(curJob.id) + LOG_SUFFIX; errlogFile = @@ -314,6 +314,84 @@ int BulkLoad::loadJobInfo(const string& fullName, bool bUseTempJobFile, int argc rc, MSGLVL_ERROR); return rc; } + + // MCOL-5021 + execplan::CalpontSystemCatalog::OID tableAUXColOid; + std::string tblName; + std::string curTblName = curJob.jobTableList[i].tblName; + string::size_type startName = curTblName.rfind('.'); + + if (startName == std::string::npos) + tblName.assign(curTblName); + else + tblName.assign(curTblName.substr(startName + 1)); + + execplan::CalpontSystemCatalog::TableName table(curJob.schema, tblName); + + try + { + boost::shared_ptr cat = + execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID); + tableAUXColOid = cat->tableAUXColumnOID(table); + } + catch (logging::IDBExcept& ie) + { + rc = ERR_UNKNOWN; + std::ostringstream oss; + + if (ie.errorCode() == logging::ERR_TABLE_NOT_IN_CATALOG) + { + oss << "Table " << table.toString(); + oss << "does not exist in the system catalog."; + } + else + { + oss << "Error getting AUX column OID for table " << table.toString(); + oss << " due to: " << ie.what(); + } + + fLog.logMsg(oss.str(), rc, MSGLVL_ERROR); + return rc; + } + catch(std::exception& ex) + { + rc = ERR_UNKNOWN; + std::ostringstream oss; + oss << "Error getting AUX column OID for table " << table.toString(); + oss << " due to: " << ex.what(); + fLog.logMsg(oss.str(), rc, MSGLVL_ERROR); + return rc; + } + catch(...) + { + rc = ERR_UNKNOWN; + std::ostringstream oss; + oss << "Error getting AUX column OID for table " << table.toString(); + fLog.logMsg(oss.str(), rc, MSGLVL_ERROR); + return rc; + } + + // MCOL-5021 Valid AUX column OID for a table is > 3000 + // Tables that were created before this feature was added will have + // tableAUXColOid = 0 + if (tableAUXColOid > 3000) + { + JobColumn curColumn; + curColumn.colName = "aux"; + curColumn.mapOid = tableAUXColOid; + curColumn.typeName = "unsigned-tinyint"; + curColumn.width = 1; + curColumn.definedWidth = 1; + curColumn.compressionType = 2; + curColumn.dctnry.fCompressionType = 2; + curColumn.fMinIntSat = MIN_UTINYINT; + curColumn.fMaxIntSat = MAX_UTINYINT; + curColumn.fWithDefault = true; + curColumn.fDefaultUInt = 1; + curJob.jobTableList[i].colList.push_back(curColumn); + JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_DEFAULT, curJob.jobTableList[i].colList.size() - 1); + curJob.jobTableList[i].fFldRefs.push_back(fieldRef); + } } // Validate that the user's xml file has been regenerated since the diff --git a/writeengine/server/we_ddlcommandproc.cpp b/writeengine/server/we_ddlcommandproc.cpp index 5ad015aa7..7bbcec1b5 100644 --- a/writeengine/server/we_ddlcommandproc.cpp +++ b/writeengine/server/we_ddlcommandproc.cpp @@ -121,7 +121,7 @@ uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err) { int rc = 0; uint32_t sessionID, tmp32; - int txnID, tableOID; + int txnID, tableOID, tableAUXColumnOID; uint32_t tableWithAutoi; bs >> sessionID; bs >> tmp32; @@ -129,6 +129,8 @@ uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err) bs >> tmp32; tableOID = tmp32; bs >> tmp32; + tableAUXColumnOID = tmp32; + bs >> tmp32; tableWithAutoi = tmp32; bs >> tmp32; uint16_t dbroot = tmp32; @@ -192,6 +194,10 @@ uint8_t WE_DDLCommandProc::writeSystable(ByteStream& bs, std::string& err) { colTuple.data = tableOID; } + else if (AUXCOLUMNOID_COL == column.tableColName.column) + { + colTuple.data = tableAUXColumnOID; + } else if (CREATEDATE_COL == column.tableColName.column) { time_t t; diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index e44844c1c..92e47a7ff 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -129,22 +129,24 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::ROPair tableRoPair; + CalpontSystemCatalog::OID tableAUXColOid; std::vector colNames; bool isWarningSet = false; try { tableRoPair = systemCatalogPtr->tableRID(tableName); + tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName); if (rows.size()) { Row* rowPtr = rows.at(0); ColumnList columns = rowPtr->get_ColumnList(); unsigned int numcols = rowPtr->get_NumberOfColumns(); - cscColTypeList.reserve(numcols); + cscColTypeList.reserve(numcols + 1); // WIP // We presume that DictCols number is low - colStructs.reserve(numcols); + colStructs.reserve(numcols + 1); ColumnList::const_iterator column_iterator = columns.begin(); while (column_iterator != columns.end()) @@ -205,6 +207,33 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: ++column_iterator; } + // MCOL-5021 Valid AUX column OID for a table is > 3000 + // Tables that were created before this feature was added will have + // tableAUXColOid = 0 + if (tableAUXColOid > 3000) + { + CalpontSystemCatalog::ColType colType; + colType.compressionType = 2; + colType.colWidth = 1; + colType.colDataType = datatypes::SystemCatalog::UTINYINT; + WriteEngine::ColStruct colStruct; + colStruct.fColDbRoot = dbroot; + WriteEngine::DctnryStruct dctnryStruct; + dctnryStruct.fColDbRoot = dbroot; + colStruct.dataOid = tableAUXColOid; + colStruct.tokenFlag = false; + colStruct.fCompressionType = colType.compressionType; + colStruct.colWidth = colType.colWidth; + colStruct.colDataType = colType.colDataType; + dctnryStruct.dctnryOid = 0; + dctnryStruct.columnOid = colStruct.dataOid; + dctnryStruct.fCompressionType = colType.compressionType; + dctnryStruct.colWidth = colType.colWidth; + colStructs.push_back(colStruct); + dctnryStructList.push_back(dctnryStruct); + cscColTypeList.push_back(colType); + } + std::string tmpStr(""); for (unsigned int i = 0; i < numcols; i++) @@ -411,6 +440,27 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std: dicStringList.push_back(dicStrings); } + // MCOL-5021 + if ((i == numcols - 1) && (tableAUXColOid > 3000)) + { + WriteEngine::ColTupleList auxColTuples; + WriteEngine::dictStr auxDicStrings; + + for (uint32_t j = 0; j < origVals.size(); j++) + { + WriteEngine::ColTuple auxColTuple; + auxColTuple.data = (uint8_t)1; + + auxColTuples.push_back(auxColTuple); + //@Bug 2515. Only pass string values to write engine + auxDicStrings.push_back(""); + } + + colValuesList.push_back(auxColTuples); + //@Bug 2515. Only pass string values to write engine + dicStringList.push_back(auxDicStrings); + } + ++row_iterator; } } @@ -841,6 +891,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId); systemCatalogPtr->identity(CalpontSystemCatalog::EC); CalpontSystemCatalog::ROPair roPair; + CalpontSystemCatalog::OID tableAUXColOid; CalpontSystemCatalog::RIDList ridList; CalpontSystemCatalog::DictOIDList dictOids; @@ -848,6 +899,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: { ridList = systemCatalogPtr->columnRIDs(tableName, true); roPair = systemCatalogPtr->tableRID(tableName); + tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName); } catch (std::exception& ex) { @@ -856,13 +908,13 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: return rc; } - std::vector dctnryStoreOids(ridList.size()); + uint32_t sizeToAdd = (tableAUXColOid > 3000 ? 1 : 0); + std::vector dctnryStoreOids(ridList.size() + sizeToAdd); std::vector columns; DctnryStructList dctnryList; - std::vector dbRootHWMInfoColVec(ridList.size()); + std::vector dbRootHWMInfoColVec(ridList.size() + sizeToAdd); uint32_t tblOid = roPair.objnum; - CalpontSystemCatalog::ColType colType; std::vector colDBRootExtentInfo; bool bFirstExtentOnThisPM = false; Convertor convertor; @@ -880,24 +932,51 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: try { + std::vector colTypes; + + for (unsigned i = 0; i < ridList.size(); i++) + { + CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(ridList[i].objnum); + colTypes.push_back(colType); + + if (colType.ddn.dictOID > 0) + { + dctnryStoreOids[i] = colType.ddn.dictOID; + } + else + { + dctnryStoreOids[i] = 0; + } + } + // First gather HWM BRM information for all columns std::vector colWidths; - for (unsigned i = 0; i < ridList.size(); i++) { rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]); // need handle error + colWidths.push_back(convertor.getCorrectRowWidth(colTypes[i].colDataType, colTypes[i].colWidth)); + } - CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum); - colWidths.push_back(convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth)); + // MCOL-5021 + if (tableAUXColOid > 3000) + { + rc = BRMWrapper::getInstance()->getDbRootHWMInfo(tableAUXColOid, dbRootHWMInfoColVec[ridList.size()]); + colWidths.push_back(1); + dctnryStoreOids[ridList.size()] = 0; + CalpontSystemCatalog::ROPair auxRoPair; + auxRoPair.rid = ridList.back().rid + 1; + auxRoPair.objnum = tableAUXColOid; + ridList.push_back(auxRoPair); } for (unsigned i = 0; i < ridList.size(); i++) { + uint32_t objNum = ridList[i].objnum; + // Find DBRoot/segment file where we want to start adding rows - colType = systemCatalogPtr->colType(ridList[i].objnum); boost::shared_ptr pDBRootExtentTracker( - new DBRootExtentTracker(ridList[i].objnum, colWidths, dbRootHWMInfoColVec, i, 0)); + new DBRootExtentTracker(objNum, colWidths, dbRootHWMInfoColVec, i, 0)); dbRootExtTrackerVec.push_back(pDBRootExtentTracker); DBRootExtentInfo dbRootExtent; std::string trkErrMsg; @@ -914,34 +993,38 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: colDBRootExtentInfo.push_back(dbRootExtent); Column aColumn; - aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth); - aColumn.colDataType = colType.colDataType; - aColumn.compressionType = colType.compressionType; - aColumn.dataFile.oid = ridList[i].objnum; + + if ((i == ridList.size() - 1) && (tableAUXColOid > 3000)) // AUX column + { + aColumn.colWidth = 1; + aColumn.colDataType = datatypes::SystemCatalog::UTINYINT; + // TODO MCOL-5021 compressionType for the AUX column is hard-coded to 2 + aColumn.compressionType = 2; + } + else + { + const CalpontSystemCatalog::ColType& colType = colTypes[i]; + aColumn.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth); + aColumn.colDataType = colType.colDataType; + aColumn.compressionType = colType.compressionType; + + if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0)) + { + DctnryStruct aDctnry; + aDctnry.dctnryOid = colType.ddn.dictOID; + aDctnry.fColPartition = dbRootExtent.fPartition; + aDctnry.fColSegment = dbRootExtent.fSegment; + aDctnry.fColDbRoot = dbRootExtent.fDbRoot; + dctnryList.push_back(aDctnry); + } + } + + aColumn.dataFile.oid = objNum; aColumn.dataFile.fPartition = dbRootExtent.fPartition; aColumn.dataFile.fSegment = dbRootExtent.fSegment; aColumn.dataFile.fDbRoot = dbRootExtent.fDbRoot; aColumn.dataFile.hwm = dbRootExtent.fLocalHwm; columns.push_back(aColumn); - - if ((colType.compressionType > 0) && (colType.ddn.dictOID > 0)) - { - DctnryStruct aDctnry; - aDctnry.dctnryOid = colType.ddn.dictOID; - aDctnry.fColPartition = dbRootExtent.fPartition; - aDctnry.fColSegment = dbRootExtent.fSegment; - aDctnry.fColDbRoot = dbRootExtent.fDbRoot; - dctnryList.push_back(aDctnry); - } - - if (colType.ddn.dictOID > 0) - { - dctnryStoreOids[i] = colType.ddn.dictOID; - } - else - { - dctnryStoreOids[i] = 0; - } } } catch (std::exception& ex) @@ -952,7 +1035,8 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: } //@Bug 5996 validate hwm before starts - rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting"); + rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Starting", + tableAUXColOid > 3000); if (rc != 0) { @@ -1062,6 +1146,31 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: ++column_iterator; } + + // MCOL-5021 Valid AUX column OID for a table is > 3000 + // Tables that were created before this feature was added will have + // tableAUXColOid = 0 + if (tableAUXColOid > 3000) + { + CalpontSystemCatalog::ColType colType; + colType.compressionType = 2; + colType.colWidth = 1; + colType.colDataType = datatypes::SystemCatalog::UTINYINT; + WriteEngine::ColStruct colStruct; + WriteEngine::DctnryStruct dctnryStruct; + colStruct.dataOid = tableAUXColOid; + colStruct.tokenFlag = false; + colStruct.fCompressionType = colType.compressionType; + colStruct.colWidth = colType.colWidth; + colStruct.colDataType = colType.colDataType; + dctnryStruct.dctnryOid = 0; + dctnryStruct.columnOid = colStruct.dataOid; + dctnryStruct.fCompressionType = colType.compressionType; + dctnryStruct.colWidth = colType.colWidth; + colStructs.push_back(colStruct); + dctnryStructList.push_back(dctnryStruct); + cscColTypeList.push_back(colType); + } } catch (std::exception& ex) { @@ -1273,6 +1382,27 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: dicStringList.push_back(dicStrings); } + // MCOL-5021 + if ((i == numcols - 1) && (tableAUXColOid > 3000)) + { + WriteEngine::ColTupleList auxColTuples; + WriteEngine::dictStr auxDicStrings; + + for (uint32_t j = 0; j < origVals.size(); j++) + { + WriteEngine::ColTuple auxColTuple; + auxColTuple.data = (uint8_t)1; + + auxColTuples.push_back(auxColTuple); + //@Bug 2515. Only pass string values to write engine + auxDicStrings.push_back(""); + } + + colValuesList.push_back(auxColTuples); + //@Bug 2515. Only pass string values to write engine + dicStringList.push_back(auxDicStrings); + } + ++row_iterator; } @@ -1372,6 +1502,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: return rc; } +#if 0 uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId) { @@ -2102,6 +2233,7 @@ uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, CalpontSystemCatalog::removeCalpontSystemCatalog(sessionId | 0x80000000); return rc; } +#endif uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err) { @@ -2275,8 +2407,49 @@ uint8_t WE_DMLCommandProc::processBatchInsertHwm(messageqcpp::ByteStream& bs, st ColExtsInfo::iterator aIt; CalpontSystemCatalog::RIDList ridList; CalpontSystemCatalog::ROPair roPair; + + //@Bug 5996. Validate hwm before set them + boost::shared_ptr systemCatalogPtr = + CalpontSystemCatalog::makeCalpontSystemCatalog(0); + systemCatalogPtr->identity(CalpontSystemCatalog::EC); + + CalpontSystemCatalog::OID tableAUXColOid; + + try + { + CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid); + ridList = systemCatalogPtr->columnRIDs(tableName); + tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName); + } + catch (exception& ex) + { + err = ex.what(); + rc = 1; + TableMetaData::removeTableMetaData(tableOid); + + fIsFirstBatchPm = true; + // cout << "flush files when autocommit off" << endl; + fWEWrapper.setIsInsert(true); + fWEWrapper.setBulkFlag(true); + return rc; + } + + // MCOL-5021 Valid AUX column OID for a table is > 3000 + // Tables that were created before this feature was added will have + // tableAUXColOid = 0 + bool hasAUXCol = (tableAUXColOid > 3000); + + if (hasAUXCol) + { + CalpontSystemCatalog::ROPair auxRoPair; + auxRoPair.rid = ridList.back().rid + 1; + auxRoPair.objnum = tableAUXColOid; + ridList.push_back(auxRoPair); + } + std::vector colDBRootExtentInfo; DBRootExtentInfo aExtentInfo; + std::vector auxColDBRootExtentInfo; while (it != colsExtsInfoMap.end()) { @@ -2321,37 +2494,28 @@ uint8_t WE_DMLCommandProc::processBatchInsertHwm(messageqcpp::ByteStream& bs, st if (!isDict) { - ridList.push_back(roPair); - colDBRootExtentInfo.push_back(aExtentInfo); + if (hasAUXCol && (((uint32_t)tableAUXColOid) == it->first)) + { + auxColDBRootExtentInfo.push_back(aExtentInfo); + } + else + { + colDBRootExtentInfo.push_back(aExtentInfo); + } } it++; } - //@Bug 5996. Validate hwm before set them - boost::shared_ptr systemCatalogPtr = - CalpontSystemCatalog::makeCalpontSystemCatalog(0); - systemCatalogPtr->identity(CalpontSystemCatalog::EC); - - try + if (hasAUXCol) { - CalpontSystemCatalog::TableName tableName = systemCatalogPtr->tableName(tableOid); - ridList = systemCatalogPtr->columnRIDs(tableName); - } - catch (exception& ex) - { - err = ex.what(); - rc = 1; - TableMetaData::removeTableMetaData(tableOid); - - fIsFirstBatchPm = true; - // cout << "flush files when autocommit off" << endl; - fWEWrapper.setIsInsert(true); - fWEWrapper.setBulkFlag(true); - return rc; + idbassert(auxColDBRootExtentInfo.size() == 1); + colDBRootExtentInfo.insert(colDBRootExtentInfo.end(), auxColDBRootExtentInfo.begin(), + auxColDBRootExtentInfo.end()); } - rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Ending"); + rc = validateColumnHWMs(ridList, systemCatalogPtr, colDBRootExtentInfo, "Ending", + hasAUXCol); if (rc != 0) { @@ -2472,6 +2636,7 @@ uint8_t WE_DMLCommandProc::processBatchInsertHwmFlushChunks(uint32_t tblOid, int CalpontSystemCatalog::TableName aTableName; CalpontSystemCatalog::RIDList ridList; CalpontSystemCatalog::DictOIDList dictOids; + CalpontSystemCatalog::OID tableAUXColOid; try { @@ -2480,6 +2645,7 @@ uint8_t WE_DMLCommandProc::processBatchInsertHwmFlushChunks(uint32_t tblOid, int aTableName = systemCatalogPtr->tableName(tblOid); ridList = systemCatalogPtr->columnRIDs(aTableName, true); dictOids = systemCatalogPtr->dictOIDs(aTableName); + tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName); } catch (exception& ex) { @@ -2496,6 +2662,17 @@ uint8_t WE_DMLCommandProc::processBatchInsertHwmFlushChunks(uint32_t tblOid, int return rc; } + // MCOL-5021 Valid AUX column OID for a table is > 3000 + // Tables that were created before this feature was added will have + // tableAUXColOid = 0 + if (tableAUXColOid > 3000) + { + CalpontSystemCatalog::ROPair auxRoPair; + auxRoPair.rid = ridList.back().rid + 1; + auxRoPair.objnum = tableAUXColOid; + ridList.push_back(auxRoPair); + } + for (unsigned i = 0; i < ridList.size(); i++) { oids[ridList[i].objnum] = ridList[i].objnum; @@ -4017,6 +4194,7 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, std::strin boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID); CalpontSystemCatalog::ROPair roPair; + CalpontSystemCatalog::OID tableAUXColOid; CalpontSystemCatalog::RIDList tableRidList; @@ -4024,6 +4202,7 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, std::strin { roPair = systemCatalogPtr->tableRID(aTableName); tableRidList = systemCatalogPtr->columnRIDs(aTableName, true); + tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName); } catch (exception& ex) { @@ -4084,6 +4263,11 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, std::strin } } + // MCOL-5021 Valid AUX column OID for a table is > 3000 + // Tables that were created before this feature was added will have + // tableAUXColOid = 0 + bool hasAUXCol = tableAUXColOid > 3000; + try { for (unsigned i = 0; i < tableRidList.size(); i++) @@ -4118,6 +4302,21 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, std::strin return rc; } + if (hasAUXCol) + { + CalpontSystemCatalog::ColType colType; + colType.compressionType = 2; + colType.colWidth = 1; + colType.colDataType = datatypes::SystemCatalog::UTINYINT; + colStruct.dataOid = tableAUXColOid; + colStruct.tokenFlag = false; + colStruct.fCompressionType = colType.compressionType; + colStruct.colWidth = colType.colWidth; + colStruct.colDataType = colType.colDataType; + colStructList.push_back(colStruct); + cscColTypeList.push_back(colType); + } + std::vector colExtentsStruct; std::vector colExtentsColType; std::vector colOldValueList; @@ -4128,7 +4327,7 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, std::strin int error = 0; error = fWEWrapper.deleteRow(txnId, colExtentsColType, colExtentsStruct, colOldValueList, ridLists, - roPair.objnum); + roPair.objnum, hasAUXCol); if (error != NO_ERROR) { @@ -4544,35 +4743,45 @@ uint8_t WE_DMLCommandProc::processEndTransaction(ByteStream& bs, std::string& er //------------------------------------------------------------------------------ int WE_DMLCommandProc::validateColumnHWMs(CalpontSystemCatalog::RIDList& ridList, boost::shared_ptr systemCatalogPtr, - const std::vector& segFileInfo, const char* stage) + const std::vector& segFileInfo, const char* stage, + bool hasAuxCol) { int rc = NO_ERROR; if ((!fIsFirstBatchPm) && (strcmp(stage, "Starting") == 0)) return rc; - // Used to track first 1-byte, 2-byte, 4-byte, and 8-byte columns in table + // Used to track first 1-byte, 2-byte, 4-byte, 8-byte and 16-byte columns in table int byte1First = -1; int byte2First = -1; int byte4First = -1; int byte8First = -1; + int byte16First = -1; // Make sure the HWMs for all 1-byte columns match; same for all 2-byte, - // 4-byte, and 8-byte columns as well. + // 4-byte, 8-byte, and 16-byte columns as well. CalpontSystemCatalog::ColType colType; Convertor convertor; + uint32_t colWidth; for (unsigned k = 0; k < segFileInfo.size(); k++) { - int k1 = 0; + unsigned k1 = 0; // Find out column width - colType = systemCatalogPtr->colType(ridList[k].objnum); - colType.colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth); + if (hasAuxCol && (k == segFileInfo.size() - 1)) + { + colWidth = 1; + } + else + { + colType = systemCatalogPtr->colType(ridList[k].objnum); + colWidth = convertor.getCorrectRowWidth(colType.colDataType, colType.colWidth); + } - // Find the first 1-byte, 2-byte, 4-byte, and 8-byte columns. + // Find the first 1-byte, 2-byte, 4-byte, 8-byte and 16-byte columns. // Use those as our reference HWM for the respective column widths. - switch (colType.colWidth) + switch (colWidth) { case 1: { @@ -4601,6 +4810,15 @@ int WE_DMLCommandProc::validateColumnHWMs(CalpontSystemCatalog::RIDList& ridList break; } + case 16: + { + if (byte16First == -1) + byte16First = k; + + k1 = byte16First; + break; + } + case 8: default: { @@ -4610,7 +4828,7 @@ int WE_DMLCommandProc::validateColumnHWMs(CalpontSystemCatalog::RIDList& ridList k1 = byte8First; break; } - } // end of switch based on column width (1,2,4, or 8) + } // end of switch based on column width (1,2,4,8 or 16) // std::cout << "dbg: comparing0 " << stage << " refcol-" << k1 << // "; wid-" << jobColK1.width << "; hwm-" << segFileInfo[k1].fLocalHwm << @@ -4624,15 +4842,26 @@ int WE_DMLCommandProc::validateColumnHWMs(CalpontSystemCatalog::RIDList& ridList (segFileInfo[k1].fSegment != segFileInfo[k].fSegment) || (segFileInfo[k1].fLocalHwm != segFileInfo[k].fLocalHwm)) { - CalpontSystemCatalog::ColType colType2; - colType2 = systemCatalogPtr->colType(ridList[k1].objnum); + uint32_t colWidth2; + + if (hasAuxCol && (k1 == segFileInfo.size() - 1)) + { + colWidth2 = 1; + } + else + { + CalpontSystemCatalog::ColType colType2; + colType2 = systemCatalogPtr->colType(ridList[k1].objnum); + colWidth2 = colType2.colWidth; + } + ostringstream oss; oss << stage << " HWMs do not match for" " OID1-" << ridList[k1].objnum << "; DBRoot-" << segFileInfo[k1].fDbRoot << "; partition-" << segFileInfo[k1].fPartition << "; segment-" << segFileInfo[k1].fSegment << "; hwm-" - << segFileInfo[k1].fLocalHwm << "; width-" << colType2.colWidth << ':' << std::endl + << segFileInfo[k1].fLocalHwm << "; width-" << colWidth2 << ':' << std::endl << " and OID2-" << ridList[k].objnum << "; DBRoot-" << segFileInfo[k].fDbRoot << "; partition-" << segFileInfo[k].fPartition << "; segment-" << segFileInfo[k].fSegment << "; hwm-" << segFileInfo[k].fLocalHwm << "; width-" << colType.colWidth; @@ -4666,8 +4895,8 @@ int WE_DMLCommandProc::validateColumnHWMs(CalpontSystemCatalog::RIDList& ridList // Validate/compare HWM for 1-byte column in relation to 2-byte column, etc. // Without knowing the exact row count, we can't extrapolate the exact HWM // for the wider column, but we can narrow it down to an expected range. - int refCol = 0; - int colIdx = 0; + unsigned refCol = 0; + unsigned colIdx = 0; // Validate/compare HWMs given a 1-byte column as a starting point if (byte1First >= 0) @@ -4771,12 +5000,27 @@ errorCheck: if (rc != NO_ERROR) { - CalpontSystemCatalog::ColType colType1, colType2; - colType1 = systemCatalogPtr->colType(ridList[refCol].objnum); - colType1.colWidth = convertor.getCorrectRowWidth(colType1.colDataType, colType1.colWidth); + uint32_t colWidth1, colWidth2; - colType2 = systemCatalogPtr->colType(ridList[colIdx].objnum); - colType2.colWidth = convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth); + if (hasAuxCol && (refCol == ridList.size() - 1)) + { + colWidth1 = 1; + } + else + { + CalpontSystemCatalog::ColType colType1 = systemCatalogPtr->colType(ridList[refCol].objnum); + colWidth1 = convertor.getCorrectRowWidth(colType1.colDataType, colType1.colWidth); + } + + if (hasAuxCol && (colIdx == ridList.size() - 1)) + { + colWidth2 = 1; + } + else + { + CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[colIdx].objnum); + colWidth2 = convertor.getCorrectRowWidth(colType2.colDataType, colType2.colWidth); + } ostringstream oss; oss << stage @@ -4784,10 +5028,10 @@ errorCheck: " OID1-" << ridList[refCol].objnum << "; DBRoot-" << segFileInfo[refCol].fDbRoot << "; partition-" << segFileInfo[refCol].fPartition << "; segment-" << segFileInfo[refCol].fSegment << "; hwm-" - << segFileInfo[refCol].fLocalHwm << "; width-" << colType1.colWidth << ':' << std::endl + << segFileInfo[refCol].fLocalHwm << "; width-" << colWidth1 << ':' << std::endl << " and OID2-" << ridList[colIdx].objnum << "; DBRoot-" << segFileInfo[colIdx].fDbRoot << "; partition-" << segFileInfo[colIdx].fPartition << "; segment-" << segFileInfo[colIdx].fSegment - << "; hwm-" << segFileInfo[colIdx].fLocalHwm << "; width-" << colType2.colWidth; + << "; hwm-" << segFileInfo[colIdx].fLocalHwm << "; width-" << colWidth2; fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_ERROR); } diff --git a/writeengine/server/we_dmlcommandproc.h b/writeengine/server/we_dmlcommandproc.h index 23301793d..5db46d8d4 100644 --- a/writeengine/server/we_dmlcommandproc.h +++ b/writeengine/server/we_dmlcommandproc.h @@ -78,8 +78,8 @@ class WE_DMLCommandProc EXPORT uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string& err); EXPORT uint8_t processBatchInsert(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId); - EXPORT uint8_t processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err, - ByteStream::quadbyte& PMId); + //EXPORT uint8_t processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string& err, + // ByteStream::quadbyte& PMId); EXPORT uint8_t commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err); EXPORT uint8_t commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string& err); EXPORT uint8_t rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string& err); @@ -101,7 +101,8 @@ class WE_DMLCommandProc EXPORT uint8_t getWrittenLbids(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId); int validateColumnHWMs(execplan::CalpontSystemCatalog::RIDList& ridList, boost::shared_ptr systemCatalogPtr, - const std::vector& segFileInfo, const char* stage); + const std::vector& segFileInfo, const char* stage, + bool hasAuxCol); private: WriteEngineWrapper fWEWrapper; diff --git a/writeengine/server/we_readthread.cpp b/writeengine/server/we_readthread.cpp index 9da77a3a2..95bb4c4e2 100644 --- a/writeengine/server/we_readthread.cpp +++ b/writeengine/server/we_readthread.cpp @@ -156,11 +156,13 @@ void DmlReadThread::operator()() break; } +#if 0 case WE_SVR_BATCH_INSERT_BINARY: { rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId); break; } +#endif case WE_SVR_GET_WRITTEN_LBIDS: { diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index 9e87e8be5..b94812969 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -997,7 +997,7 @@ int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid, int WriteEngineWrapper::deleteRow(const TxnID& txnid, const vector& colExtentsColType, vector& colExtentsStruct, vector& colOldValueList, - vector& ridLists, const int32_t tableOid) + vector& ridLists, const int32_t tableOid, bool hasAUXCol) { ColTuple curTuple; ColStruct curColStruct; @@ -1065,7 +1065,7 @@ int WriteEngineWrapper::deleteRow(const TxnID& txnid, const vector // unfortunately I don't have a better way to instruct without passing too many parameters m_opType = DELETE; rc = updateColumnRec(txnid, colExtentsColType, colExtentsStruct, colValueList, colOldValueList, ridLists, - dctnryExtentsStruct, dctnryValueList, tableOid); + dctnryExtentsStruct, dctnryValueList, tableOid, hasAUXCol); m_opType = NOOP; return rc; @@ -4487,7 +4487,8 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, const vector& colExtentsStruct, ColValueList& colValueList, vector& colOldValueList, vector& ridLists, vector& dctnryExtentsStruct, - DctnryValueList& dctnryValueList, const int32_t tableOid) + DctnryValueList& dctnryValueList, const int32_t tableOid, + bool hasAUXCol) { int rc = 0; unsigned numExtents = colExtentsStruct.size(); @@ -4600,9 +4601,31 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, const vector currentExtentRangesPtrsAUX(1, currentExtentRangesPtrs.back()); + + rc = writeColumnRecUpdate(txnid, cscColTypeListAUX, colStructListAUX, colValueListAUX, colOldValueList, + ridLists[extent], tableOid, true, ridLists[extent].size(), + ¤tExtentRangesPtrsAUX); + + for (auto& cpInfoPtr : currentExtentRangesPtrs) + { + if (cpInfoPtr) + { + cpInfoPtr->toInvalid(); + } + } + } + else + { + rc = writeColumnRecUpdate(txnid, cscColTypeList, colStructList, colValueList, colOldValueList, + ridLists[extent], tableOid, true, ridLists[extent].size(), + ¤tExtentRangesPtrs); + } if (rc != NO_ERROR) break; diff --git a/writeengine/wrapper/writeengine.h b/writeengine/wrapper/writeengine.h index c571ac177..fba2947c6 100644 --- a/writeengine/wrapper/writeengine.h +++ b/writeengine/wrapper/writeengine.h @@ -254,7 +254,8 @@ class WriteEngineWrapper : public WEObj */ EXPORT int deleteRow(const TxnID& txnid, const std::vector& colExtentsColType, std::vector& colExtentsStruct, std::vector& colOldValueList, - std::vector& ridLists, const int32_t tableOid); + std::vector& ridLists, const int32_t tableOid, + bool hasAUXCol = false); /** * @brief Delete a list of rows from a table @@ -564,7 +565,8 @@ class WriteEngineWrapper : public WEObj std::vector& colExtentsStruct, ColValueList& colValueList, std::vector& colOldValueList, std::vector& ridLists, std::vector& dctnryExtentsStruct, - DctnryValueList& dctnryValueList, const int32_t tableOid); + DctnryValueList& dctnryValueList, const int32_t tableOid, + bool hasAUXCol = false); /** * @brief Update values into columns