diff --git a/writeengine/wrapper/writeengine.cpp b/writeengine/wrapper/writeengine.cpp index 8da33caad..dc863b823 100644 --- a/writeengine/wrapper/writeengine.cpp +++ b/writeengine/wrapper/writeengine.cpp @@ -1475,6 +1475,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid, Column curCol; ColStruct curColStruct; ColStructList newColStructList; + std::vector colNewValueList; DctnryStructList newDctnryStructList; HWM hwm = 0; HWM oldHwm = 0; @@ -2055,6 +2056,19 @@ timer.stop("tokenize"); tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo); } + //-------------------------------------------------------------------------- + //Prepare the valuelist for the new extent + //-------------------------------------------------------------------------- + + for (unsigned i=1; i <= totalColumns; i++) + { + // Copy values to second value list + for (uint64_t j=rowsLeft; j > 0; j--) + { + colNewValueList.push_back(colValueList[(totalRow*i)-j]); + } + } + // end of allocate row id #ifdef PROFILE @@ -2091,6 +2105,22 @@ timer.start("writeColumnRec"); } } } + // If we create a new extent for this batch + for (unsigned i = 0; i < newColStructList.size(); i++) + { + colOp = m_colOp[op(newColStructList[i].fCompressionType)]; + width = newColStructList[i].colWidth; + successFlag = colOp->calculateRowId(lastRidNew , BYTE_PER_BLOCK/width, width, curFbo, curBio); + if (successFlag) { + if (curFbo != lastFbo) { + RETURN_ON_ERROR(AddLBIDtoList(txnid, + lbids, + colDataTypes, + newColStructList[i], + curFbo)); + } + } + } } if (lbids.size() > 0) @@ -2100,7 +2130,7 @@ timer.start("writeColumnRec"); // Write row(s) to database file(s) //---------------------------------------------------------------------- bool versioning = !(isAutoCommitOn && insertSelect); - rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file + rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file } return rc; } @@ -4564,6 +4594,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, std::vector& colValueList, RID* rowIdArray, const ColStructList& newColStructList, + std::vector& newColValueList, const int32_t tableOid, bool useTmpSuffix, bool versioning) @@ -4574,7 +4605,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, Column curCol; ColStructList::size_type totalColumn; ColStructList::size_type i; - size_t totalRow; + size_t totalRow1, totalRow2; setTransId(txnid); @@ -4582,11 +4613,21 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, #ifdef PROFILE StopWatch timer; #endif - totalRow = colValueList.size() / totalColumn; - valArray = malloc(sizeof(uint64_t) * totalRow); + totalRow1 = colValueList.size() / totalColumn; + if (newColValueList.size() > 0) + { + totalRow2 = newColValueList.size() / newColStructList.size(); + totalRow1 -= totalRow2; + } + else + { + totalRow2 = 0; + } - if (totalRow == 0) + valArray = malloc(sizeof(uint64_t) * totalRow1); + + if (totalRow1 == 0) return rc; TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid); @@ -4634,7 +4675,7 @@ StopWatch timer; if (versioning) { rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], - colStructList[i].colWidth, totalRow, firstPart, rangeList); + colStructList[i].colWidth, totalRow1, firstPart, rangeList); if (rc != NO_ERROR) { if (colStructList[i].fCompressionType == 0) { @@ -4652,9 +4693,9 @@ StopWatch timer; uint8_t tmp8; uint16_t tmp16; uint32_t tmp32; - for (size_t j = 0; j < totalRow; j++) + for (size_t j = 0; j < totalRow1; j++) { - uint64_t curValue = colValueList[(totalRow*i) + j]; + uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j]; switch (colStructList[i].colType) { case WriteEngine::WR_VARBINARY : // treat same as char for now @@ -4692,7 +4733,7 @@ StopWatch timer; #ifdef PROFILE timer.start("writeRow "); #endif - rc = colOp->writeRow(curCol, totalRow, firstPart, valArray); + rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray); #ifdef PROFILE timer.stop("writeRow "); #endif @@ -4707,7 +4748,135 @@ timer.stop("writeRow "); } // end of for (i = 0 if (valArray != NULL) + { free(valArray); + valArray = NULL; + } + + // MCOL-1176 - Write second extent + if (totalRow2) + { + valArray = malloc(sizeof(uint64_t) * totalRow2); + for (i = 0; i < newColStructList.size(); i++) + { + //@Bug 2205 Check if all rows go to the new extent + //Write the first batch + RID * secondPart = rowIdArray + totalRow1; + ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)]; + + // set params + colOp->initColumn(curCol); + // need to pass real dbRoot, partition, and segment to setColParam + colOp->setColParam(curCol, 0, newColStructList[i].colWidth, + newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid, + newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot, + newColStructList[i].fColPartition, newColStructList[i].fColSegment); + + ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid); + ColExtsInfo::iterator it = aColExtsInfo.begin(); + while (it != aColExtsInfo.end()) + { + if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment)) + break; + it++; + } + + if (it == aColExtsInfo.end()) //add this one to the list + { + ColExtInfo aExt; + aExt.dbRoot = newColStructList[i].fColDbRoot; + aExt.partNum = newColStructList[i].fColPartition; + aExt.segNum = newColStructList[i].fColSegment; + aExt.compType = newColStructList[i].fCompressionType; + aColExtsInfo.push_back(aExt); + aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo); + } + + rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file + if (rc != NO_ERROR) + break; + + // handling versioning + vector rangeList; + if (versioning) + { + rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i], + newColStructList[i].colWidth, totalRow2, secondPart, rangeList); + if (rc != NO_ERROR) { + if (newColStructList[i].fCompressionType == 0) + { + curCol.dataFile.pFile->flush(); + } + + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + break; + } + } + + //totalRow1 -= totalRow2; + // have to init the size here + // nullArray = (bool*) malloc(sizeof(bool) * totalRow); + uint8_t tmp8; + uint16_t tmp16; + uint32_t tmp32; + for (size_t j = 0; j < totalRow2; j++) + { + uint64_t curValue = newColValueList[(totalRow2*i) + j]; + switch (newColStructList[i].colType) + { + case WriteEngine::WR_VARBINARY : // treat same as char for now + case WriteEngine::WR_CHAR: + case WriteEngine::WR_BLOB: + case WriteEngine::WR_TEXT: + ((uint64_t*)valArray)[j] = curValue; + break; + case WriteEngine::WR_INT: + case WriteEngine::WR_UINT: + case WriteEngine::WR_FLOAT: + tmp32 = curValue; + ((uint32_t*)valArray)[j] = tmp32; + break; + case WriteEngine::WR_ULONGLONG: + case WriteEngine::WR_LONGLONG: + case WriteEngine::WR_DOUBLE: + case WriteEngine::WR_TOKEN: + ((uint64_t*)valArray)[j] = curValue; + break; + case WriteEngine::WR_BYTE: + case WriteEngine::WR_UBYTE: + tmp8 = curValue; + ((uint8_t*)valArray)[j] = tmp8; + break; + case WriteEngine::WR_SHORT: + case WriteEngine::WR_USHORT: + tmp16 = curValue; + ((uint16_t*)valArray)[j] = tmp16; + break; + } + } + + + #ifdef PROFILE + timer.start("writeRow "); + #endif + rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray); + #ifdef PROFILE + timer.stop("writeRow "); + #endif + colOp->closeColumnFile(curCol); + + if (versioning) + BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList); + + // check error + if (rc != NO_ERROR) + break; + + } // end of for (i = 0 + } + if (valArray != NULL) + free(valArray); + #ifdef PROFILE timer.finish(); diff --git a/writeengine/wrapper/writeengine.h b/writeengine/wrapper/writeengine.h index 7c7862d5f..f254a28cd 100644 --- a/writeengine/wrapper/writeengine.h +++ b/writeengine/wrapper/writeengine.h @@ -663,6 +663,7 @@ private: int writeColumnRecBinary(const TxnID& txnid, const ColStructList& colStructList, std::vector& colValueList, RID* rowIdArray, const ColStructList& newColStructList, + std::vector& newColValueList, const int32_t tableOid, bool useTmpSuffix, bool versioning = true);