You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-04 04:42:30 +03:00
MCOL-769: Much higher performance for bulk inserts.
Use void* pointers instead of boost::any, which incurred many per-value copies.
This commit is contained in:
@ -1457,6 +1457,638 @@ timer.start("writeColumnRec");
|
||||
return rc;
|
||||
}
|
||||
|
||||
int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
ColStructList& colStructList,
|
||||
std::vector<uint64_t>& colValueList,
|
||||
DctnryStructList& dctnryStructList,
|
||||
DictStrList& dictStrList,
|
||||
std::vector<boost::shared_ptr<DBRootExtentTracker> > & dbRootExtentTrackers,
|
||||
RBMetaWriter* fRBMetaWriter,
|
||||
bool bFirstExtentOnThisPM,
|
||||
bool insertSelect,
|
||||
bool isAutoCommitOn,
|
||||
OID tableOid,
|
||||
bool isFirstBatchPm)
|
||||
{
|
||||
int rc;
|
||||
RID* rowIdArray = NULL;
|
||||
Column curCol;
|
||||
ColStruct curColStruct;
|
||||
ColStructList newColStructList;
|
||||
DctnryStructList newDctnryStructList;
|
||||
HWM hwm = 0;
|
||||
HWM oldHwm = 0;
|
||||
HWM newHwm = 0;
|
||||
size_t totalRow;
|
||||
ColStructList::size_type totalColumns;
|
||||
uint64_t rowsLeft = 0;
|
||||
bool newExtent = false;
|
||||
RIDList ridList;
|
||||
ColumnOp* colOp = NULL;
|
||||
|
||||
// Set tmp file suffix to modify HDFS db file
|
||||
bool useTmpSuffix = false;
|
||||
if (idbdatafile::IDBPolicy::useHdfs())
|
||||
{
|
||||
if (!bFirstExtentOnThisPM)
|
||||
useTmpSuffix = true;
|
||||
}
|
||||
|
||||
unsigned i=0;
|
||||
#ifdef PROFILE
|
||||
StopWatch timer;
|
||||
#endif
|
||||
|
||||
//Convert data type and column width to write engine specific
|
||||
for (i = 0; i < colStructList.size(); i++)
|
||||
Convertor::convertColType(&colStructList[i]);
|
||||
|
||||
// rc = checkValid(txnid, colStructList, colValueList, ridList);
|
||||
// if (rc != NO_ERROR)
|
||||
// return rc;
|
||||
|
||||
setTransId(txnid);
|
||||
uint16_t dbRoot, segmentNum;
|
||||
uint32_t partitionNum;
|
||||
string segFile;
|
||||
bool newFile;
|
||||
TableMetaData* tableMetaData= TableMetaData::makeTableMetaData(tableOid);
|
||||
//populate colStructList with file information
|
||||
IDBDataFile* pFile = NULL;
|
||||
std::vector<DBRootExtentInfo> extentInfo;
|
||||
int currentDBrootIdx = 0;
|
||||
std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
// For first batch on this PM:
|
||||
// o get starting extent from ExtentTracker, and allocate extent if needed
|
||||
// o construct colStructList and dctnryStructList accordingly
|
||||
// o save extent information in tableMetaData for future use
|
||||
// If not first batch on this PM:
|
||||
// o construct colStructList and dctnryStructList from tableMetaData
|
||||
//--------------------------------------------------------------------------
|
||||
if (isFirstBatchPm)
|
||||
{
|
||||
currentDBrootIdx = dbRootExtentTrackers[0]->getCurrentDBRootIdx();
|
||||
extentInfo = dbRootExtentTrackers[0]->getDBRootExtentList();
|
||||
dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
|
||||
partitionNum = extentInfo[currentDBrootIdx].fPartition;
|
||||
|
||||
//----------------------------------------------------------------------
|
||||
// check whether this extent is the first on this PM
|
||||
//----------------------------------------------------------------------
|
||||
if (bFirstExtentOnThisPM)
|
||||
{
|
||||
//cout << "bFirstExtentOnThisPM is " << bFirstExtentOnThisPM << endl;
|
||||
std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
|
||||
BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;
|
||||
for (i=0; i < colStructList.size(); i++)
|
||||
{
|
||||
createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid;
|
||||
createStripeColumnExtentsArgIn.width = colStructList[i].colWidth;
|
||||
createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType;
|
||||
cols.push_back(createStripeColumnExtentsArgIn);
|
||||
}
|
||||
rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum, extents);
|
||||
if (rc != NO_ERROR)
|
||||
return rc;
|
||||
//Create column files
|
||||
BRM::CPInfoList_t cpinfoList;
|
||||
BRM::CPInfo cpInfo;
|
||||
if (isUnsigned(colStructList[i].colDataType))
|
||||
{
|
||||
cpInfo.max = 0;
|
||||
cpInfo.min = static_cast<int64_t>(numeric_limits<uint64_t>::max());
|
||||
}
|
||||
else
|
||||
{
|
||||
cpInfo.max = numeric_limits<int64_t>::min();
|
||||
cpInfo.min = numeric_limits<int64_t>::max();
|
||||
}
|
||||
cpInfo.seqNum = -1;
|
||||
for ( i=0; i < extents.size(); i++)
|
||||
{
|
||||
colOp = m_colOp[op(colStructList[i].fCompressionType)];
|
||||
colOp->initColumn(curCol);
|
||||
colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
|
||||
colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType,
|
||||
dbRoot, partitionNum, segmentNum);
|
||||
rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot,
|
||||
partitionNum, segmentNum, segFile, pFile, newFile);
|
||||
if (rc != NO_ERROR)
|
||||
return rc;
|
||||
|
||||
//mark the extents to invalid
|
||||
cpInfo.firstLbid = extents[i].startLbid;
|
||||
cpinfoList.push_back(cpInfo);
|
||||
colStructList[i].fColPartition = partitionNum;
|
||||
colStructList[i].fColSegment = segmentNum;
|
||||
colStructList[i].fColDbRoot = dbRoot;
|
||||
dctnryStructList[i].fColPartition = partitionNum;
|
||||
dctnryStructList[i].fColSegment = segmentNum;
|
||||
dctnryStructList[i].fColDbRoot = dbRoot;
|
||||
}
|
||||
|
||||
//mark the extents to invalid
|
||||
rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
|
||||
if (rc != NO_ERROR)
|
||||
return rc;
|
||||
//create corresponding dictionary files
|
||||
for (i=0; i < dctnryStructList.size(); i++)
|
||||
{
|
||||
if (dctnryStructList[i].dctnryOid > 0)
|
||||
{
|
||||
rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot, partitionNum,
|
||||
segmentNum, dctnryStructList[i].fCompressionType);
|
||||
if ( rc != NO_ERROR)
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
} // if ( bFirstExtentOnThisPM)
|
||||
else // if (!bFirstExtentOnThisPM)
|
||||
{
|
||||
std::vector<DBRootExtentInfo> tmpExtentInfo;
|
||||
for (i=0; i < dbRootExtentTrackers.size(); i++)
|
||||
{
|
||||
tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
|
||||
colStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
|
||||
colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
|
||||
colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
|
||||
//cout << "Load from dbrootExtenttracker oid:dbroot:part:seg = " <<colStructList[i].dataOid<<":"
|
||||
//<<colStructList[i].fColDbRoot<<":"<<colStructList[i].fColPartition<<":"<<colStructList[i].fColSegment<<endl;
|
||||
dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
|
||||
dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
|
||||
dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
|
||||
}
|
||||
}
|
||||
|
||||
//----------------------------------------------------------------------
|
||||
// Save the extents info in tableMetaData
|
||||
//----------------------------------------------------------------------
|
||||
for (i=0; i < colStructList.size(); i++)
|
||||
{
|
||||
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
|
||||
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
||||
while (it != aColExtsInfo.end())
|
||||
{
|
||||
if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
|
||||
break;
|
||||
it++;
|
||||
}
|
||||
|
||||
if (it == aColExtsInfo.end()) //add this one to the list
|
||||
{
|
||||
ColExtInfo aExt;
|
||||
aExt.dbRoot = colStructList[i].fColDbRoot;
|
||||
aExt.partNum = colStructList[i].fColPartition;
|
||||
aExt.segNum = colStructList[i].fColSegment;
|
||||
aExt.compType = colStructList[i].fCompressionType;
|
||||
aExt.isDict = false;
|
||||
if (bFirstExtentOnThisPM)
|
||||
{
|
||||
aExt.hwm = extents[i].startBlkOffset;
|
||||
aExt.isNewExt = true;
|
||||
//cout << "adding a ext to metadata" << endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
std::vector<DBRootExtentInfo> tmpExtentInfo;
|
||||
tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
|
||||
aExt.isNewExt = false;
|
||||
aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
|
||||
//cout << "oid " << colStructList[i].dataOid << " gets hwm " << aExt.hwm << endl;
|
||||
}
|
||||
aExt.current = true;
|
||||
aColExtsInfo.push_back(aExt);
|
||||
//cout << "get from extentinfo oid:hwm = " << colStructList[i].dataOid << ":" << aExt.hwm << endl;
|
||||
}
|
||||
|
||||
tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
|
||||
}
|
||||
|
||||
for (i=0; i < dctnryStructList.size(); i++)
|
||||
{
|
||||
if (dctnryStructList[i].dctnryOid > 0)
|
||||
{
|
||||
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
|
||||
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
||||
while (it != aColExtsInfo.end())
|
||||
{
|
||||
if ((it->dbRoot == dctnryStructList[i].fColDbRoot) && (it->partNum == dctnryStructList[i].fColPartition) && (it->segNum == dctnryStructList[i].fColSegment))
|
||||
break;
|
||||
it++;
|
||||
}
|
||||
|
||||
if (it == aColExtsInfo.end()) //add this one to the list
|
||||
{
|
||||
ColExtInfo aExt;
|
||||
aExt.dbRoot = dctnryStructList[i].fColDbRoot;
|
||||
aExt.partNum = dctnryStructList[i].fColPartition;
|
||||
aExt.segNum = dctnryStructList[i].fColSegment;
|
||||
aExt.compType = dctnryStructList[i].fCompressionType;
|
||||
aExt.isDict = true;
|
||||
aColExtsInfo.push_back(aExt);
|
||||
}
|
||||
tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
|
||||
}
|
||||
}
|
||||
|
||||
} // if (isFirstBatchPm)
|
||||
else //get the extent info from tableMetaData
|
||||
{
|
||||
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid);
|
||||
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
||||
while (it != aColExtsInfo.end())
|
||||
{
|
||||
if (it->current)
|
||||
break;
|
||||
it++;
|
||||
}
|
||||
if (it == aColExtsInfo.end())
|
||||
return 1;
|
||||
|
||||
for (i=0; i < colStructList.size(); i++)
|
||||
{
|
||||
colStructList[i].fColPartition = it->partNum;
|
||||
colStructList[i].fColSegment = it->segNum;
|
||||
colStructList[i].fColDbRoot = it->dbRoot;
|
||||
dctnryStructList[i].fColPartition = it->partNum;
|
||||
dctnryStructList[i].fColSegment = it->segNum;
|
||||
dctnryStructList[i].fColDbRoot = it->dbRoot;
|
||||
}
|
||||
}
|
||||
|
||||
totalColumns = colStructList.size();
|
||||
totalRow = colValueList.size() / totalColumns;
|
||||
rowIdArray = new RID[totalRow];
|
||||
// use scoped_array to ensure ptr deletion regardless of where we return
|
||||
boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
|
||||
memset(rowIdArray, 0, (sizeof(RID)*totalRow));
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
// allocate row id(s)
|
||||
//--------------------------------------------------------------------------
|
||||
curColStruct = colStructList[0];
|
||||
colOp = m_colOp[op(curColStruct.fCompressionType)];
|
||||
|
||||
colOp->initColumn(curCol);
|
||||
|
||||
//Get the correct segment, partition, column file
|
||||
vector<ExtentInfo> colExtentInfo; //Save those empty extents in case of failure to rollback
|
||||
vector<ExtentInfo> dictExtentInfo; //Save those empty extents in case of failure to rollback
|
||||
vector<ExtentInfo> fileInfo;
|
||||
dbRoot = curColStruct.fColDbRoot;
|
||||
//use the first column to calculate row id
|
||||
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid);
|
||||
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
||||
while (it != aColExtsInfo.end())
|
||||
{
|
||||
if ((it->dbRoot == colStructList[0].fColDbRoot) && (it->partNum == colStructList[0].fColPartition) && (it->segNum == colStructList[0].fColSegment) && it->current )
|
||||
break;
|
||||
it++;
|
||||
}
|
||||
if (it != aColExtsInfo.end())
|
||||
{
|
||||
hwm = it->hwm;
|
||||
//cout << "Got from colextinfo hwm for oid " << colStructList[0].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl;
|
||||
}
|
||||
|
||||
oldHwm = hwm; //Save this info for rollback
|
||||
//need to pass real dbRoot, partition, and segment to setColParam
|
||||
colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType,
|
||||
curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
|
||||
curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
|
||||
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file
|
||||
if (rc != NO_ERROR) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
//get hwm first
|
||||
// @bug 286 : fix for bug 286 - correct the typo in getHWM
|
||||
//RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));
|
||||
|
||||
Column newCol;
|
||||
|
||||
#ifdef PROFILE
|
||||
timer.start("allocRowId");
|
||||
#endif
|
||||
newColStructList = colStructList;
|
||||
newDctnryStructList = dctnryStructList;
|
||||
bool bUseStartExtent = true;
|
||||
if (idbdatafile::IDBPolicy::useHdfs())
|
||||
insertSelect = true;
|
||||
|
||||
rc = colOp->allocRowId(txnid, bUseStartExtent,
|
||||
curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile,
|
||||
newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm);
|
||||
|
||||
//cout << "after allocrowid, total row = " <<totalRow << " newExtent is " << newExtent << endl;
|
||||
// cout << "column oid " << curColStruct.dataOid << " has hwm:newHwm = " << hwm <<":" << newHwm<< endl;
|
||||
if (rc != NO_ERROR) //Clean up is already done
|
||||
return rc;
|
||||
|
||||
#ifdef PROFILE
|
||||
timer.stop("allocRowId");
|
||||
#endif
|
||||
//--------------------------------------------------------------------------
|
||||
// Expand initial abbreviated extent if any RID in 1st extent is > 256K.
|
||||
// if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
|
||||
//--------------------------------------------------------------------------
|
||||
// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
|
||||
if ((curCol.dataFile.fPartition == 0) &&
|
||||
(curCol.dataFile.fSegment == 0) &&
|
||||
((totalRow-rowsLeft) > 0) &&
|
||||
(rowIdArray[totalRow-rowsLeft-1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
|
||||
{
|
||||
for (unsigned k=1; k<colStructList.size(); k++)
|
||||
{
|
||||
Column expandCol;
|
||||
colOp = m_colOp[op(colStructList[k].fCompressionType)];
|
||||
colOp->setColParam(expandCol, 0,
|
||||
colStructList[k].colWidth,
|
||||
colStructList[k].colDataType,
|
||||
colStructList[k].colType,
|
||||
colStructList[k].dataOid,
|
||||
colStructList[k].fCompressionType,
|
||||
colStructList[k].fColDbRoot,
|
||||
colStructList[k].fColPartition,
|
||||
colStructList[k].fColSegment);
|
||||
rc = colOp->openColumnFile(expandCol, segFile, true); // @bug 5572 HDFS tmp file
|
||||
if (rc == NO_ERROR)
|
||||
{
|
||||
if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
|
||||
{
|
||||
rc = colOp->expandAbbrevExtent(expandCol);
|
||||
}
|
||||
}
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
return rc;
|
||||
}
|
||||
colOp->closeColumnFile(expandCol);
|
||||
}
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
// Tokenize data if needed
|
||||
//--------------------------------------------------------------------------
|
||||
if (insertSelect && isAutoCommitOn)
|
||||
BRMWrapper::setUseVb( false );
|
||||
else
|
||||
BRMWrapper::setUseVb( true );
|
||||
dictStr::iterator dctStr_iter;
|
||||
uint64_t *colValPtr;
|
||||
size_t rowsPerColumn = colValueList.size() / colStructList.size();
|
||||
for (i = 0; i < colStructList.size(); i++)
|
||||
{
|
||||
if (colStructList[i].tokenFlag)
|
||||
{
|
||||
dctStr_iter = dictStrList[i].begin();
|
||||
Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
|
||||
rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid,
|
||||
dctnryStructList[i].fColDbRoot, dctnryStructList[i].fColPartition,
|
||||
dctnryStructList[i].fColSegment,
|
||||
useTmpSuffix); // @bug 5572 HDFS tmp file
|
||||
if (rc !=NO_ERROR)
|
||||
{
|
||||
cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid<< endl;
|
||||
return rc;
|
||||
}
|
||||
|
||||
for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
|
||||
{
|
||||
colValPtr = &colValueList[(i*rowsPerColumn) + rows];
|
||||
if (dctStr_iter->length() == 0)
|
||||
{
|
||||
Token nullToken;
|
||||
memcpy(colValPtr, &nullToken, 8);
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef PROFILE
|
||||
timer.start("tokenize");
|
||||
#endif
|
||||
DctnryTuple dctTuple;
|
||||
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
|
||||
dctTuple.sigSize = dctStr_iter->length();
|
||||
dctTuple.isNull = false;
|
||||
rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
dctnry->closeDctnry();
|
||||
return rc;
|
||||
}
|
||||
#ifdef PROFILE
|
||||
timer.stop("tokenize");
|
||||
#endif
|
||||
memcpy(colValPtr, &dctTuple.token, 8);
|
||||
}
|
||||
dctStr_iter++;
|
||||
|
||||
}
|
||||
//close dictionary files
|
||||
rc = dctnry->closeDctnry(false);
|
||||
if (rc != NO_ERROR)
|
||||
return rc;
|
||||
|
||||
if (newExtent)
|
||||
{
|
||||
//@Bug 4854 back up hwm chunk for the file to be modified
|
||||
if (fRBMetaWriter)
|
||||
fRBMetaWriter->backupDctnryHWMChunk(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment);
|
||||
rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid,
|
||||
newDctnryStructList[i].fColDbRoot, newDctnryStructList[i].fColPartition,
|
||||
newDctnryStructList[i].fColSegment,
|
||||
false); // @bug 5572 HDFS tmp file
|
||||
if (rc !=NO_ERROR)
|
||||
return rc;
|
||||
|
||||
for (uint32_t rows = 0; rows < rowsLeft; rows++)
|
||||
{
|
||||
colValPtr = &colValueList[(i*rowsPerColumn) + rows];
|
||||
if (dctStr_iter->length() == 0)
|
||||
{
|
||||
Token nullToken;
|
||||
memcpy(colValPtr, &nullToken, 8);
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef PROFILE
|
||||
timer.start("tokenize");
|
||||
#endif
|
||||
DctnryTuple dctTuple;
|
||||
dctTuple.sigValue = (unsigned char*)dctStr_iter->c_str();
|
||||
dctTuple.sigSize = dctStr_iter->length();
|
||||
dctTuple.isNull = false;
|
||||
rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
dctnry->closeDctnry();
|
||||
return rc;
|
||||
}
|
||||
#ifdef PROFILE
|
||||
timer.stop("tokenize");
|
||||
#endif
|
||||
memcpy(colValPtr, &dctTuple.token, 8);
|
||||
}
|
||||
dctStr_iter++;
|
||||
}
|
||||
//close dictionary files
|
||||
rc = dctnry->closeDctnry(false);
|
||||
if (rc != NO_ERROR)
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (insertSelect && isAutoCommitOn)
|
||||
BRMWrapper::setUseVb( false );
|
||||
else
|
||||
BRMWrapper::setUseVb( true );
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
// Update column info structure @Bug 1862 set hwm, and
|
||||
// Prepare ValueList for new extent (if applicable)
|
||||
//--------------------------------------------------------------------------
|
||||
//@Bug 2205 Check whether all rows go to the new extent
|
||||
RID lastRid = 0;
|
||||
RID lastRidNew = 0;
|
||||
if (totalRow-rowsLeft > 0)
|
||||
{
|
||||
lastRid = rowIdArray[totalRow-rowsLeft-1];
|
||||
lastRidNew = rowIdArray[totalRow-1];
|
||||
}
|
||||
else
|
||||
{
|
||||
lastRid = 0;
|
||||
lastRidNew = rowIdArray[totalRow-1];
|
||||
}
|
||||
//cout << "rowid allocated is " << lastRid << endl;
|
||||
//if a new extent is created, all the columns in this table should have their own new extent
|
||||
//First column already processed
|
||||
|
||||
//@Bug 1701. Close the file (if uncompressed)
|
||||
m_colOp[op(curCol.compressionType)]->closeColumnFile(curCol);
|
||||
//cout << "Saving hwm info for new ext batch" << endl;
|
||||
//Update hwm to set them in the end
|
||||
bool succFlag = false;
|
||||
unsigned colWidth = 0;
|
||||
int curFbo = 0, curBio;
|
||||
for (i=0; i < totalColumns; i++)
|
||||
{
|
||||
//shoud be obtained from saved hwm
|
||||
aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
|
||||
it = aColExtsInfo.begin();
|
||||
while (it != aColExtsInfo.end())
|
||||
{
|
||||
if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition)
|
||||
&& (it->segNum == colStructList[i].fColSegment) && it->current)
|
||||
break;
|
||||
it++;
|
||||
}
|
||||
if (it != aColExtsInfo.end()) //update hwm info
|
||||
{
|
||||
oldHwm = it->hwm;
|
||||
}
|
||||
|
||||
// save hwm for the old extent
|
||||
colWidth = colStructList[i].colWidth;
|
||||
succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK/colWidth, colWidth, curFbo, curBio);
|
||||
//cout << "insertcolumnrec oid:rid:fbo:oldhwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << oldHwm << endl;
|
||||
if (succFlag)
|
||||
{
|
||||
if ((HWM)curFbo >= oldHwm)
|
||||
{
|
||||
it->hwm = (HWM)curFbo;
|
||||
}
|
||||
//@Bug 4947. set current to false for old extent.
|
||||
if (newExtent)
|
||||
{
|
||||
it->current = false;
|
||||
}
|
||||
|
||||
//cout << "updated old ext info for oid " << colStructList[i].dataOid << " dbroot:part:seg:hwm:current = "
|
||||
//<< it->dbRoot<<":"<<it->partNum<<":"<<it->segNum<<":"<<it->hwm<<":"<< it->current<< " and newExtent is " << newExtent << endl;
|
||||
}
|
||||
else
|
||||
return ERR_INVALID_PARAM;
|
||||
|
||||
//update hwm for the new extent
|
||||
if (newExtent)
|
||||
{
|
||||
it = aColExtsInfo.begin();
|
||||
while (it != aColExtsInfo.end())
|
||||
{
|
||||
if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition)
|
||||
&& (it->segNum == newColStructList[i].fColSegment) && it->current)
|
||||
break;
|
||||
it++;
|
||||
}
|
||||
succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK/colWidth, colWidth, curFbo, curBio);
|
||||
if (succFlag)
|
||||
{
|
||||
if (it != aColExtsInfo.end())
|
||||
{
|
||||
it->hwm = (HWM)curFbo;
|
||||
//cout << "setting hwm to " << (int)curFbo <<" for seg " <<it->segNum << endl;
|
||||
it->current = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
return ERR_INVALID_PARAM;
|
||||
}
|
||||
tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
|
||||
}
|
||||
|
||||
// end of allocate row id
|
||||
|
||||
#ifdef PROFILE
|
||||
timer.start("writeColumnRec");
|
||||
#endif
|
||||
//cout << "Writing column record" << endl;
|
||||
|
||||
if (rc == NO_ERROR)
|
||||
{
|
||||
//----------------------------------------------------------------------
|
||||
//Mark extents invalid
|
||||
//----------------------------------------------------------------------
|
||||
vector<BRM::LBID_t> lbids;
|
||||
vector<CalpontSystemCatalog::ColDataType> colDataTypes;
|
||||
bool successFlag = true;
|
||||
unsigned width = 0;
|
||||
int curFbo = 0, curBio, lastFbo = -1;
|
||||
|
||||
if (isFirstBatchPm && (totalRow == rowsLeft))
|
||||
{}
|
||||
else {
|
||||
for (unsigned i = 0; i < colStructList.size(); i++)
|
||||
{
|
||||
colOp = m_colOp[op(colStructList[i].fCompressionType)];
|
||||
width = colStructList[i].colWidth;
|
||||
successFlag = colOp->calculateRowId(lastRid , BYTE_PER_BLOCK/width, width, curFbo, curBio);
|
||||
if (successFlag) {
|
||||
if (curFbo != lastFbo) {
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
lbids,
|
||||
colDataTypes,
|
||||
colStructList[i],
|
||||
curFbo));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (lbids.size() > 0)
|
||||
rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
|
||||
|
||||
//----------------------------------------------------------------------
|
||||
// Write row(s) to database file(s)
|
||||
//----------------------------------------------------------------------
|
||||
rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, tableOid, useTmpSuffix); // @bug 5572 HDFS tmp file
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid,
|
||||
ColStructList& colStructList,
|
||||
ColValueList& colValueList,
|
||||
@ -3909,6 +4541,204 @@ timer.finish();
|
||||
return rc;
|
||||
}
|
||||
|
||||
int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
|
||||
const ColStructList& colStructList,
|
||||
std::vector<uint64_t>& colValueList,
|
||||
RID* rowIdArray,
|
||||
const ColStructList& newColStructList,
|
||||
const int32_t tableOid,
|
||||
bool useTmpSuffix,
|
||||
bool versioning)
|
||||
{
|
||||
bool bExcp;
|
||||
int rc = 0;
|
||||
void* valArray;
|
||||
string segFile;
|
||||
Column curCol;
|
||||
ColStructList::size_type totalColumn;
|
||||
ColStructList::size_type i;
|
||||
size_t totalRow;
|
||||
|
||||
setTransId(txnid);
|
||||
|
||||
totalColumn = colStructList.size();
|
||||
#ifdef PROFILE
|
||||
StopWatch timer;
|
||||
#endif
|
||||
totalRow = colValueList.size() / totalColumn;
|
||||
|
||||
valArray = malloc(sizeof(uint64_t) * totalRow);
|
||||
|
||||
if (totalRow == 0)
|
||||
return rc;
|
||||
|
||||
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
|
||||
for (i = 0; i < totalColumn; i++)
|
||||
{
|
||||
RID * secondPart = rowIdArray + totalRow;
|
||||
//@Bug 2205 Check if all rows go to the new extent
|
||||
//Write the first batch
|
||||
RID * firstPart = rowIdArray;
|
||||
ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
|
||||
|
||||
// set params
|
||||
colOp->initColumn(curCol);
|
||||
// need to pass real dbRoot, partition, and segment to setColParam
|
||||
colOp->setColParam(curCol, 0, colStructList[i].colWidth,
|
||||
colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
|
||||
colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
|
||||
colStructList[i].fColPartition, colStructList[i].fColSegment);
|
||||
|
||||
ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
|
||||
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
||||
while (it != aColExtsInfo.end())
|
||||
{
|
||||
if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
|
||||
break;
|
||||
it++;
|
||||
}
|
||||
|
||||
if (it == aColExtsInfo.end()) //add this one to the list
|
||||
{
|
||||
ColExtInfo aExt;
|
||||
aExt.dbRoot =colStructList[i].fColDbRoot;
|
||||
aExt.partNum = colStructList[i].fColPartition;
|
||||
aExt.segNum = colStructList[i].fColSegment;
|
||||
aExt.compType = colStructList[i].fCompressionType;
|
||||
aColExtsInfo.push_back(aExt);
|
||||
aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
|
||||
}
|
||||
|
||||
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
|
||||
if (rc != NO_ERROR)
|
||||
break;
|
||||
|
||||
// handling versioning
|
||||
vector<LBIDRange> rangeList;
|
||||
if (versioning)
|
||||
{
|
||||
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
|
||||
colStructList[i].colWidth, totalRow, firstPart, rangeList);
|
||||
if (rc != NO_ERROR) {
|
||||
if (colStructList[i].fCompressionType == 0)
|
||||
{
|
||||
curCol.dataFile.pFile->flush();
|
||||
}
|
||||
|
||||
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//totalRow1 -= totalRow2;
|
||||
// have to init the size here
|
||||
// nullArray = (bool*) malloc(sizeof(bool) * totalRow);
|
||||
uint8_t tmp8;
|
||||
uint16_t tmp16;
|
||||
uint32_t tmp32;
|
||||
float tmpF;
|
||||
double tmpD;
|
||||
for (size_t j = 0; j < totalRow; j++)
|
||||
{
|
||||
uint64_t curValue = colValueList[(totalRow*i) + j];
|
||||
switch (colStructList[i].colType)
|
||||
{
|
||||
case WriteEngine::WR_INT:
|
||||
tmp32 = curValue;
|
||||
((int*)valArray)[j] = tmp32;
|
||||
break;
|
||||
case WriteEngine::WR_UINT:
|
||||
tmp32 = curValue;
|
||||
((uint32_t*)valArray)[j] = tmp32;
|
||||
break;
|
||||
case WriteEngine::WR_VARBINARY : // treat same as char for now
|
||||
case WriteEngine::WR_CHAR:
|
||||
case WriteEngine::WR_BLOB:
|
||||
case WriteEngine::WR_TEXT:
|
||||
switch (colStructList[i].colWidth)
|
||||
{
|
||||
case 1:
|
||||
tmp8 = curValue;
|
||||
((uint8_t*)valArray)[j] = tmp8;
|
||||
break;
|
||||
case 2:
|
||||
tmp16 = curValue;
|
||||
((uint16_t*)valArray)[j] = tmp16;
|
||||
break;
|
||||
case 3:
|
||||
tmp32 = curValue;
|
||||
((uint32_t*)valArray)[j] = tmp32;
|
||||
break;
|
||||
case 4:
|
||||
((uint32_t*)valArray)[j] = curValue;
|
||||
break;
|
||||
}
|
||||
|
||||
break;
|
||||
case WriteEngine::WR_FLOAT:
|
||||
tmp32 = curValue;
|
||||
memcpy(&((float*)valArray)[j], &tmp32, 4);
|
||||
break;
|
||||
case WriteEngine::WR_DOUBLE:
|
||||
memcpy(&((double*)valArray)[j], &curValue, 8);
|
||||
break;
|
||||
case WriteEngine::WR_BYTE:
|
||||
tmp8 = curValue;
|
||||
((char*)valArray)[j] = tmp8;
|
||||
break;
|
||||
case WriteEngine::WR_UBYTE:
|
||||
tmp8 = curValue;
|
||||
((uint8_t*)valArray)[j] = tmp8;
|
||||
break;
|
||||
case WriteEngine::WR_SHORT:
|
||||
tmp16 = curValue;
|
||||
((short*)valArray)[j] = tmp16;
|
||||
break;
|
||||
case WriteEngine::WR_USHORT:
|
||||
tmp32 = curValue;
|
||||
((uint16_t*)valArray)[j] = tmp16;
|
||||
break;
|
||||
case WriteEngine::WR_LONGLONG:
|
||||
tmp32 = curValue;
|
||||
((long long*)valArray)[j] = tmp32;
|
||||
break;
|
||||
case WriteEngine::WR_ULONGLONG:
|
||||
((uint64_t*)valArray)[j] = curValue;
|
||||
break;
|
||||
case WriteEngine::WR_TOKEN:
|
||||
memcpy(&((Token*)valArray)[j], &curValue, 8);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#ifdef PROFILE
|
||||
timer.start("writeRow ");
|
||||
#endif
|
||||
rc = colOp->writeRow(curCol, totalRow, firstPart, valArray);
|
||||
#ifdef PROFILE
|
||||
timer.stop("writeRow ");
|
||||
#endif
|
||||
colOp->closeColumnFile(curCol);
|
||||
|
||||
if (versioning)
|
||||
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
|
||||
|
||||
// check error
|
||||
if (rc != NO_ERROR)
|
||||
break;
|
||||
|
||||
} // end of for (i = 0
|
||||
if (valArray != NULL)
|
||||
free(valArray);
|
||||
|
||||
#ifdef PROFILE
|
||||
timer.finish();
|
||||
#endif
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
int WriteEngineWrapper::writeColumnRec(const TxnID& txnid,
|
||||
const ColStructList& colStructList,
|
||||
const ColValueList& colValueList,
|
||||
|
Reference in New Issue
Block a user