You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-1408 Multiple API HWM boundary fixes
Fixes the following:
* Generate an error if calculateRowId fails
* No data written when the first extent is completely full on a write and all data goes to the second extent
* 0-byte valArray malloc
* valArray free() called without a prior malloc
* Column touched but no data written if all data goes to the second extent
* Wrong colWidth used in the second extent's calculateRowId call
* Out-of-bounds memory write (crash) when there is no data for the first extent
* Extent not committed if all data goes to the second extent
This commit is contained in:
@ -2008,7 +2008,6 @@ timer.stop("tokenize");
|
|||||||
if (it != aColExtsInfo.end()) //update hwm info
|
if (it != aColExtsInfo.end()) //update hwm info
|
||||||
{
|
{
|
||||||
oldHwm = it->hwm;
|
oldHwm = it->hwm;
|
||||||
}
|
|
||||||
|
|
||||||
// save hwm for the old extent
|
// save hwm for the old extent
|
||||||
colWidth = colStructList[i].colWidth;
|
colWidth = colStructList[i].colWidth;
|
||||||
@ -2032,6 +2031,7 @@ timer.stop("tokenize");
|
|||||||
else
|
else
|
||||||
return ERR_INVALID_PARAM;
|
return ERR_INVALID_PARAM;
|
||||||
|
|
||||||
|
}
|
||||||
//update hwm for the new extent
|
//update hwm for the new extent
|
||||||
if (newExtent)
|
if (newExtent)
|
||||||
{
|
{
|
||||||
@ -2043,6 +2043,7 @@ timer.stop("tokenize");
|
|||||||
break;
|
break;
|
||||||
it++;
|
it++;
|
||||||
}
|
}
|
||||||
|
colWidth = newColStructList[i].colWidth;
|
||||||
succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK/colWidth, colWidth, curFbo, curBio);
|
succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK/colWidth, colWidth, curFbo, curBio);
|
||||||
if (succFlag)
|
if (succFlag)
|
||||||
{
|
{
|
||||||
@ -2107,6 +2108,9 @@ timer.start("writeColumnRec");
|
|||||||
curFbo));
|
curFbo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
return ERR_INVALID_PARAM;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// If we create a new extent for this batch
|
// If we create a new extent for this batch
|
||||||
for (unsigned i = 0; i < newColStructList.size(); i++)
|
for (unsigned i = 0; i < newColStructList.size(); i++)
|
||||||
@ -2123,7 +2127,8 @@ timer.start("writeColumnRec");
|
|||||||
curFbo));
|
curFbo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
else
|
||||||
|
return ERR_INVALID_PARAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lbids.size() > 0)
|
if (lbids.size() > 0)
|
||||||
@ -4604,7 +4609,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
|
|||||||
bool versioning)
|
bool versioning)
|
||||||
{
|
{
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
void* valArray;
|
void* valArray = NULL;
|
||||||
string segFile;
|
string segFile;
|
||||||
Column curCol;
|
Column curCol;
|
||||||
ColStructList::size_type totalColumn;
|
ColStructList::size_type totalColumn;
|
||||||
@ -4629,132 +4634,135 @@ StopWatch timer;
|
|||||||
totalRow2 = 0;
|
totalRow2 = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
valArray = malloc(sizeof(uint64_t) * totalRow1);
|
// It is possible totalRow1 is zero but totalRow2 has values
|
||||||
|
if ((totalRow1 == 0) && (totalRow2 == 0))
|
||||||
if (totalRow1 == 0)
|
|
||||||
return rc;
|
return rc;
|
||||||
|
|
||||||
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
|
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
|
||||||
for (i = 0; i < totalColumn; i++)
|
if (totalRow1)
|
||||||
{
|
{
|
||||||
//@Bug 2205 Check if all rows go to the new extent
|
valArray = malloc(sizeof(uint64_t) * totalRow1);
|
||||||
//Write the first batch
|
for (i = 0; i < totalColumn; i++)
|
||||||
RID * firstPart = rowIdArray;
|
|
||||||
ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
|
|
||||||
|
|
||||||
// set params
|
|
||||||
colOp->initColumn(curCol);
|
|
||||||
// need to pass real dbRoot, partition, and segment to setColParam
|
|
||||||
colOp->setColParam(curCol, 0, colStructList[i].colWidth,
|
|
||||||
colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
|
|
||||||
colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
|
|
||||||
colStructList[i].fColPartition, colStructList[i].fColSegment);
|
|
||||||
|
|
||||||
ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
|
|
||||||
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
|
||||||
while (it != aColExtsInfo.end())
|
|
||||||
{
|
{
|
||||||
if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
|
//@Bug 2205 Check if all rows go to the new extent
|
||||||
break;
|
//Write the first batch
|
||||||
it++;
|
RID * firstPart = rowIdArray;
|
||||||
}
|
ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];
|
||||||
|
|
||||||
if (it == aColExtsInfo.end()) //add this one to the list
|
// set params
|
||||||
{
|
colOp->initColumn(curCol);
|
||||||
ColExtInfo aExt;
|
// need to pass real dbRoot, partition, and segment to setColParam
|
||||||
aExt.dbRoot =colStructList[i].fColDbRoot;
|
colOp->setColParam(curCol, 0, colStructList[i].colWidth,
|
||||||
aExt.partNum = colStructList[i].fColPartition;
|
colStructList[i].colDataType, colStructList[i].colType, colStructList[i].dataOid,
|
||||||
aExt.segNum = colStructList[i].fColSegment;
|
colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
|
||||||
aExt.compType = colStructList[i].fCompressionType;
|
colStructList[i].fColPartition, colStructList[i].fColSegment);
|
||||||
aColExtsInfo.push_back(aExt);
|
|
||||||
aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
|
ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
|
||||||
if (rc != NO_ERROR)
|
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
||||||
break;
|
while (it != aColExtsInfo.end())
|
||||||
|
|
||||||
// handling versioning
|
|
||||||
vector<LBIDRange> rangeList;
|
|
||||||
if (versioning)
|
|
||||||
{
|
|
||||||
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
|
|
||||||
colStructList[i].colWidth, totalRow1, firstPart, rangeList);
|
|
||||||
if (rc != NO_ERROR) {
|
|
||||||
if (colStructList[i].fCompressionType == 0)
|
|
||||||
{
|
|
||||||
curCol.dataFile.pFile->flush();
|
|
||||||
}
|
|
||||||
|
|
||||||
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//totalRow1 -= totalRow2;
|
|
||||||
// have to init the size here
|
|
||||||
// nullArray = (bool*) malloc(sizeof(bool) * totalRow);
|
|
||||||
uint8_t tmp8;
|
|
||||||
uint16_t tmp16;
|
|
||||||
uint32_t tmp32;
|
|
||||||
for (size_t j = 0; j < totalRow1; j++)
|
|
||||||
{
|
|
||||||
uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j];
|
|
||||||
switch (colStructList[i].colType)
|
|
||||||
{
|
{
|
||||||
case WriteEngine::WR_VARBINARY : // treat same as char for now
|
if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
|
||||||
case WriteEngine::WR_CHAR:
|
|
||||||
case WriteEngine::WR_BLOB:
|
|
||||||
case WriteEngine::WR_TEXT:
|
|
||||||
((uint64_t*)valArray)[j] = curValue;
|
|
||||||
break;
|
|
||||||
case WriteEngine::WR_INT:
|
|
||||||
case WriteEngine::WR_UINT:
|
|
||||||
case WriteEngine::WR_FLOAT:
|
|
||||||
tmp32 = curValue;
|
|
||||||
((uint32_t*)valArray)[j] = tmp32;
|
|
||||||
break;
|
|
||||||
case WriteEngine::WR_ULONGLONG:
|
|
||||||
case WriteEngine::WR_LONGLONG:
|
|
||||||
case WriteEngine::WR_DOUBLE:
|
|
||||||
case WriteEngine::WR_TOKEN:
|
|
||||||
((uint64_t*)valArray)[j] = curValue;
|
|
||||||
break;
|
|
||||||
case WriteEngine::WR_BYTE:
|
|
||||||
case WriteEngine::WR_UBYTE:
|
|
||||||
tmp8 = curValue;
|
|
||||||
((uint8_t*)valArray)[j] = tmp8;
|
|
||||||
break;
|
|
||||||
case WriteEngine::WR_SHORT:
|
|
||||||
case WriteEngine::WR_USHORT:
|
|
||||||
tmp16 = curValue;
|
|
||||||
((uint16_t*)valArray)[j] = tmp16;
|
|
||||||
break;
|
break;
|
||||||
|
it++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (it == aColExtsInfo.end()) //add this one to the list
|
||||||
|
{
|
||||||
|
ColExtInfo aExt;
|
||||||
|
aExt.dbRoot =colStructList[i].fColDbRoot;
|
||||||
|
aExt.partNum = colStructList[i].fColPartition;
|
||||||
|
aExt.segNum = colStructList[i].fColSegment;
|
||||||
|
aExt.compType = colStructList[i].fCompressionType;
|
||||||
|
aColExtsInfo.push_back(aExt);
|
||||||
|
aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
|
||||||
|
if (rc != NO_ERROR)
|
||||||
|
break;
|
||||||
|
|
||||||
|
// handling versioning
|
||||||
|
vector<LBIDRange> rangeList;
|
||||||
|
if (versioning)
|
||||||
|
{
|
||||||
|
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
|
||||||
|
colStructList[i].colWidth, totalRow1, firstPart, rangeList);
|
||||||
|
if (rc != NO_ERROR) {
|
||||||
|
if (colStructList[i].fCompressionType == 0)
|
||||||
|
{
|
||||||
|
curCol.dataFile.pFile->flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//totalRow1 -= totalRow2;
|
||||||
|
// have to init the size here
|
||||||
|
// nullArray = (bool*) malloc(sizeof(bool) * totalRow);
|
||||||
|
uint8_t tmp8;
|
||||||
|
uint16_t tmp16;
|
||||||
|
uint32_t tmp32;
|
||||||
|
for (size_t j = 0; j < totalRow1; j++)
|
||||||
|
{
|
||||||
|
uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j];
|
||||||
|
switch (colStructList[i].colType)
|
||||||
|
{
|
||||||
|
case WriteEngine::WR_VARBINARY : // treat same as char for now
|
||||||
|
case WriteEngine::WR_CHAR:
|
||||||
|
case WriteEngine::WR_BLOB:
|
||||||
|
case WriteEngine::WR_TEXT:
|
||||||
|
((uint64_t*)valArray)[j] = curValue;
|
||||||
|
break;
|
||||||
|
case WriteEngine::WR_INT:
|
||||||
|
case WriteEngine::WR_UINT:
|
||||||
|
case WriteEngine::WR_FLOAT:
|
||||||
|
tmp32 = curValue;
|
||||||
|
((uint32_t*)valArray)[j] = tmp32;
|
||||||
|
break;
|
||||||
|
case WriteEngine::WR_ULONGLONG:
|
||||||
|
case WriteEngine::WR_LONGLONG:
|
||||||
|
case WriteEngine::WR_DOUBLE:
|
||||||
|
case WriteEngine::WR_TOKEN:
|
||||||
|
((uint64_t*)valArray)[j] = curValue;
|
||||||
|
break;
|
||||||
|
case WriteEngine::WR_BYTE:
|
||||||
|
case WriteEngine::WR_UBYTE:
|
||||||
|
tmp8 = curValue;
|
||||||
|
((uint8_t*)valArray)[j] = tmp8;
|
||||||
|
break;
|
||||||
|
case WriteEngine::WR_SHORT:
|
||||||
|
case WriteEngine::WR_USHORT:
|
||||||
|
tmp16 = curValue;
|
||||||
|
((uint16_t*)valArray)[j] = tmp16;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef PROFILE
|
||||||
|
timer.start("writeRow ");
|
||||||
|
#endif
|
||||||
|
rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
|
||||||
|
#ifdef PROFILE
|
||||||
|
timer.stop("writeRow ");
|
||||||
|
#endif
|
||||||
|
colOp->closeColumnFile(curCol);
|
||||||
|
|
||||||
|
if (versioning)
|
||||||
|
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
|
||||||
|
|
||||||
|
// check error
|
||||||
|
if (rc != NO_ERROR)
|
||||||
|
break;
|
||||||
|
|
||||||
|
} // end of for (i = 0
|
||||||
|
if (valArray != NULL)
|
||||||
|
{
|
||||||
|
free(valArray);
|
||||||
|
valArray = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#ifdef PROFILE
|
|
||||||
timer.start("writeRow ");
|
|
||||||
#endif
|
|
||||||
rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
|
|
||||||
#ifdef PROFILE
|
|
||||||
timer.stop("writeRow ");
|
|
||||||
#endif
|
|
||||||
colOp->closeColumnFile(curCol);
|
|
||||||
|
|
||||||
if (versioning)
|
|
||||||
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
|
|
||||||
|
|
||||||
// check error
|
|
||||||
if (rc != NO_ERROR)
|
|
||||||
break;
|
|
||||||
|
|
||||||
} // end of for (i = 0
|
|
||||||
if (valArray != NULL)
|
|
||||||
{
|
|
||||||
free(valArray);
|
|
||||||
valArray = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MCOL-1176 - Write second extent
|
// MCOL-1176 - Write second extent
|
||||||
|
Reference in New Issue
Block a user