
MCOL-984 Fix API bulk insert rowID/HWM calculation

The rowID, and therefore the HWM, for an insert was being calculated based
on the first column. If the table contains smaller columns, these would be
inserted into the middle of blocks instead of creating new blocks. This means
that we would no longer be crash safe, and PrimProc gets very confused until
a cache flush.

We now use the smallest column to calculate the rowID and HWM increment
(as cpimport does).
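
To illustrate the idea outside the write engine, here is a minimal standalone
sketch (the ColInfo struct, the column widths and the 8KB block size are
assumptions made for this example, not the real WriteEngineWrapper code): the
narrowest column packs the most rows into one block, so basing the rowID/HWM
arithmetic on it guarantees that every wider column also starts new blocks.

// Standalone sketch of the smallest-column selection used for rowID/HWM.
// ColInfo is a hypothetical stand-in for the real ColStruct entries.
#include <cstdint>
#include <cstddef>
#include <iostream>
#include <vector>

struct ColInfo
{
    int32_t colWidth; // bytes per value
};

int main()
{
    // Example table: BIGINT (8), INT (4) and TINYINT (1) columns.
    std::vector<ColInfo> cols = { {8}, {4}, {1} };

    // Pick the narrowest column, mirroring the MCOL-984 loop.
    int32_t lowColLen = 8192;
    size_t colId = 0;
    for (size_t i = 0; i < cols.size(); i++)
    {
        if (cols[i].colWidth < lowColLen)
        {
            lowColLen = cols[i].colWidth;
            colId = i;
        }
    }

    // The narrowest column fits the most rows in one 8KB block, so a rowID/HWM
    // increment computed from it moves every wider column onto new blocks too.
    const int32_t blockSize = 8192;
    std::cout << "smallest column: " << colId << " (" << lowColLen
              << " bytes, " << blockSize / lowColLen << " rows per block)\n";
    return 0;
}

With these example widths the 1-byte column holds 8192 rows per block while the
8-byte column holds only 1024, which is why driving the rowID from the first
(wider) column left the narrower columns writing into partially filled blocks.
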
This commit is contained in:
Andrew Hutchings
2017-10-26 11:19:41 +01:00
parent a116455715
commit 7aa588f523


@@ -1728,7 +1728,20 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
 //--------------------------------------------------------------------------
 // allocate row id(s)
 //--------------------------------------------------------------------------
 curColStruct = colStructList[0];
+// MCOL-984: find the smallest column width to calculate the RowID from so
+// that all HWMs will be incremented by this operation
+int32_t lowColLen = 8192;
+int32_t colId = 0;
+for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
+{
+    if (colStructList[colIt].colWidth < lowColLen)
+    {
+        colId = colIt;
+        lowColLen = colStructList[colId].colWidth;
+        curColStruct = colStructList[colId];
+    }
+}
 colOp = m_colOp[op(curColStruct.fCompressionType)];
 colOp->initColumn(curCol);
@@ -1738,12 +1751,12 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
 vector<ExtentInfo> dictExtentInfo; //Save those empty extents in case of failure to rollback
 vector<ExtentInfo> fileInfo;
 dbRoot = curColStruct.fColDbRoot;
-//use the first column to calculate row id
-ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[0].dataOid);
+//use the smallest column to calculate row id
+ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
 ColExtsInfo::iterator it = aColExtsInfo.begin();
 while (it != aColExtsInfo.end())
 {
-    if ((it->dbRoot == colStructList[0].fColDbRoot) && (it->partNum == colStructList[0].fColPartition) && (it->segNum == colStructList[0].fColSegment) && it->current )
+    if ((it->dbRoot == colStructList[colId].fColDbRoot) && (it->partNum == colStructList[colId].fColPartition) && (it->segNum == colStructList[colId].fColSegment) && it->current )
         break;
     it++;
 }
@@ -1755,7 +1768,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
 oldHwm = hwm; //Save this info for rollback
 //need to pass real dbRoot, partition, and segment to setColParam
-colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType,
+colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType,
                    curColStruct.colType, curColStruct.dataOid, curColStruct.fCompressionType,
                    curColStruct.fColDbRoot, curColStruct.fColPartition, curColStruct.fColSegment);
 rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix); // @bug 5572 HDFS tmp file
@@ -1800,8 +1813,11 @@ timer.stop("allocRowId");
 ((totalRow-rowsLeft) > 0) &&
 (rowIdArray[totalRow-rowsLeft-1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
 {
-    for (unsigned k=1; k<colStructList.size(); k++)
+    for (unsigned k=0; k<colStructList.size(); k++)
     {
+        // Skip the selected column
+        if (k == colId)
+            continue;
         Column expandCol;
         colOp = m_colOp[op(colStructList[k].fCompressionType)];
         colOp->setColParam(expandCol, 0,