From 8a9c58cf6bd3e2d1180bf972f739fa9c0ef4b9ff Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Tue, 13 Jun 2017 13:19:01 +0100 Subject: [PATCH] MCOL-769 Add new binary bulk insert command For use with mcsapi and maybe INSERT...SELECT and LDI --- writeengine/server/we_dmlcommandproc.cpp | 721 ++++++++++++++++++++++- writeengine/server/we_dmlcommandproc.h | 1 + writeengine/server/we_messages.h | 5 +- writeengine/server/we_readthread.cpp | 5 + 4 files changed, 724 insertions(+), 8 deletions(-) diff --git a/writeengine/server/we_dmlcommandproc.cpp b/writeengine/server/we_dmlcommandproc.cpp index b0cd83ac5..43e36b58f 100644 --- a/writeengine/server/we_dmlcommandproc.cpp +++ b/writeengine/server/we_dmlcommandproc.cpp @@ -1122,12 +1122,6 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std:: if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) { - if (((colType.colDataType == execplan::CalpontSystemCatalog::DATE) && (indata =="0000-00-00")) || - ((colType.colDataType == execplan::CalpontSystemCatalog::DATETIME) && (indata =="0000-00-00 00:00:00"))) - { - isNULL = true; - } - if (isNULL && colType.defaultValue.empty()) //error out { Message::Args args; @@ -1265,6 +1259,721 @@ End-Disable use of MetaFile for bulk rollback support return rc; } +uint8_t WE_DMLCommandProc::processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId) +{ + int rc = 0; + //cout << "processBatchInsert received bytestream length " << bs.length() << endl; + + ByteStream::quadbyte tmp32; + ByteStream::byte tmp8; + bs >> tmp32; + //cout << "processBatchInsert got transaction id " << tmp32 << endl; + bs >> PMId; + //cout << "processBatchInsert gor PMId " << PMId << endl; + uint32_t sessionId; + bs >> sessionId; + //cout << " processBatchInsert for session " << sessionId << endl; + bool isAutocommitOn; + bs >> tmp8; + isAutocommitOn = tmp8; + if (idbdatafile::IDBPolicy::useHdfs()) + isAutocommitOn = true; + //cout << "This session isAutocommitOn is " << isAutocommitOn << endl; + BRM::TxnID txnid; + txnid.id = tmp32; + txnid.valid = true; + bool isInsertSelect; + bs >> tmp8; + // For insert select, skip the hwm block and start inserting from the next block + // to avoid self insert issue. + //For batch insert: if not first batch, use the saved last rid to start adding rows. + isInsertSelect = tmp8; + + WriteEngine::ColStructList colStructs; + WriteEngine::DctnryStructList dctnryStructList; + WriteEngine::DctnryValueList dctnryValueList; + WriteEngine::ColValueList colValuesList; + WriteEngine::DictStrList dicStringList ; + CalpontSystemCatalog::TableName tableName; + CalpontSystemCatalog::TableColName tableColName; + bs >> tableColName.table; + bs >> tableColName.schema; + tableName.table = tableColName.table; + tableName.schema = tableColName.schema; + boost::shared_ptr systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionId); + systemCatalogPtr->identity(CalpontSystemCatalog::EC); + CalpontSystemCatalog::ROPair roPair; + CalpontSystemCatalog::RIDList ridList; + CalpontSystemCatalog::DictOIDList dictOids; + try + { + ridList = systemCatalogPtr->columnRIDs(tableName, true); + roPair = systemCatalogPtr->tableRID( tableName); + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + + + std::vector dctnryStoreOids(ridList.size()) ; + std::vector columns; + DctnryStructList dctnryList; + std::vector dbRootHWMInfoColVec(ridList.size()); + + uint32_t tblOid = roPair.objnum; + CalpontSystemCatalog::ColType colType; + std::vector colDBRootExtentInfo; + bool bFirstExtentOnThisPM = false; + Convertor convertor; + if ( fIsFirstBatchPm ) + { + dbRootExtTrackerVec.clear(); + if (isAutocommitOn || ((fRBMetaWriter.get() == NULL) && (!isAutocommitOn))) + fRBMetaWriter.reset(new RBMetaWriter("BatchInsert", NULL)); + fWEWrapper.setIsInsert(true); + fWEWrapper.setBulkFlag(true); + fWEWrapper.setTransId(txnid.id); + try + { + // First gather HWM BRM information for all columns + std::vector colWidths; + for (unsigned i=0; i < ridList.size(); i++) + { + rc = BRMWrapper::getInstance()->getDbRootHWMInfo(ridList[i].objnum, dbRootHWMInfoColVec[i]); + //need handle error + + CalpontSystemCatalog::ColType colType2 = systemCatalogPtr->colType(ridList[i].objnum); + colWidths.push_back( convertor.getCorrectRowWidth( + colType2.colDataType, colType2.colWidth) ); + } + + for (unsigned i=0; i < ridList.size(); i++) + { + // Find DBRoot/segment file where we want to start adding rows + colType = systemCatalogPtr->colType(ridList[i].objnum); + boost::shared_ptr pDBRootExtentTracker (new DBRootExtentTracker(ridList[i].objnum, + colWidths, dbRootHWMInfoColVec, i, 0) ); + dbRootExtTrackerVec.push_back( pDBRootExtentTracker ); + DBRootExtentInfo dbRootExtent; + std::string trkErrMsg; + bool bEmptyPM; + if (i == 0) + { + rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent,bFirstExtentOnThisPM, bEmptyPM, trkErrMsg); + /* cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " << (int)bFirstExtentOnThisPM << + " oid:dbroot:hwm = " << ridList[i].objnum << ":"< rangeList; + + // use of MetaFile for bulk rollback support + if ( fIsFirstBatchPm && isAutocommitOn) + { + //save meta data, version last block for each dbroot at the start of batch insert + try + { + fRBMetaWriter->init(tblOid, tableName.table); + fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec); + //cout << "Saved meta files" << endl; + if (!bFirstExtentOnThisPM) + { + //cout << "Backing up hwm chunks" << endl; + for (unsigned i=0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary + { + // @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context + fRBMetaWriter->backupDctnryHWMChunk( + dctnryList[i].dctnryOid, dctnryList[i].fColDbRoot, dctnryList[i].fColPartition, dctnryList[i].fColSegment); + } + } + } + catch (WeException& ex) // catch exception to close file, then rethrow + { + rc = 1; + err = ex.what(); + } + //Do versioning. Currently, we only version columns, not strings. If there is a design change, this will need to be re-visited + if ( rc != 0) + return rc; + + } + + std::vector colNames; + bool isWarningSet = false; + uint32_t columnCount; + bs >> columnCount; + if (columnCount) + { + try + { + for (uint32_t current_column = 0; current_column < columnCount; current_column++) + { + uint32_t tmp32; + std::string colName; + bs >> tmp32; + bs >> colName; + colNames.push_back(colName); + CalpontSystemCatalog::OID oid = tmp32; + + CalpontSystemCatalog::ColType colType; + colType = systemCatalogPtr->colType(oid); + + WriteEngine::ColStruct colStruct; + WriteEngine::DctnryStruct dctnryStruct; + colStruct.dataOid = oid; + colStruct.tokenFlag = false; + colStruct.fCompressionType = colType.compressionType; + // Token + if ( isDictCol(colType) ) + { + colStruct.colWidth = 8; + colStruct.tokenFlag = true; + } + else + { + colStruct.colWidth = colType.colWidth; + } + colStruct.colDataType = colType.colDataType; + + if (colStruct.tokenFlag) + { + dctnryStruct.dctnryOid = colType.ddn.dictOID; + dctnryStruct.columnOid = colStruct.dataOid; + dctnryStruct.fCompressionType = colType.compressionType; + dctnryStruct.colWidth = colType.colWidth; + } + else + { + dctnryStruct.dctnryOid = 0; + dctnryStruct.columnOid = colStruct.dataOid; + dctnryStruct.fCompressionType = colType.compressionType; + dctnryStruct.colWidth = colType.colWidth; + } + + colStructs.push_back(colStruct); + dctnryStructList.push_back(dctnryStruct); + + } + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + + std::string tmpStr(""); + uint32_t valuesPerColumn; + bs >> valuesPerColumn; + try + { + WriteEngine::ColTupleList colTuples; + WriteEngine::DctColTupleList dctColTuples; + bool pushWarning = false; + for (uint32_t j = 0; j < columnCount; j++) + { + tableColName.column = colNames[j]; + CalpontSystemCatalog::OID oid = colStructs[j].dataOid; + + CalpontSystemCatalog::ColType colType; + colType = systemCatalogPtr->colType(oid); + + boost::any datavalue; + bool isNULL = false; + WriteEngine::dictStr dicStrings; + // token + if ( isDictCol(colType) ) + { + for ( uint32_t i=0; i < valuesPerColumn; i++ ) + { + bs >> tmp8; + isNULL = tmp8; + bs >> tmpStr; + if ( tmpStr.length() == 0 ) + isNULL = true; + + if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) + { + if (isNULL && colType.defaultValue.empty()) //error out + { + Message::Args args; + args.add(tableColName.column); + err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); + rc = 1; + return rc; + } + else if (isNULL && !(colType.defaultValue.empty())) + { + tmpStr = colType.defaultValue; + } + } + + if ( tmpStr.length() > (unsigned int)colType.colWidth ) + { + tmpStr = tmpStr.substr(0, colType.colWidth); + if ( !pushWarning ) + pushWarning = true; + } + WriteEngine::ColTuple colTuple; + colTuple.data = datavalue; + + colTuples.push_back(colTuple); + //@Bug 2515. Only pass string values to write engine + dicStrings.push_back( tmpStr ); + } + colValuesList.push_back(colTuples); + //@Bug 2515. Only pass string values to write engine + dicStringList.push_back( dicStrings ); + } + else + { + string x; + //scan once to check how many autoincrement value needed + uint32_t nextValNeeded = 0; + uint64_t nextVal = 1; + if (colType.autoincrement) + { + try + { + nextVal = systemCatalogPtr->nextAutoIncrValue(tableName); + fDbrm.startAISequence(oid, nextVal, colType.colWidth, colType.colDataType); + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + } + + for ( uint32_t i=0; i < valuesPerColumn; i++ ) + { + bs >> tmp8; + isNULL = tmp8; + + int8_t val8; + int16_t val16; + int32_t val32; + int64_t val64; + float valF; + double valD; + std::string valStr; + bool valZero = false; // Needed for autoinc check + switch (colType.colDataType) + { + case execplan::CalpontSystemCatalog::TINYINT: + bs >> val8; + if (val8 == 0) + valZero = true; + datavalue = (char)val8; + break; + case execplan::CalpontSystemCatalog::SMALLINT: + bs >> val16; + if (val16 == 0) + valZero = true; + datavalue = (short)val16; + break; + case execplan::CalpontSystemCatalog::MEDINT: + case execplan::CalpontSystemCatalog::INT: + bs >> val32; + if (val32 == 0) + valZero = true; + datavalue = (int)val32; + break; + case execplan::CalpontSystemCatalog::BIGINT: + bs >> val64; + if (val64 == 0) + valZero = true; + datavalue = (long long)val64; + break; + case execplan::CalpontSystemCatalog::UTINYINT: + bs >> val8; + if (val8 == 0) + valZero = true; + datavalue = (uint8_t)val8; + break; + case execplan::CalpontSystemCatalog::DATE: + case execplan::CalpontSystemCatalog::USMALLINT: + bs >> val16; + if (val16 == 0) + valZero = true; + datavalue = (uint16_t)val16; + break; + case execplan::CalpontSystemCatalog::UMEDINT: + case execplan::CalpontSystemCatalog::UINT: + bs >> val32; + if (val32 == 0) + valZero = true; + datavalue = (uint32_t)val32; + break; + case execplan::CalpontSystemCatalog::DATETIME: + case execplan::CalpontSystemCatalog::UBIGINT: + bs >> val64; + if (val64 == 0) + valZero = true; + datavalue = (uint64_t)val64; + break; + case execplan::CalpontSystemCatalog::DECIMAL: + switch (colType.colWidth) + { + case 1: + { + bs >> val8; + datavalue = (char) val8; + break; + } + case 2: + { + bs >> val16; + datavalue = (short) val16; + break; + } + case 4: + { + bs >> val32; + datavalue = (int) val32; + break; + } + default: + { + bs >> val64; + datavalue = (long long) val64; + break; + } + } + + break; + case execplan::CalpontSystemCatalog::UDECIMAL: + // UDECIMAL numbers may not be negative + if (colType.colWidth == 1) + { + bs >> val8; + if (val8 < 0 && + val8 != static_cast(joblist::TINYINTEMPTYROW) && + val8 != static_cast(joblist::TINYINTNULL)) + { + val8 = 0; + pushWarning = true; + } + datavalue = (char)val8; + } + else if (colType.colWidth == 2) + { + bs >> val16; + if (val16 < 0 && + val16 != static_cast(joblist::SMALLINTEMPTYROW) && + val16 != static_cast(joblist::SMALLINTNULL)) + { + val16 = 0; + pushWarning = true; + } + datavalue = (short)val16; + } + else if (colType.colWidth == 4) + { + bs >> val32; + if (val32 < 0 && + val32 != static_cast(joblist::INTEMPTYROW) && + val32 != static_cast(joblist::INTNULL)) + { + val32 = 0; + pushWarning = true; + } + datavalue = (int)val32; + } + else if (colType.colWidth == 8) + { + bs >> val64; + if (val64 < 0 && + val64 != static_cast(joblist::BIGINTEMPTYROW) && + val64 != static_cast(joblist::BIGINTNULL)) + { + val64 = 0; + pushWarning = true; + } + datavalue = (long long)val64; + } + break; + case execplan::CalpontSystemCatalog::DOUBLE: + bs >> val64; + memcpy(&valD, &val64, 8); + datavalue = valD; + break; + case execplan::CalpontSystemCatalog::UDOUBLE: + bs >> val64; + memcpy(&valD, &val64, 8); + if (valD < 0.0 && valD != joblist::DOUBLEEMPTYROW && valD != joblist::DOUBLENULL) + { + valD = 0.0; + pushWarning = true; + } + + datavalue = valD; + break; + case execplan::CalpontSystemCatalog::FLOAT: + bs >> val32; + memcpy(&valF, &val32, 4); + datavalue = valF; + break; + case execplan::CalpontSystemCatalog::UFLOAT: + bs >> val32; + memcpy(&valF, &val32, 4); + if (valF < 0.0 && valF != joblist::FLOATEMPTYROW && valF != joblist::FLOATNULL) + { + valF = 0.0; + pushWarning = true; + } + + datavalue = valF; + break; + + case execplan::CalpontSystemCatalog::CHAR: + case execplan::CalpontSystemCatalog::VARCHAR: + case execplan::CalpontSystemCatalog::TEXT: + case execplan::CalpontSystemCatalog::BLOB: + bs >> valStr; + if (valStr.length() > (unsigned int)colType.colWidth) + { + valStr = valStr.substr(0, colType.colWidth); + pushWarning = true; + } + else + { + if ( (unsigned int)colType.colWidth > valStr.length()) + { + //Pad null character to the string + valStr.resize(colType.colWidth, 0); + } + } + datavalue = valStr; + break; + default: + rc = 1; + err = IDBErrorInfo::instance()->errorMsg(ERR_DATATYPE_NOT_SUPPORT); + break; + } + //check if autoincrement column and value is 0 or null + if (colType.autoincrement && ( isNULL || valZero)) + { + ostringstream oss; + oss << nextVal++; + isNULL = false; + try + { + nextValNeeded++; + bool reserved = fDbrm.getAIRange(oid, nextValNeeded, &nextVal); + if (!reserved) + { + err = IDBErrorInfo::instance()->errorMsg(ERR_EXCEED_LIMIT); + rc = 1; + return rc; + } + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + switch (colType.colDataType) + { + case execplan::CalpontSystemCatalog::TINYINT: + case execplan::CalpontSystemCatalog::UTINYINT: + datavalue = (uint8_t) nextVal; + break; + case execplan::CalpontSystemCatalog::SMALLINT: + case execplan::CalpontSystemCatalog::USMALLINT: + datavalue = (uint16_t) nextVal; + break; + case execplan::CalpontSystemCatalog::MEDINT: + case execplan::CalpontSystemCatalog::UMEDINT: + case execplan::CalpontSystemCatalog::INT: + case execplan::CalpontSystemCatalog::UINT: + datavalue = (uint32_t) nextVal; + break; + case execplan::CalpontSystemCatalog::BIGINT: + case execplan::CalpontSystemCatalog::UBIGINT: + default: + datavalue = (uint64_t) nextVal; + break; + } + } + + if (colType.constraintType == CalpontSystemCatalog::NOTNULL_CONSTRAINT) + { + if (isNULL && colType.defaultValue.empty()) //error out + { + Message::Args args; + args.add(tableColName.column); + err = IDBErrorInfo::instance()->errorMsg(ERR_NOT_NULL_CONSTRAINTS, args); + rc = 1; + return rc; + } + else if (isNULL && !(colType.defaultValue.empty())) + { + datavalue = colType.defaultValue; + isNULL = false; + } + } + //@Bug 1806 + if (rc != NO_ERROR && rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING) + { + return rc; + } + if ( pushWarning && ( rc != dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING ) ) + rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; + + WriteEngine::ColTuple colTuple; + colTuple.data = datavalue; + + colTuples.push_back(colTuple); + //@Bug 2515. Only pass string values to write engine + dicStrings.push_back( valStr ); + } + colValuesList.push_back(colTuples); + dicStringList.push_back( dicStrings ); + } + + if (pushWarning) + { + colNames.push_back(tableColName.column); + isWarningSet = true; + } + } + } + catch (std::exception& ex) + { + err = ex.what(); + rc = 1; + return rc; + } + } + + // call the write engine to write the rows + int error = NO_ERROR; + //fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3); + //cout << "Batch inserting a row with transaction id " << txnid.id << endl; + if (colValuesList.size() > 0) + { + if (colValuesList[0].size() > 0) + { + if (NO_ERROR != + (error = fWEWrapper.insertColumnRecs(txnid.id, colStructs, colValuesList, dctnryStructList, dicStringList, + dbRootExtTrackerVec, 0, bFirstExtentOnThisPM, isInsertSelect, isAutocommitOn, roPair.objnum, fIsFirstBatchPm))) + { + if (error == ERR_BRM_DEAD_LOCK) + { + rc = dmlpackageprocessor::DMLPackageProcessor::DEAD_LOCK_ERROR; + WErrorCodes ec; + err = ec.errorString(error); + } + else if ( error == ERR_BRM_VB_OVERFLOW ) + { + rc = dmlpackageprocessor::DMLPackageProcessor::VB_OVERFLOW_ERROR; + err = IDBErrorInfo::instance()->errorMsg(ERR_VERSIONBUFFER_OVERFLOW); + } + else + { + rc = dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR; + WErrorCodes ec; + err = ec.errorString(error); + } + } + } + } + if (fIsFirstBatchPm && isAutocommitOn) + { + //fWEWrapper.writeVBEnd(txnid.id, rangeList); + fIsFirstBatchPm = false; + } + else if (fIsFirstBatchPm) + { + fIsFirstBatchPm = false; + } + if ( isWarningSet && ( rc == NO_ERROR ) ) + { + rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING; + //cout << "Got warning" << endl; + Message::Args args; + string cols = "'" + colNames[0] + "'"; + + for (unsigned i=1; ierrorMsg(WARN_DATA_TRUNC,args); + + // Strict mode enabled, so rollback on warning + // NOTE: This doesn't seem to be a possible code path yet + /*if (insertPkg.get_isWarnToError()) + { + string applName ("BatchInsert"); + fWEWrapper.bulkRollback(tblOid,txnid.id,tableName.toString(), + applName, false, err); + BulkRollbackMgr::deleteMetaFile( tblOid ); + }*/ + } + //cout << "Batch insert return code " << rc << endl; + return rc; +} + + uint8_t WE_DMLCommandProc::commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err) { uint8_t rc = 0; diff --git a/writeengine/server/we_dmlcommandproc.h b/writeengine/server/we_dmlcommandproc.h index cdee413c2..e5f991c94 100644 --- a/writeengine/server/we_dmlcommandproc.h +++ b/writeengine/server/we_dmlcommandproc.h @@ -79,6 +79,7 @@ class WE_DMLCommandProc EXPORT uint8_t rollbackBlocks(messageqcpp::ByteStream& bs, std::string & err); EXPORT uint8_t rollbackVersion(messageqcpp::ByteStream& bs, std::string & err); EXPORT uint8_t processBatchInsert(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId); + EXPORT uint8_t processBatchInsertBinary(messageqcpp::ByteStream& bs, std::string & err, ByteStream::quadbyte & PMId); EXPORT uint8_t commitBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err); EXPORT uint8_t commitBatchAutoOff(messageqcpp::ByteStream& bs, std::string & err); EXPORT uint8_t rollbackBatchAutoOn(messageqcpp::ByteStream& bs, std::string & err); diff --git a/writeengine/server/we_messages.h b/writeengine/server/we_messages.h index 19a44241c..19a61cf04 100644 --- a/writeengine/server/we_messages.h +++ b/writeengine/server/we_messages.h @@ -61,8 +61,8 @@ enum ServerMessages WE_SVR_COMMIT_BATCH_AUTO_OFF, WE_SVR_ROLLBACK_BATCH_AUTO_OFF, WE_SVR_BATCH_AUTOON_REMOVE_META, - WE_SVR_UPDATE, //35 - WE_SVR_FLUSH_FILES, + WE_SVR_UPDATE, + WE_SVR_FLUSH_FILES, //35 WE_SVR_DELETE, WE_SVR_DML_BULKROLLBACK, WE_SVR_DML_BULKROLLBACK_CLEANUP, @@ -82,6 +82,7 @@ enum ServerMessages WE_END_TRANSACTION, WE_SRV_FIX_ROWS, WE_SVR_WRITE_CREATE_SYSCOLUMN, + WE_SVR_BATCH_INSERT_BINARY, WE_CLT_SRV_DATA=100, WE_CLT_SRV_EOD, diff --git a/writeengine/server/we_readthread.cpp b/writeengine/server/we_readthread.cpp index 135501322..77beeb581 100644 --- a/writeengine/server/we_readthread.cpp +++ b/writeengine/server/we_readthread.cpp @@ -149,6 +149,11 @@ void DmlReadThread::operator()() //cout << "fWeDMLprocessor " << fWeDMLprocessor << " is processing batchinsert ..." << endl; break; } + case WE_SVR_BATCH_INSERT_BINARY: + { + rc = fWeDMLprocessor->processBatchInsertBinary(ibs, errMsg, PMId); + break; + } case WE_SVR_BATCH_INSERT_END: { rc = fWeDMLprocessor->processBatchInsertHwm(ibs, errMsg);