diff --git a/versioning/BRM/brmtypes.h b/versioning/BRM/brmtypes.h index 2e74e4aec..1b6f25f43 100644 --- a/versioning/BRM/brmtypes.h +++ b/versioning/BRM/brmtypes.h @@ -438,6 +438,7 @@ const uint8_t GET_SYSTEM_STATE = 54; const uint8_t SET_SYSTEM_STATE = 55; const uint8_t GET_UNIQUE_UINT64 = 56; const uint8_t CLEAR_SYSTEM_STATE = 57; +const uint8_t GET_UNCOMMITTED_LBIDS = 58; /* OID Manager interface */ const uint8_t ALLOC_OIDS = 60; diff --git a/versioning/BRM/masterdbrmnode.cpp b/versioning/BRM/masterdbrmnode.cpp index 112ea7232..dca6e1d69 100644 --- a/versioning/BRM/masterdbrmnode.cpp +++ b/versioning/BRM/masterdbrmnode.cpp @@ -380,6 +380,7 @@ void MasterDBRMNode::msgProcessor() case SET_SYSTEM_STATE: doSetSystemState(msg, p); continue; case CLEAR_SYSTEM_STATE: doClearSystemState(msg, p); continue; case SM_RESET: doSessionManagerReset(msg, p); continue; + case GET_UNCOMMITTED_LBIDS: doGetUncommittedLbids(msg, p); continue; } /* Process TableLock calls */ @@ -1353,6 +1354,98 @@ void MasterDBRMNode::doSIDTIDMap(ByteStream &msg, ThreadParams *p) catch (...) { } } +void MasterDBRMNode::doGetUncommittedLbids(ByteStream &msg, ThreadParams *p) +{ + ByteStream reply; + vector lbidList; + VSS vss; + ExtentMap em; + bool locked = false; + vector::iterator lbidIt; + typedef pair range_t; + range_t range; + vector ranges; + vector::iterator rangeIt; + ByteStream::byte cmd; + ByteStream::quadbyte transID; + msg >> cmd; + msg >> transID; + try { + vss.lock(VSS::READ); + locked = true; + + // Get a full list of uncommitted LBIDs related to this transactin. + vss.getUncommittedLBIDs(transID, lbidList); + + vss.release(VSS::READ); + locked = false; + + if(lbidList.size() > 0) { + + // Sort the vector. + std::sort::iterator>(lbidList.begin(), lbidList.end()); + + // Get the LBID range for the first block in the list. + lbidIt = lbidList.begin(); + if (em.lookup(*lbidIt, range.first, range.second) < 0) { + reply.reset(); + reply << (uint8_t) ERR_FAILURE; + try { + p->sock->write(reply); + } + catch (...) { } + + return; + } + ranges.push_back(range); + + // Loop through the LBIDs and add the new ranges. + ++lbidIt; + while(lbidIt != lbidList.end()) { + if (*lbidIt > range.second) { + if (em.lookup(*lbidIt, range.first, range.second) < 0) { + reply.reset(); + reply << (uint8_t) ERR_FAILURE; + try { + p->sock->write(reply); + } + catch (...) { } + return; + + } + ranges.push_back(range); + } + ++lbidIt; + } + + // Reset the lbidList and return only the first LBID in each extent that was changed + // in the transaction. + lbidList.clear(); + for (rangeIt = ranges.begin(); rangeIt != ranges.end(); rangeIt++) { + lbidList.push_back(rangeIt->first); + } + } + reply << (uint8_t) ERR_OK; + serializeInlineVector(reply, lbidList); + try { + p->sock->write(reply); + } + catch (...) { } + return; + } + catch (exception &e) { + if (locked) + vss.release(VSS::READ); + reply.reset(); + reply << (uint8_t) ERR_FAILURE; + try { + p->sock->write(reply); + } + catch (...) { } + return; + } +} + void MasterDBRMNode::doGetUniqueUint32(ByteStream &msg, ThreadParams *p) { ByteStream reply; diff --git a/versioning/BRM/masterdbrmnode.h b/versioning/BRM/masterdbrmnode.h index 880586775..98c6b7c57 100644 --- a/versioning/BRM/masterdbrmnode.h +++ b/versioning/BRM/masterdbrmnode.h @@ -194,6 +194,8 @@ private: void doSetSystemState(messageqcpp::ByteStream &msg, ThreadParams *p); void doClearSystemState(messageqcpp::ByteStream &msg, ThreadParams *p); void doSessionManagerReset(messageqcpp::ByteStream &msg, ThreadParams *p); + void doGetUncommittedLbids(messageqcpp::ByteStream &msg, ThreadParams *p); + /* OID Manager interface */ OIDServer oids; diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index b0cd83ac5..9015decff 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -1122,12 +1122,6 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) { - if (((colType.colDataType == execplan::CalpontSystemCatalog::DATE) && (indata =="0000-00-00")) || - ((colType.colDataType == execplan::CalpontSystemCatalog::DATETIME) && (indata =="0000-00-00 00:00:00"))) - { - isNULL = true; - } - if (isNULL && colType.defaultValue.empty()) //error out { Message::Args args; @@ -1265,6 +1259,668 @@ End-Disable use of MetaFile for bulk rollback support return rc; } +uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId) +{ + int rc = 0; + //cout << "processBatchInsert received bytestream length " << bs.length() << endl; + + ByteStream::quadbyte tmp32; + ByteStream::byte tmp8; + bs >> tmp32; + //cout << "processBatchInsert got transaction id " << tmp32 << endl; + bs >> PMId; + //cout << "processBatchInsert gor PMId " << PMId << endl; + uint32_t sessionId; + bs >> sessionId; + //cout << " processBatchInsert for session " << sessionId << endl; + bool isAutocommitOn; + bs >> tmp8; + isAutocommitOn = tmp8; + if (idbdatafile::IDBPolicy::useHdfs()) + isAutocommitOn = true; + //cout << "This session isAutocommitOn is " << isAutocommitOn << endl; + BRM::TxnID txnid; + txnid.id = tmp32; + txnid.valid = true; + bool isInsertSelect; + bs >> tmp8; + // For insert select, skip the hwm block and start inserting from the next block + // to avoid self insert issue. + //For batch insert: if not first batch, use the saved last rid to start adding rows. + isInsertSelect = tmp8; + + WriteEngine::ColStructList colStructs; + WriteEngine::DctnryStructList dctnryStructList; + WriteEngine::DctnryValueList dctnryValueList; + std::vector colValuesList; + WriteEngine::DictStrList dicStringList ; + CalpontSystemCatalog::TableName tableName; + CalpontSystemCatalog::TableColName tableColName; + bs >> tableColName.table; + bs >> tableColName.schema; + tableName.table = tableColName.table; + tableName.schema = tableColName.schema; + boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId); + systemCatalogPtr->identity(CalpontSystemCatalog::EC); + CalpontSystemCatalog::ROPair roPair; + CalpontSystemCatalog::RIDList ridList; + CalpontSystemCatalog::DictOIDList dictOids; + try + { + ridList = systemCatalogPtr->columnRIDs(tableName, true); + roPair = systemCatalogPtr->tableRID( tableName); + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + + + std::vector dctnryStoreOids(ridList.size()) ; + std::vector columns; + DctnryStructList dctnryList; + std::vector dbRootHWMInfoColVec(ridList.size()); + + uint32_t tblOid = roPair.objnum; + CalpontSystemCatalog::ColType colType; + std::vector colDBRootExtentInfo; + bool bFirstExtentOnThisPM = false; + Convertor convertor; + if ( fIsFirstBatchPm ) + { + dbRootExtTrackerVec.clear(); + if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn))) + fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL)); + fWEWrapper.setIsInsert(true); + fWEWrapper.setBulkFlag(true); + fWEWrapper.setTransId(txnid.id); + try + { + // First gather HWM BRM information for all columns + std::vector colWidths; + for (unsigned i=0; i < ridList.size(); i++) + { + rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]); + //need handle error + + CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum); + colWidths.push_back( convertor.getCorrectRowWidth( + colType2.colDataType, colType2.colWidth) ); + } + + for (unsigned i=0; i < ridList.size(); i++) + { + // Find DBRoot/segment file where we want to start adding rows + colType = systemCatalogPtr->colType(ridList[i].objnum); + boost::shared_ptr pDBRootExtentTracker (new DBRootExtentTracker(ridList[i].objnum, + colWidths, dbRootHWMInfoColVec, i, 0) ); + dbRootExtTrackerVec.push_back( pDBRootExtentTracker ); + DBRootExtentInfo dbRootExtent; + std::string trkErrMsg; + bool bEmptyPM; + if (i == 0) + { + rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent,bFirstExtentOnThisPM, bEmptyPM, trkErrMsg); + /* cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " << (int)bFirstExtentOnThisPM << + " oid:dbroot:hwm = " << ridList[i].objnum << ":"< rangeList; + + // use of MetaFile for bulk rollback support + if ( fIsFirstBatchPm && isAutocommitOn) + { + //save meta data, version last block for each dbroot at the start of batch insert + try + { + fRBMetaWriter->init(tblOid, tableName.table); + fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec); + //cout << "Saved meta files" << endl; + if (!bFirstExtentOnThisPM) + { + //cout << "Backing up hwm chunks" << endl; + for (unsigned i=0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary + { + // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context + fRBMetaWriter->backupDctnryHWMChunk( + dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment); + } + } + } + catch (WeException& ex) // catch exception to close file, then rethrow + { + rc = 1; + err = ex.what(); + } + //Do versioning. Currently, we only version columns, not strings. If there is a design change, this will need to be re-visited + if ( rc != 0) + return rc; + + } + + std::vector colNames; + bool isWarningSet = false; + uint32_t columnCount; + bs >> columnCount; + if (columnCount) + { + try + { + for (uint32_t current_column = 0; current_column < columnCount; current_column++) + { + uint32_t tmp32; + std::string colName; + bs >> tmp32; + bs >> colName; + colNames.push_back(colName); + CalpontSystemCatalog::OID oid = tmp32; + + CalpontSystemCatalog::ColType colType; + colType = systemCatalogPtr->colType(oid); + + WriteEngine::ColStruct colStruct; + WriteEngine::DctnryStruct dctnryStruct; + colStruct.dataOid = oid; + colStruct.tokenFlag = false; + colStruct.fCompressionType = colType.compressionType; + // Token + if ( isDictCol(colType) ) + { + colStruct.colWidth = 8; + colStruct.tokenFlag = true; + } + else + { + colStruct.colWidth = colType.colWidth; + } + colStruct.colDataType = colType.colDataType; + + if (colStruct.tokenFlag) + { + dctnryStruct.dctnryOid = colType.ddn.dictOID; + dctnryStruct.columnOid = colStruct.dataOid; + dctnryStruct.fCompressionType = colType.compressionType; + dctnryStruct.colWidth = colType.colWidth; + } + else + { + dctnryStruct.dctnryOid = 0; + dctnryStruct.columnOid = colStruct.dataOid; + dctnryStruct.fCompressionType = colType.compressionType; + dctnryStruct.colWidth = colType.colWidth; + } + + colStructs.push_back(colStruct); + dctnryStructList.push_back(dctnryStruct); + + } + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + + std::string tmpStr(""); + uint32_t valuesPerColumn; + bs >> valuesPerColumn; + colValuesList.reserve(columnCount * valuesPerColumn); + try + { + bool pushWarning = false; + for (uint32_t j = 0; j < columnCount; j++) + { + WriteEngine::DctColTupleList dctColTuples; + tableColName.column = colNames[j]; + CalpontSystemCatalog::OID oid = colStructs[j].dataOid; + + CalpontSystemCatalog::ColType colType; + colType = systemCatalogPtr->colType(oid); + + bool isNULL = false; + WriteEngine::dictStr dicStrings; + // token + if ( isDictCol(colType) ) + { + for ( uint32_t i=0; i < valuesPerColumn; i++ ) + { + bs >> tmp8; + isNULL = tmp8; + bs >> tmpStr; + if ( tmpStr.length() == 0 ) + isNULL = true; + + if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) + { + if (isNULL && colType.defaultValue.empty()) //error out + { + Message::Args args; + args.add(tableColName.column); + err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); + rc = 1; + return rc; + } + else if (isNULL && !(colType.defaultValue.empty())) + { + tmpStr = colType.defaultValue; + } + } + + if ( tmpStr.length() > (unsigned int)colType.colWidth ) + { + tmpStr = tmpStr.substr(0, colType.colWidth); + if ( !pushWarning ) + pushWarning = true; + } + colValuesList.push_back(0); + //@Bug 2515. Only pass string values to write engine + dicStrings.push_back( tmpStr ); + } + //@Bug 2515. Only pass string values to write engine + dicStringList.push_back( dicStrings ); + } + else + { + string x; + //scan once to check how many autoincrement value needed + uint32_t nextValNeeded = 0; + uint64_t nextVal = 1; + if (colType.autoincrement) + { + try + { + nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); + fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType); + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + } + + for ( uint32_t i=0; i < valuesPerColumn; i++ ) + { + bs >> tmp8; + isNULL = tmp8; + + uint8_t val8; + uint16_t val16; + uint32_t val32; + uint64_t val64; + uint64_t colValue; + float valF; + double valD; + std::string valStr; + bool valZero = false; // Needed for autoinc check + switch (colType.colDataType) + { + case execplan::CalpontSystemCatalog::TINYINT: + case execplan::CalpontSystemCatalog::UTINYINT: + bs >> val8; + if (val8 == 0) + valZero = true; + colValue = val8; + break; + case execplan::CalpontSystemCatalog::SMALLINT: + case execplan::CalpontSystemCatalog::USMALLINT: + bs >> val16; + if (val16 == 0) + valZero = true; + colValue = val16; + break; + case execplan::CalpontSystemCatalog::DATE: + case execplan::CalpontSystemCatalog::MEDINT: + case execplan::CalpontSystemCatalog::INT: + case execplan::CalpontSystemCatalog::UMEDINT: + case execplan::CalpontSystemCatalog::UINT: + bs >> val32; + if (val32 == 0) + valZero = true; + colValue = val32; + break; + case execplan::CalpontSystemCatalog::BIGINT: + case execplan::CalpontSystemCatalog::DATETIME: + case execplan::CalpontSystemCatalog::UBIGINT: + bs >> val64; + if (val64 == 0) + valZero = true; + colValue = val64; + break; + case execplan::CalpontSystemCatalog::DECIMAL: + switch (colType.colWidth) + { + case 1: + { + bs >> val8; + colValue = val8; + break; + } + case 2: + { + bs >> val16; + colValue = val16; + break; + } + case 4: + { + bs >> val32; + colValue = val32; + break; + } + default: + { + bs >> val64; + colValue = val64; + break; + } + } + + break; + case execplan::CalpontSystemCatalog::UDECIMAL: + // UDECIMAL numbers may not be negative + if (colType.colWidth == 1) + { + bs >> val8; + if (val8 < 0 && + val8 != static_cast(joblist::TINYINTEMPTYROW) && + val8 != static_cast(joblist::TINYINTNULL)) + { + val8 = 0; + pushWarning = true; + } + colValue = val8; + } + else if (colType.colWidth == 2) + { + bs >> val16; + if (val16 < 0 && + val16 != static_cast(joblist::SMALLINTEMPTYROW) && + val16 != static_cast(joblist::SMALLINTNULL)) + { + val16 = 0; + pushWarning = true; + } + colValue = val16; + } + else if (colType.colWidth == 4) + { + bs >> val32; + if (val32 < 0 && + val32 != static_cast(joblist::INTEMPTYROW) && + val32 != static_cast(joblist::INTNULL)) + { + val32 = 0; + pushWarning = true; + } + colValue = val32; + } + else if (colType.colWidth == 8) + { + bs >> val64; + if (val64 < 0 && + val64 != static_cast(joblist::BIGINTEMPTYROW) && + val64 != static_cast(joblist::BIGINTNULL)) + { + val64 = 0; + pushWarning = true; + } + colValue = val64; + } + break; + case execplan::CalpontSystemCatalog::DOUBLE: + bs >> val64; + colValue = val64; + break; + case execplan::CalpontSystemCatalog::UDOUBLE: + bs >> val64; + memcpy(&valD, &val64, 8); + if (valD < 0.0 && valD != joblist::DOUBLEEMPTYROW && valD != joblist::DOUBLENULL) + { + valD = 0.0; + pushWarning = true; + } + + colValue = val64; + break; + case execplan::CalpontSystemCatalog::FLOAT: + bs >> val32; + colValue = val32; + break; + case execplan::CalpontSystemCatalog::UFLOAT: + bs >> val32; + memcpy(&valF, &val32, 4); + if (valF < 0.0 && valF != joblist::FLOATEMPTYROW && valF != joblist::FLOATNULL) + { + valF = 0.0; + pushWarning = true; + } + + colValue = val32; + break; + + case execplan::CalpontSystemCatalog::CHAR: + case execplan::CalpontSystemCatalog::VARCHAR: + case execplan::CalpontSystemCatalog::TEXT: + case execplan::CalpontSystemCatalog::BLOB: + bs >> valStr; + if (valStr.length() > (unsigned int)colType.colWidth) + { + valStr = valStr.substr(0, colType.colWidth); + pushWarning = true; + } + else + { + if ( (unsigned int)colType.colWidth > valStr.length()) + { + //Pad null character to the string + valStr.resize(colType.colWidth, 0); + } + } + memcpy(&colValue, valStr.c_str(), valStr.length()); + break; + default: + rc = 1; + err = IDBErrorInfo::instance()->errorMsg(ERR_DATATYPE_NOT_SUPPORT); + break; + } + //check if autoincrement column and value is 0 or null + if (colType.autoincrement && ( isNULL || valZero)) + { + ostringstream oss; + oss << nextVal++; + isNULL = false; + try + { + nextValNeeded++; + bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal); + if (!reserved) + { + err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT); + rc = 1; + return rc; + } + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + colValue = nextVal; + } + + if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) + { + if (isNULL && colType.defaultValue.empty()) //error out + { + Message::Args args; + args.add(tableColName.column); + err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); + rc = 1; + return rc; + } + else if (isNULL && !(colType.defaultValue.empty())) + { + memcpy(&colValue, colType.defaultValue.c_str(), colType.defaultValue.length()); + isNULL = false; + } + } + //@Bug 1806 + if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) + { + return rc; + } + if ( pushWarning && ( rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING ) ) + rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; + + + colValuesList.push_back(colValue); + //@Bug 2515. Only pass string values to write engine + dicStrings.push_back( valStr ); + } + dicStringList.push_back( dicStrings ); + } + + if (pushWarning) + { + colNames.push_back(tableColName.column); + isWarningSet = true; + } + } + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + } + + // call the write engine to write the rows + int error = NO_ERROR; + //fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3); + //cout << "Batch inserting a row with transaction id " << txnid.id << endl; + if (colValuesList.size() > 0) + { + if (NO_ERROR != + (error = fWEWrapper.insertColumnRecsBinary(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, + dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm))) + { + if (error == ERR_BRM_DEAD_LOCK) + { + rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR; + WErrorCodes ec; + err = ec.errorString(error); + } + else if ( error == ERR_BRM_VB_OVERFLOW ) + { + rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR; + err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW); + } + else + { + rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; + WErrorCodes ec; + err = ec.errorString(error); + } + } + } + if (fIsFirstBatchPm && isAutocommitOn) + { + //fWEWrapper.writeVBEnd(txnid.id, rangeList); + fIsFirstBatchPm = false; + } + else if (fIsFirstBatchPm) + { + fIsFirstBatchPm = false; + } + if ( isWarningSet && ( rc == NO_ERROR ) ) + { + rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; + //cout << "Got warning" << endl; + Message::Args args; + string cols = "'" + colNames[0] + "'"; + + for (unsigned i=1; ierrorMsg(WARN_DATA_TRUNC,args); + + // Strict mode enabled, so rollback on warning + // NOTE: This doesn't seem to be a possible code path yet + /*if (insertPkg.get_isWarnToError()) + { + string applName ("BatchInsert"); + fWEWrapper.bulkRollback(tblOid,txnid.id,tableName.toString(), + applName, false, err); + BulkRollbackMgr::deleteMetaFile( tblOid ); + }*/ + } + //cout << "Batch insert return code " << rc << endl; + return rc; +} + + uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err) { uint8_t rc = 0; @@ -2754,6 +3410,47 @@ uint8_t WE_DMLCommandProc::processUpdate(messageqcpp::ByteStream& bs, return rc; } +uint8_t WE_DMLCommandProc::getWrittenLbids(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId) +{ + uint8_t rc = 0; + uint32_t txnId; + vector lbidList; + + bs >> txnId; + std::tr1::unordered_map::iterator mapIter; + std::tr1::unordered_map m_txnLBIDMap = fWEWrapper.getTxnMap(); + try + { + mapIter = m_txnLBIDMap.find(txnId); + if (mapIter != m_txnLBIDMap.end()) + { + SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second; + std::tr1::unordered_map ::iterator listIter = spTxnLBIDRec->m_LBIDMap.begin(); + while (listIter != spTxnLBIDRec->m_LBIDMap.end()) + { + lbidList.push_back(listIter->first); + listIter++; + } + } + } + catch(...) {} + bs.restart(); + try + { + serializeInlineVector (bs, lbidList); + } + catch (exception& ex) + { + // Append to errmsg in case we already have an error + if (err.length() > 0) + err += "; "; + err += ex.what(); + rc = 1; + return rc; + } + return rc; +} + uint8_t WE_DMLCommandProc::processFlushFiles(messageqcpp::ByteStream& bs, std::string & err) { uint8_t rc = 0; diff --git a/writeengine/server/we_dmlcommandproc.h b/writeengine/server/we_dmlcommandproc.h index cdee413c2..ccef3ef58 100644 --- a/writeengine/server/we_dmlcommandproc.h +++ b/writeengine/server/we_dmlcommandproc.h @@ -79,6 +79,7 @@ class WE_DMLCommandProc EXPORT uint8_t rollbackBlocks(messageqcpp::ByteStream& bs, std::string & err); EXPORT uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string & err); EXPORT uint8_t processBatchInsert(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId); + EXPORT uint8_t processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId); EXPORT uint8_t commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err); EXPORT uint8_t commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string & err); EXPORT uint8_t rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err); @@ -95,6 +96,7 @@ class WE_DMLCommandProc EXPORT uint8_t processPurgeFDCache(ByteStream& bs, std::string & err); EXPORT uint8_t processEndTransaction(ByteStream& bs, std::string & err); EXPORT uint8_t processFixRows(ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId); + EXPORT uint8_t getWrittenLbids(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId); int validateColumnHWMs( execplan::CalpontSystemCatalog::RIDList& ridList, boost::shared_ptr systemCatalogPtr, diff --git a/writeengine/server/we_messages.h b/writeengine/server/we_messages.h index 19a44241c..93eafa21d 100644 --- a/writeengine/server/we_messages.h +++ b/writeengine/server/we_messages.h @@ -61,8 +61,8 @@ enum ServerMessages WE_SVR_COMMIT_BATCH_AUTO_OFF, WE_SVR_ROLLBACK_BATCH_AUTO_OFF, WE_SVR_BATCH_AUTOON_REMOVE_META, - WE_SVR_UPDATE, //35 - WE_SVR_FLUSH_FILES, + WE_SVR_UPDATE, + WE_SVR_FLUSH_FILES, //35 WE_SVR_DELETE, WE_SVR_DML_BULKROLLBACK, WE_SVR_DML_BULKROLLBACK_CLEANUP, @@ -82,6 +82,8 @@ enum ServerMessages WE_END_TRANSACTION, WE_SRV_FIX_ROWS, WE_SVR_WRITE_CREATE_SYSCOLUMN, + WE_SVR_BATCH_INSERT_BINARY, + WE_SVR_GET_WRITTEN_LBIDS, WE_CLT_SRV_DATA=100, WE_CLT_SRV_EOD, diff --git a/writeengine/server/we_readthread.cpp b/writeengine/server/we_readthread.cpp index 135501322..8f1bb86a9 100644 --- a/writeengine/server/we_readthread.cpp +++ b/writeengine/server/we_readthread.cpp @@ -149,6 +149,16 @@ void DmlReadThread::operator()() //cout << "fWeDMLprocessor " << fWeDMLprocessor << " is processing batchinsert ..." << endl; break; } + case WE_SVR_BATCH_INSERT_BINARY: + { + rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId); + break; + } + case WE_SVR_GET_WRITTEN_LBIDS: + { + rc = fWeDMLprocessor->getWrittenLbids(ibs, errMsg, PMId); + break; + } case WE_SVR_BATCH_INSERT_END: { rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg); @@ -378,7 +388,7 @@ void DmlReadThread::operator()() obs << errMsg; } - if ((msgId == WE_SVR_COMMIT_BATCH_AUTO_ON) || (msgId ==WE_SVR_BATCH_INSERT_END) || (msgId == WE_SVR_FETCH_DDL_LOGS)) + if ((msgId == WE_SVR_COMMIT_BATCH_AUTO_ON) || (msgId ==WE_SVR_BATCH_INSERT_END) || (msgId == WE_SVR_FETCH_DDL_LOGS) || (msgId == WE_SVR_GET_WRITTEN_LBIDS)) { obs += ibs; //cout << " sending back hwm info with ibs length " << endl; diff --git a/writeengine/wrapper/we_colop.h b/writeengine/wrapper/we_colop.h index 3bdb3bd8e..f9ff717a6 100644 --- a/writeengine/wrapper/we_colop.h +++ b/writeengine/wrapper/we_colop.h @@ -313,13 +313,13 @@ public: */ int copyVB(IDBDataFile* pSource, const BRM::VER_t txnD, const OID oid, std::vector& fboList, std::vector& rangeList); -protected: - /** * @brief close column file */ EXPORT virtual void closeColumnFile(Column& column) const; +protected: + /** * @brief populate readBuf with data in block #lbid */ diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index 45447d08c..69cda5ce2 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -1177,7 +1177,10 @@ timer.stop("allocRowId"); //-------------------------------------------------------------------------- // Tokenize data if needed //-------------------------------------------------------------------------- - BRMWrapper::setUseVb( true ); + if (insertSelect && isAutoCommitOn) + BRMWrapper::setUseVb( false ); + else + BRMWrapper::setUseVb( true ); dictStr::iterator dctStr_iter; ColTupleList::iterator col_iter; for (i = 0; i < colStructList.size(); i++) @@ -1282,7 +1285,10 @@ timer.stop("tokenize"); } } } - BRMWrapper::setUseVb( true ); + if (insertSelect && isAutoCommitOn) + BRMWrapper::setUseVb( false ); + else + BRMWrapper::setUseVb( true ); //-------------------------------------------------------------------------- // Update column info structure @Bug 1862 set hwm, and @@ -1451,6 +1457,639 @@ timer.start("writeColumnRec"); return rc; } +int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, + ColStructList& colStructList, + std::vector& colValueList, + DctnryStructList& dctnryStructList, + DictStrList& dictStrList, + std::vector > & dbRootExtentTrackers, + RBMetaWriter* fRBMetaWriter, + bool bFirstExtentOnThisPM, + bool insertSelect, + bool isAutoCommitOn, + OID tableOid, + bool isFirstBatchPm) +{ + int rc; + RID* rowIdArray = NULL; + Column curCol; + ColStruct curColStruct; + ColStructList newColStructList; + DctnryStructList newDctnryStructList; + HWM hwm = 0; + HWM oldHwm = 0; + HWM newHwm = 0; + size_t totalRow; + ColStructList::size_type totalColumns; + uint64_t rowsLeft = 0; + bool newExtent = false; + RIDList ridList; + ColumnOp* colOp = NULL; + + // Set tmp file suffix to modify HDFS db file + bool useTmpSuffix = false; + if (idbdatafile::IDBPolicy::useHdfs()) + { + if (!bFirstExtentOnThisPM) + useTmpSuffix = true; + } + + unsigned i=0; +#ifdef PROFILE + StopWatch timer; +#endif + + //Convert data type and column width to write engine specific + for (i = 0; i < colStructList.size(); i++) + Convertor::convertColType(&colStructList[i]); + + // rc = checkValid(txnid, colStructList, colValueList, ridList); + // if (rc != NO_ERROR) + // return rc; + + setTransId(txnid); + uint16_t dbRoot, segmentNum; + uint32_t partitionNum; + string segFile; + bool newFile; + TableMetaData* tableMetaData= TableMetaData::makeTableMetaData(tableOid); + //populate colStructList with file information + IDBDataFile* pFile = NULL; + std::vector extentInfo; + int currentDBrootIdx = 0; + std::vector extents; + + //-------------------------------------------------------------------------- + // For first batch on this PM: + // o get starting extent from ExtentTracker, and allocate extent if needed + // o construct colStructList and dctnryStructList accordingly + // o save extent information in tableMetaData for future use + // If not first batch on this PM: + // o construct colStructList and dctnryStructList from tableMetaData + //-------------------------------------------------------------------------- + if (isFirstBatchPm) + { + currentDBrootIdx = dbRootExtentTrackers[0]->getCurrentDBRootIdx(); + extentInfo = dbRootExtentTrackers[0]->getDBRootExtentList(); + dbRoot = extentInfo[currentDBrootIdx].fDbRoot; + partitionNum = extentInfo[currentDBrootIdx].fPartition; + + //---------------------------------------------------------------------- + // check whether this extent is the first on this PM + //---------------------------------------------------------------------- + if (bFirstExtentOnThisPM) + { + //cout << "bFirstExtentOnThisPM is " << bFirstExtentOnThisPM << endl; + std::vector cols; + BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn; + for (i=0; i < colStructList.size(); i++) + { + createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid; + createStripeColumnExtentsArgIn.width = colStructList[i].colWidth; + createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType; + cols.push_back(createStripeColumnExtentsArgIn); + } + rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum, extents); + if (rc != NO_ERROR) + return rc; + //Create column files + BRM::CPInfoList_t cpinfoList; + BRM::CPInfo cpInfo; + if (isUnsigned(colStructList[i].colDataType)) + { + cpInfo.max = 0; + cpInfo.min = static_cast(numeric_limits::max()); + } + else + { + cpInfo.max = numeric_limits::min(); + cpInfo.min = numeric_limits::max(); + } + cpInfo.seqNum = -1; + for ( i=0; i < extents.size(); i++) + { + colOp = m_colOp[op(colStructList[i].fCompressionType)]; + colOp->initColumn(curCol); + colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType, + colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType, + dbRoot, partitionNum, segmentNum); + rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot, + partitionNum, segmentNum, segFile, pFile, newFile); + if (rc != NO_ERROR) + return rc; + + //mark the extents to invalid + cpInfo.firstLbid = extents[i].startLbid; + cpinfoList.push_back(cpInfo); + colStructList[i].fColPartition = partitionNum; + colStructList[i].fColSegment = segmentNum; + colStructList[i].fColDbRoot = dbRoot; + dctnryStructList[i].fColPartition = partitionNum; + dctnryStructList[i].fColSegment = segmentNum; + dctnryStructList[i].fColDbRoot = dbRoot; + } + + //mark the extents to invalid + rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList); + if (rc != NO_ERROR) + return rc; + //create corresponding dictionary files + for (i=0; i < dctnryStructList.size(); i++) + { + if (dctnryStructList[i].dctnryOid > 0) + { + rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot, partitionNum, + segmentNum, dctnryStructList[i].fCompressionType); + if ( rc != NO_ERROR) + return rc; + } + } + } // if ( bFirstExtentOnThisPM) + else // if (!bFirstExtentOnThisPM) + { + std::vector tmpExtentInfo; + for (i=0; i < dbRootExtentTrackers.size(); i++) + { + tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList(); + colStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition; + colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment; + colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot; + //cout << "Load from dbrootExtenttracker oid:dbroot:part:seg = " <getColExtsInfo(colStructList[i].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) + break; + it++; + } + + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot = colStructList[i].fColDbRoot; + aExt.partNum = colStructList[i].fColPartition; + aExt.segNum = colStructList[i].fColSegment; + aExt.compType = colStructList[i].fCompressionType; + aExt.isDict = false; + if (bFirstExtentOnThisPM) + { + aExt.hwm = extents[i].startBlkOffset; + aExt.isNewExt = true; + //cout << "adding a ext to metadata" << endl; + } + else + { + std::vector tmpExtentInfo; + tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList(); + aExt.isNewExt = false; + aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm; + //cout << "oid " << colStructList[i].dataOid << " gets hwm " << aExt.hwm << endl; + } + aExt.current = true; + aColExtsInfo.push_back(aExt); + //cout << "get from extentinfo oid:hwm = " << colStructList[i].dataOid << ":" << aExt.hwm << endl; + } + + tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); + } + + for (i=0; i < dctnryStructList.size(); i++) + { + if (dctnryStructList[i].dctnryOid > 0) + { + ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment)) + break; + it++; + } + + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot = dctnryStructList[i].fColDbRoot; + aExt.partNum = dctnryStructList[i].fColPartition; + aExt.segNum = dctnryStructList[i].fColSegment; + aExt.compType = dctnryStructList[i].fCompressionType; + aExt.isDict = true; + aColExtsInfo.push_back(aExt); + } + tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo); + } + } + + } // if (isFirstBatchPm) + else //get the extent info from tableMetaData + { + ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + while (it != aColExtsInfo.end()) + { + if (it->current) + break; + it++; + } + if (it == aColExtsInfo.end()) + return 1; + + for (i=0; i < colStructList.size(); i++) + { + colStructList[i].fColPartition = it->partNum; + colStructList[i].fColSegment = it->segNum; + colStructList[i].fColDbRoot = it->dbRoot; + dctnryStructList[i].fColPartition = it->partNum; + dctnryStructList[i].fColSegment = it->segNum; + dctnryStructList[i].fColDbRoot = it->dbRoot; + } + } + + totalColumns = colStructList.size(); + totalRow = colValueList.size() / totalColumns; + rowIdArray = new RID[totalRow]; + // use scoped_array to ensure ptr deletion regardless of where we return + boost::scoped_array rowIdArrayPtr(rowIdArray); + memset(rowIdArray, 0, (sizeof(RID)*totalRow)); + + //-------------------------------------------------------------------------- + // allocate row id(s) + //-------------------------------------------------------------------------- + curColStruct = colStructList[0]; + colOp = m_colOp[op(curColStruct.fCompressionType)]; + + colOp->initColumn(curCol); + + //Get the correct segment, partition, column file + vector colExtentInfo; //Save those empty extents in case of failure to rollback + vector dictExtentInfo; //Save those empty extents in case of failure to rollback + vector fileInfo; + dbRoot = curColStruct.fColDbRoot; + //use the first column to calculate row id + ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == colStructList[0].fColDbRoot) && (it->partNum == colStructList[0].fColPartition) && (it->segNum == colStructList[0].fColSegment) && it->current ) + break; + it++; + } + if (it != aColExtsInfo.end()) + { + hwm = it->hwm; + //cout << "Got from colextinfo hwm for oid " << colStructList[0].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl; + } + + oldHwm = hwm; //Save this info for rollback + //need to pass real dbRoot, partition, and segment to setColParam + colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, + curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType, + curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment); + rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file + if (rc != NO_ERROR) { + return rc; + } + + //get hwm first + // @bug 286 : fix for bug 286 - correct the typo in getHWM + //RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm)); + + Column newCol; + +#ifdef PROFILE +timer.start("allocRowId"); +#endif + newColStructList = colStructList; + newDctnryStructList = dctnryStructList; + bool bUseStartExtent = true; + if (idbdatafile::IDBPolicy::useHdfs()) + insertSelect = true; + + rc = colOp->allocRowId(txnid, bUseStartExtent, + curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile, + newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm); + + //cout << "after allocrowid, total row = " < 256K. + // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it. + //-------------------------------------------------------------------------- +// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated? + if ((curCol.dataFile.fPartition == 0) && + (curCol.dataFile.fSegment == 0) && + ((totalRow-rowsLeft) > 0) && + (rowIdArray[totalRow-rowsLeft-1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK)) + { + for (unsigned k=1; ksetColParam(expandCol, 0, + colStructList[k].colWidth, + colStructList[k].colDataType, + colStructList[k].colType, + colStructList[k].dataOid, + colStructList[k].fCompressionType, + colStructList[k].fColDbRoot, + colStructList[k].fColPartition, + colStructList[k].fColSegment); + rc = colOp->openColumnFile(expandCol, segFile, true); // @bug 5572 HDFS tmp file + if (rc == NO_ERROR) + { + if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth)) + { + rc = colOp->expandAbbrevExtent(expandCol); + } + } + if (rc != NO_ERROR) + { + return rc; + } + colOp->closeColumnFile(expandCol); + } + } + + //-------------------------------------------------------------------------- + // Tokenize data if needed + //-------------------------------------------------------------------------- + if (insertSelect && isAutoCommitOn) + BRMWrapper::setUseVb( false ); + else + BRMWrapper::setUseVb( true ); + dictStr::iterator dctStr_iter; + uint64_t *colValPtr; + size_t rowsPerColumn = colValueList.size() / colStructList.size(); + for (i = 0; i < colStructList.size(); i++) + { + if (colStructList[i].tokenFlag) + { + dctStr_iter = dictStrList[i].begin(); + Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)]; + rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, + dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition, + dctnryStructList[i].fColSegment, + useTmpSuffix); // @bug 5572 HDFS tmp file + if (rc !=NO_ERROR) + { + cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid<< endl; + return rc; + } + + for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++) + { + colValPtr = &colValueList[(i*rowsPerColumn) + rows]; + if (dctStr_iter->length() == 0) + { + Token nullToken; + memcpy(colValPtr, &nullToken, 8); + } + else + { +#ifdef PROFILE +timer.start("tokenize"); +#endif + DctnryTuple dctTuple; + dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str(); + dctTuple.sigSize = dctStr_iter->length(); + dctTuple.isNull = false; + rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType); + if (rc != NO_ERROR) + { + dctnry->closeDctnry(); + return rc; + } +#ifdef PROFILE +timer.stop("tokenize"); +#endif + memcpy(colValPtr, &dctTuple.token, 8); + } + dctStr_iter++; + + } + //close dictionary files + rc = dctnry->closeDctnry(false); + if (rc != NO_ERROR) + return rc; + + if (newExtent) + { + //@Bug 4854 back up hwm chunk for the file to be modified + if (fRBMetaWriter) + fRBMetaWriter->backupDctnryHWMChunk(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment); + rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid, + newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition, + newDctnryStructList[i].fColSegment, + false); // @bug 5572 HDFS tmp file + if (rc !=NO_ERROR) + return rc; + + for (uint32_t rows = 0; rows < rowsLeft; rows++) + { + colValPtr = &colValueList[(i*rowsPerColumn) + rows]; + if (dctStr_iter->length() == 0) + { + Token nullToken; + memcpy(colValPtr, &nullToken, 8); + } + else + { +#ifdef PROFILE +timer.start("tokenize"); +#endif + DctnryTuple dctTuple; + dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str(); + dctTuple.sigSize = dctStr_iter->length(); + dctTuple.isNull = false; + rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType); + if (rc != NO_ERROR) + { + dctnry->closeDctnry(); + return rc; + } +#ifdef PROFILE +timer.stop("tokenize"); +#endif + memcpy(colValPtr, &dctTuple.token, 8); + } + dctStr_iter++; + } + //close dictionary files + rc = dctnry->closeDctnry(false); + if (rc != NO_ERROR) + return rc; + } + } + } + if (insertSelect && isAutoCommitOn) + BRMWrapper::setUseVb( false ); + else + BRMWrapper::setUseVb( true ); + + //-------------------------------------------------------------------------- + // Update column info structure @Bug 1862 set hwm, and + // Prepare ValueList for new extent (if applicable) + //-------------------------------------------------------------------------- + //@Bug 2205 Check whether all rows go to the new extent + RID lastRid = 0; + RID lastRidNew = 0; + if (totalRow-rowsLeft > 0) + { + lastRid = rowIdArray[totalRow-rowsLeft-1]; + lastRidNew = rowIdArray[totalRow-1]; + } + else + { + lastRid = 0; + lastRidNew = rowIdArray[totalRow-1]; + } + //cout << "rowid allocated is " << lastRid << endl; + //if a new extent is created, all the columns in this table should have their own new extent + //First column already processed + + //@Bug 1701. Close the file (if uncompressed) + m_colOp[op(curCol.compressionType)]->closeColumnFile(curCol); + //cout << "Saving hwm info for new ext batch" << endl; + //Update hwm to set them in the end + bool succFlag = false; + unsigned colWidth = 0; + int curFbo = 0, curBio; + for (i=0; i < totalColumns; i++) + { + //shoud be obtained from saved hwm + aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid); + it = aColExtsInfo.begin(); + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) + && (it->segNum == colStructList[i].fColSegment) && it->current) + break; + it++; + } + if (it != aColExtsInfo.end()) //update hwm info + { + oldHwm = it->hwm; + } + + // save hwm for the old extent + colWidth = colStructList[i].colWidth; + succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK/colWidth, colWidth, curFbo, curBio); + //cout << "insertcolumnrec oid:rid:fbo:oldhwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << oldHwm << endl; + if (succFlag) + { + if ((HWM)curFbo >= oldHwm) + { + it->hwm = (HWM)curFbo; + } + //@Bug 4947. set current to false for old extent. + if (newExtent) + { + it->current = false; + } + + //cout << "updated old ext info for oid " << colStructList[i].dataOid << " dbroot:part:seg:hwm:current = " + //<< it->dbRoot<<":"<partNum<<":"<segNum<<":"<hwm<<":"<< it->current<< " and newExtent is " << newExtent << endl; + } + else + return ERR_INVALID_PARAM; + + //update hwm for the new extent + if (newExtent) + { + it = aColExtsInfo.begin(); + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) + && (it->segNum == newColStructList[i].fColSegment) && it->current) + break; + it++; + } + succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK/colWidth, colWidth, curFbo, curBio); + if (succFlag) + { + if (it != aColExtsInfo.end()) + { + it->hwm = (HWM)curFbo; + //cout << "setting hwm to " << (int)curFbo <<" for seg " <segNum << endl; + it->current = true; + } + } + else + return ERR_INVALID_PARAM; + } + tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); + } + + // end of allocate row id + +#ifdef PROFILE +timer.start("writeColumnRec"); +#endif +//cout << "Writing column record" << endl; + + if (rc == NO_ERROR) + { + //---------------------------------------------------------------------- + //Mark extents invalid + //---------------------------------------------------------------------- + vector lbids; + vector colDataTypes; + bool successFlag = true; + unsigned width = 0; + int curFbo = 0, curBio, lastFbo = -1; + + if (isFirstBatchPm && (totalRow == rowsLeft)) + {} + else { + for (unsigned i = 0; i < colStructList.size(); i++) + { + colOp = m_colOp[op(colStructList[i].fCompressionType)]; + width = colStructList[i].colWidth; + successFlag = colOp->calculateRowId(lastRid , BYTE_PER_BLOCK/width, width, curFbo, curBio); + if (successFlag) { + if (curFbo != lastFbo) { + RETURN_ON_ERROR(AddLBIDtoList(txnid, + lbids, + colDataTypes, + colStructList[i], + curFbo)); + } + } + } + } + + if (lbids.size() > 0) + rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes); + + //---------------------------------------------------------------------- + // Write row(s) to database file(s) + //---------------------------------------------------------------------- + bool versioning = !(isAutoCommitOn && insertSelect); + rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file + } + return rc; +} + + int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid, ColStructList& colStructList, ColValueList& colValueList, @@ -1666,6 +2305,7 @@ timer.start("allocRowId"); } } + BRMWrapper::setUseVb(true); //Tokenize data if needed dictStr::iterator dctStr_iter; ColTupleList::iterator col_iter; @@ -3903,6 +4543,185 @@ timer.finish(); return rc; } +int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, + const ColStructList& colStructList, + std::vector& colValueList, + RID* rowIdArray, + const ColStructList& newColStructList, + const int32_t tableOid, + bool useTmpSuffix, + bool versioning) +{ + int rc = 0; + void* valArray; + string segFile; + Column curCol; + ColStructList::size_type totalColumn; + ColStructList::size_type i; + size_t totalRow; + + setTransId(txnid); + + totalColumn = colStructList.size(); +#ifdef PROFILE +StopWatch timer; +#endif + totalRow = colValueList.size() / totalColumn; + + valArray = malloc(sizeof(uint64_t) * totalRow); + + if (totalRow == 0) + return rc; + + TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); + for (i = 0; i < totalColumn; i++) + { + //@Bug 2205 Check if all rows go to the new extent + //Write the first batch + RID * firstPart = rowIdArray; + ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)]; + + // set params + colOp->initColumn(curCol); + // need to pass real dbRoot, partition, and segment to setColParam + colOp->setColParam(curCol, 0, colStructList[i].colWidth, + colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid, + colStructList[i].fCompressionType, colStructList[i].fColDbRoot, + colStructList[i].fColPartition, colStructList[i].fColSegment); + + ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) + break; + it++; + } + + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot =colStructList[i].fColDbRoot; + aExt.partNum = colStructList[i].fColPartition; + aExt.segNum = colStructList[i].fColSegment; + aExt.compType = colStructList[i].fCompressionType; + aColExtsInfo.push_back(aExt); + aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); + } + + rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file + if (rc != NO_ERROR) + break; + + // handling versioning + vector rangeList; + if (versioning) + { + rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], + colStructList[i].colWidth, totalRow, firstPart, rangeList); + if (rc != NO_ERROR) { + if (colStructList[i].fCompressionType == 0) + { + curCol.dataFile.pFile->flush(); + } + + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + break; + } + } + + //totalRow1 -= totalRow2; + // have to init the size here + // nullArray = (bool*) malloc(sizeof(bool) * totalRow); + uint8_t tmp8; + uint16_t tmp16; + uint32_t tmp32; + for (size_t j = 0; j < totalRow; j++) + { + uint64_t curValue = colValueList[(totalRow*i) + j]; + switch (colStructList[i].colType) + { + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR: + case WriteEngine::WR_BLOB: + case WriteEngine::WR_TEXT: + switch (colStructList[i].colWidth) + { + case 1: + tmp8 = curValue; + ((uint8_t*)valArray)[j] = tmp8; + break; + case 2: + tmp16 = curValue; + ((uint16_t*)valArray)[j] = tmp16; + break; + case 3: + case 4: + tmp32 = curValue; + ((uint32_t*)valArray)[j] = tmp32; + break; + case 5: + case 6: + case 7: + case 8: + ((uint64_t*)valArray)[j] = curValue; + break; + } + + break; + case WriteEngine::WR_INT: + case WriteEngine::WR_UINT: + case WriteEngine::WR_FLOAT: + tmp32 = curValue; + ((uint32_t*)valArray)[j] = tmp32; + break; + case WriteEngine::WR_ULONGLONG: + case WriteEngine::WR_LONGLONG: + case WriteEngine::WR_DOUBLE: + case WriteEngine::WR_TOKEN: + ((uint64_t*)valArray)[j] = curValue; + break; + case WriteEngine::WR_BYTE: + case WriteEngine::WR_UBYTE: + tmp8 = curValue; + ((uint8_t*)valArray)[j] = tmp8; + break; + case WriteEngine::WR_SHORT: + case WriteEngine::WR_USHORT: + tmp16 = curValue; + ((uint16_t*)valArray)[j] = tmp16; + break; + } + } + + +#ifdef PROFILE +timer.start("writeRow "); +#endif + rc = colOp->writeRow(curCol, totalRow, firstPart, valArray); +#ifdef PROFILE +timer.stop("writeRow "); +#endif + colOp->closeColumnFile(curCol); + + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + // check error + if (rc != NO_ERROR) + break; + + } // end of for (i = 0 + if (valArray != NULL) + free(valArray); + +#ifdef PROFILE +timer.finish(); +#endif + return rc; +} + + int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, const ColStructList& colStructList, const ColValueList& colValueList, diff --git a/writeengine/wrapper/writeengine.h b/writeengine/wrapper/writeengine.h index a47b9e291..7c7862d5f 100644 --- a/writeengine/wrapper/writeengine.h +++ b/writeengine/wrapper/writeengine.h @@ -303,6 +303,20 @@ public: bool isAutoCommitOn = false, OID tableOid = 0, bool isFirstBatchPm = false); + + EXPORT int insertColumnRecsBinary(const TxnID& txnid, + ColStructList& colStructList, + std::vector& colValueList, + DctnryStructList& dctnryStructList, + DictStrList& dictStrList, + std::vector > & dbRootExtentTrackers, + RBMetaWriter* fRBMetaWriter, + bool bFirstExtentOnThisPM, + bool insertSelect = false, + bool isAutoCommitOn = false, + OID tableOid = 0, + bool isFirstBatchPm = false); + /** * @brief Insert values into systables @@ -646,6 +660,11 @@ private: ColValueList& newColValueList, const int32_t tableOid, bool useTmpSuffix, bool versioning = true); + int writeColumnRecBinary(const TxnID& txnid, const ColStructList& colStructList, + std::vector& colValueList, + RID* rowIdArray, const ColStructList& newColStructList, + const int32_t tableOid, + bool useTmpSuffix, bool versioning = true); //@Bug 1886,2870 pass the address of ridList vector