You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-01 06:46:55 +03:00
MCOL-1675 When insert record calculate HWM using a column with the smallest width instead of the first column in the same way as in MCOL-984.
This commit is contained in:
@ -182,6 +182,37 @@ int WriteEngineWrapper::checkValid(const TxnID& txnid, const ColStructList& colS
|
|||||||
return NO_ERROR;
|
return NO_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*@brief findSmallestColumn --Find the smallest column for this table
|
||||||
|
*/
|
||||||
|
/***********************************************************
|
||||||
|
* DESCRIPTION:
|
||||||
|
* Find the smallest column for this table
|
||||||
|
* PARAMETERS:
|
||||||
|
* lowColLen - returns smallest column width
|
||||||
|
* colId - returns smallest column id
|
||||||
|
* colStructList - column struct list
|
||||||
|
* RETURN:
|
||||||
|
* void
|
||||||
|
***********************************************************/
|
||||||
|
void WriteEngineWrapper::findSmallestColumn(uint32_t& colId, ColStructList colStructList)
|
||||||
|
// MCOL-1675: find the smallest column width to calculate the RowID from so
|
||||||
|
// that all HWMs will be incremented by this operation
|
||||||
|
{
|
||||||
|
int32_t lowColLen = 8192;
|
||||||
|
for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
|
||||||
|
{
|
||||||
|
if (colStructList[colIt].colWidth < lowColLen)
|
||||||
|
{
|
||||||
|
colId = colIt;
|
||||||
|
lowColLen = colStructList[colId].colWidth;
|
||||||
|
if ( lowColLen == 1 )
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*@convertValArray - Convert interface values to internal values
|
/*@convertValArray - Convert interface values to internal values
|
||||||
*/
|
*/
|
||||||
/***********************************************************
|
/***********************************************************
|
||||||
@ -847,6 +878,11 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
|||||||
for (i = 0; i < colStructList.size(); i++)
|
for (i = 0; i < colStructList.size(); i++)
|
||||||
Convertor::convertColType(&colStructList[i]);
|
Convertor::convertColType(&colStructList[i]);
|
||||||
|
|
||||||
|
uint32_t colId = 0;
|
||||||
|
// MCOL-1675: find the smallest column width to calculate the RowID from so
|
||||||
|
// that all HWMs will be incremented by this operation
|
||||||
|
findSmallestColumn(colId, colStructList);
|
||||||
|
|
||||||
// rc = checkValid(txnid, colStructList, colValueList, ridList);
|
// rc = checkValid(txnid, colStructList, colValueList, ridList);
|
||||||
// if (rc != NO_ERROR)
|
// if (rc != NO_ERROR)
|
||||||
// return rc;
|
// return rc;
|
||||||
@ -873,8 +909,8 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
|||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
if (isFirstBatchPm)
|
if (isFirstBatchPm)
|
||||||
{
|
{
|
||||||
currentDBrootIdx = dbRootExtentTrackers[0]->getCurrentDBRootIdx();
|
currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
|
||||||
extentInfo = dbRootExtentTrackers[0]->getDBRootExtentList();
|
extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
|
||||||
dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
|
dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
|
||||||
partitionNum = extentInfo[currentDBrootIdx].fPartition;
|
partitionNum = extentInfo[currentDBrootIdx].fPartition;
|
||||||
|
|
||||||
@ -914,7 +950,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
|||||||
{
|
{
|
||||||
colOp = m_colOp[op(colStructList[i].fCompressionType)];
|
colOp = m_colOp[op(colStructList[i].fCompressionType)];
|
||||||
colOp->initColumn(curCol);
|
colOp->initColumn(curCol);
|
||||||
colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
|
colOp->setColParam(curCol, colId, colStructList[i].colWidth, colStructList[i].colDataType,
|
||||||
colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType,
|
colStructList[i].colType, colStructList[i].dataOid, colStructList[i].fCompressionType,
|
||||||
dbRoot, partitionNum, segmentNum);
|
dbRoot, partitionNum, segmentNum);
|
||||||
rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot,
|
rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid, extents[i].allocSize, dbRoot,
|
||||||
@ -1040,7 +1076,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
|||||||
} // if (isFirstBatchPm)
|
} // if (isFirstBatchPm)
|
||||||
else //get the extent info from tableMetaData
|
else //get the extent info from tableMetaData
|
||||||
{
|
{
|
||||||
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid);
|
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
|
||||||
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
||||||
while (it != aColExtsInfo.end())
|
while (it != aColExtsInfo.end())
|
||||||
{
|
{
|
||||||
@ -1073,7 +1109,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
|||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
// allocate row id(s)
|
// allocate row id(s)
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
curColStruct = colStructList[0];
|
curColStruct = colStructList[colId];
|
||||||
colOp = m_colOp[op(curColStruct.fCompressionType)];
|
colOp = m_colOp[op(curColStruct.fCompressionType)];
|
||||||
|
|
||||||
colOp->initColumn(curCol);
|
colOp->initColumn(curCol);
|
||||||
@ -1084,23 +1120,27 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
|||||||
vector<ExtentInfo> fileInfo;
|
vector<ExtentInfo> fileInfo;
|
||||||
dbRoot = curColStruct.fColDbRoot;
|
dbRoot = curColStruct.fColDbRoot;
|
||||||
//use the first column to calculate row id
|
//use the first column to calculate row id
|
||||||
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid);
|
ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
|
||||||
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
||||||
while (it != aColExtsInfo.end())
|
while (it != aColExtsInfo.end())
|
||||||
{
|
{
|
||||||
if ((it->dbRoot == colStructList[0].fColDbRoot) && (it->partNum == colStructList[0].fColPartition) && (it->segNum == colStructList[0].fColSegment) && it->current )
|
if ((it->dbRoot == colStructList[colId].fColDbRoot) &&
|
||||||
|
(it->partNum == colStructList[colId].fColPartition) &&
|
||||||
|
(it->segNum == colStructList[colId].fColSegment) && it->current )
|
||||||
|
{
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
it++;
|
it++;
|
||||||
}
|
}
|
||||||
if (it != aColExtsInfo.end())
|
if (it != aColExtsInfo.end())
|
||||||
{
|
{
|
||||||
hwm = it->hwm;
|
hwm = it->hwm;
|
||||||
//cout << "Got from colextinfo hwm for oid " << colStructList[0].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl;
|
//cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
oldHwm = hwm; //Save this info for rollback
|
oldHwm = hwm; //Save this info for rollback
|
||||||
//need to pass real dbRoot, partition, and segment to setColParam
|
//need to pass real dbRoot, partition, and segment to setColParam
|
||||||
colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType,
|
colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType,
|
||||||
curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
|
curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
|
||||||
curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
|
curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
|
||||||
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file
|
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file
|
||||||
@ -1123,13 +1163,13 @@ timer.start("allocRowId");
|
|||||||
if (idbdatafile::IDBPolicy::useHdfs())
|
if (idbdatafile::IDBPolicy::useHdfs())
|
||||||
insertSelect = true;
|
insertSelect = true;
|
||||||
|
|
||||||
rc = colOp->allocRowId(txnid, bUseStartExtent,
|
rc = colOp->allocRowId(txnid, bUseStartExtent,
|
||||||
curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile,
|
curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile,
|
||||||
newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm);
|
newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm);
|
||||||
|
|
||||||
//cout << "after allocrowid, total row = " <<totalRow << " newExtent is " << newExtent << endl;
|
//cout << "after allocrowid, total row = " <<totalRow << " newExtent is " << newExtent << endl;
|
||||||
// cout << "column oid " << curColStruct.dataOid << " has hwm:newHwm = " << hwm <<":" << newHwm<< endl;
|
//cout << "column oid " << curColStruct.dataOid << " has hwm:newHwm = " << hwm <<":" << newHwm<< endl;
|
||||||
if (rc != NO_ERROR) //Clean up is already done
|
if (rc != NO_ERROR) //Clean up is already done
|
||||||
return rc;
|
return rc;
|
||||||
|
|
||||||
#ifdef PROFILE
|
#ifdef PROFILE
|
||||||
@ -1139,14 +1179,17 @@ timer.stop("allocRowId");
|
|||||||
// Expand initial abbreviated extent if any RID in 1st extent is > 256K.
|
// Expand initial abbreviated extent if any RID in 1st extent is > 256K.
|
||||||
// if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
|
// if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
|
// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
|
||||||
if ((curCol.dataFile.fPartition == 0) &&
|
if ((curCol.dataFile.fPartition == 0) &&
|
||||||
(curCol.dataFile.fSegment == 0) &&
|
(curCol.dataFile.fSegment == 0) &&
|
||||||
((totalRow-rowsLeft) > 0) &&
|
((totalRow-rowsLeft) > 0) &&
|
||||||
(rowIdArray[totalRow-rowsLeft-1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
|
(rowIdArray[totalRow-rowsLeft-1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
|
||||||
{
|
{
|
||||||
for (unsigned k=1; k<colStructList.size(); k++)
|
for (unsigned k=0; k<colStructList.size(); k++)
|
||||||
{
|
{
|
||||||
|
// Skip the selected column
|
||||||
|
if (k == colId)
|
||||||
|
continue;
|
||||||
Column expandCol;
|
Column expandCol;
|
||||||
colOp = m_colOp[op(colStructList[k].fCompressionType)];
|
colOp = m_colOp[op(colStructList[k].fCompressionType)];
|
||||||
colOp->setColParam(expandCol, 0,
|
colOp->setColParam(expandCol, 0,
|
||||||
@ -1505,18 +1548,10 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
|||||||
for (i = 0; i < colStructList.size(); i++)
|
for (i = 0; i < colStructList.size(); i++)
|
||||||
Convertor::convertColType(&colStructList[i]);
|
Convertor::convertColType(&colStructList[i]);
|
||||||
|
|
||||||
// MCOL-984: find the smallest column width to calculate the RowID from so
|
uint32_t colId = 0;
|
||||||
// that all HWMs will be incremented by this operation
|
// MCOL-1675: find the smallest column width to calculate the RowID from so
|
||||||
int32_t lowColLen = 8192;
|
// that all HWMs will be incremented by this operation
|
||||||
int32_t colId = 0;
|
findSmallestColumn(colId, colStructList);
|
||||||
for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
|
|
||||||
{
|
|
||||||
if (colStructList[colIt].colWidth < lowColLen)
|
|
||||||
{
|
|
||||||
colId = colIt;
|
|
||||||
lowColLen = colStructList[colId].colWidth;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// rc = checkValid(txnid, colStructList, colValueList, ridList);
|
// rc = checkValid(txnid, colStructList, colValueList, ridList);
|
||||||
// if (rc != NO_ERROR)
|
// if (rc != NO_ERROR)
|
||||||
@ -1809,7 +1844,7 @@ timer.stop("allocRowId");
|
|||||||
// Expand initial abbreviated extent if any RID in 1st extent is > 256K.
|
// Expand initial abbreviated extent if any RID in 1st extent is > 256K.
|
||||||
// if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
|
// if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
|
// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
|
||||||
if ((curCol.dataFile.fPartition == 0) &&
|
if ((curCol.dataFile.fPartition == 0) &&
|
||||||
(curCol.dataFile.fSegment == 0) &&
|
(curCol.dataFile.fSegment == 0) &&
|
||||||
((totalRow-rowsLeft) > 0) &&
|
((totalRow-rowsLeft) > 0) &&
|
||||||
@ -1821,7 +1856,8 @@ timer.stop("allocRowId");
|
|||||||
if (k == colId)
|
if (k == colId)
|
||||||
continue;
|
continue;
|
||||||
Column expandCol;
|
Column expandCol;
|
||||||
colOp = m_colOp[op(colStructList[k].fCompressionType)];
|
colOp = m_colOp[op(colStructList[k].fCompressionType)];
|
||||||
|
// Shouldn't we change 0 to colId here?
|
||||||
colOp->setColParam(expandCol, 0,
|
colOp->setColParam(expandCol, 0,
|
||||||
colStructList[k].colWidth,
|
colStructList[k].colWidth,
|
||||||
colStructList[k].colDataType,
|
colStructList[k].colDataType,
|
||||||
@ -2782,6 +2818,11 @@ StopWatch timer;
|
|||||||
for (i = 0; i < colStructList.size(); i++)
|
for (i = 0; i < colStructList.size(); i++)
|
||||||
Convertor::convertColType(&colStructList[i]);
|
Convertor::convertColType(&colStructList[i]);
|
||||||
|
|
||||||
|
uint32_t colId = 0;
|
||||||
|
// MCOL-1675: find the smallest column width to calculate the RowID from so
|
||||||
|
// that all HWMs will be incremented by this operation
|
||||||
|
findSmallestColumn(colId, colStructList);
|
||||||
|
|
||||||
rc = checkValid(txnid, colStructList, colValueList, ridList);
|
rc = checkValid(txnid, colStructList, colValueList, ridList);
|
||||||
if (rc != NO_ERROR)
|
if (rc != NO_ERROR)
|
||||||
return rc;
|
return rc;
|
||||||
@ -2799,7 +2840,7 @@ StopWatch timer;
|
|||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
// allocate row id(s)
|
// allocate row id(s)
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
curColStruct = colStructList[0];
|
curColStruct = colStructList[colId];
|
||||||
colOp = m_colOp[op(curColStruct.fCompressionType)];
|
colOp = m_colOp[op(curColStruct.fCompressionType)];
|
||||||
|
|
||||||
colOp->initColumn(curCol);
|
colOp->initColumn(curCol);
|
||||||
@ -2834,7 +2875,7 @@ StopWatch timer;
|
|||||||
|
|
||||||
oldHwm = hwm; //Save this info for rollback
|
oldHwm = hwm; //Save this info for rollback
|
||||||
//need to pass real dbRoot, partition, and segment to setColParam
|
//need to pass real dbRoot, partition, and segment to setColParam
|
||||||
colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType,
|
colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType,
|
||||||
curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
|
curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
|
||||||
dbRoot, partitionNum, segmentNum);
|
dbRoot, partitionNum, segmentNum);
|
||||||
|
|
||||||
@ -2944,13 +2985,15 @@ timer.stop("allocRowId");
|
|||||||
// if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
|
// if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
|
||||||
//--------------------------------------------------------------------------
|
//--------------------------------------------------------------------------
|
||||||
// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
|
// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
|
||||||
if ((colStructList[0].fColPartition == 0) &&
|
if ((colStructList[colId].fColPartition == 0) &&
|
||||||
(colStructList[0].fColSegment == 0) &&
|
(colStructList[colId].fColSegment == 0) &&
|
||||||
((totalRow-rowsLeft) > 0) &&
|
((totalRow-rowsLeft) > 0) &&
|
||||||
(rowIdArray[totalRow-rowsLeft-1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
|
(rowIdArray[totalRow-rowsLeft-1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
|
||||||
{
|
{
|
||||||
for (unsigned k=1; k<colStructList.size(); k++)
|
for (unsigned k=0; k<colStructList.size(); k++)
|
||||||
{
|
{
|
||||||
|
if (k == colId)
|
||||||
|
continue;
|
||||||
Column expandCol;
|
Column expandCol;
|
||||||
colOp = m_colOp[op(colStructList[k].fCompressionType)];
|
colOp = m_colOp[op(colStructList[k].fCompressionType)];
|
||||||
colOp->setColParam(expandCol, 0,
|
colOp->setColParam(expandCol, 0,
|
||||||
|
@ -607,6 +607,11 @@ private:
|
|||||||
*/
|
*/
|
||||||
int checkValid(const TxnID& txnid, const ColStructList& colStructList, const ColValueList& colValueList, const RIDList& ridList) const;
|
int checkValid(const TxnID& txnid, const ColStructList& colStructList, const ColValueList& colValueList, const RIDList& ridList) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Find the smallest column for this table
|
||||||
|
*/
|
||||||
|
void findSmallestColumn(uint32_t &colId, ColStructList colStructList);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Convert interface column type to a internal column type
|
* @brief Convert interface column type to a internal column type
|
||||||
*/
|
*/
|
||||||
|
Reference in New Issue
Block a user