1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-1176 Fix API extent rollover

When the API inserts data into ColumnStore which will roll over into a
new extent that data wasn't being put into the new extent and corruption
occured. This patch now tracks the additional data and inserts it into
the new extent. It also makes sure the LBIDs are stored so that they are
correctly committed.
This commit is contained in:
Andrew Hutchings
2018-01-30 11:46:47 +00:00
parent 1e56c7f41a
commit 63f8e1ce71
2 changed files with 179 additions and 9 deletions

View File

@ -1475,6 +1475,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
Column curCol;
ColStruct curColStruct;
ColStructList newColStructList;
std::vector<uint64_t> colNewValueList;
DctnryStructList newDctnryStructList;
HWM hwm = 0;
HWM oldHwm = 0;
@ -2055,6 +2056,19 @@ timer.stop("tokenize");
tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
}
//--------------------------------------------------------------------------
//Prepare the valuelist for the new extent
//--------------------------------------------------------------------------
for (unsigned i=1; i <= totalColumns; i++)
{
// Copy values to second value list
for (uint64_t j=rowsLeft; j > 0; j--)
{
colNewValueList.push_back(colValueList[(totalRow*i)-j]);
}
}
// end of allocate row id
#ifdef PROFILE
@ -2091,6 +2105,22 @@ timer.start("writeColumnRec");
}
}
}
// If we create a new extent for this batch
for (unsigned i = 0; i < newColStructList.size(); i++)
{
colOp = m_colOp[op(newColStructList[i].fCompressionType)];
width = newColStructList[i].colWidth;
successFlag = colOp->calculateRowId(lastRidNew , BYTE_PER_BLOCK/width, width, curFbo, curBio);
if (successFlag) {
if (curFbo != lastFbo) {
RETURN_ON_ERROR(AddLBIDtoList(txnid,
lbids,
colDataTypes,
newColStructList[i],
curFbo));
}
}
}
}
if (lbids.size() > 0)
@ -2100,7 +2130,7 @@ timer.start("writeColumnRec");
// Write row(s) to database file(s)
//----------------------------------------------------------------------
bool versioning = !(isAutoCommitOn && insertSelect);
rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file
rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file
}
return rc;
}
@ -4564,6 +4594,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
std::vector<uint64_t>& colValueList,
RID* rowIdArray,
const ColStructList& newColStructList,
std::vector<uint64_t>& newColValueList,
const int32_t tableOid,
bool useTmpSuffix,
bool versioning)
@ -4574,7 +4605,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
Column curCol;
ColStructList::size_type totalColumn;
ColStructList::size_type i;
size_t totalRow;
size_t totalRow1, totalRow2;
setTransId(txnid);
@ -4582,11 +4613,21 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
#ifdef PROFILE
StopWatch timer;
#endif
totalRow = colValueList.size() / totalColumn;
valArray = malloc(sizeof(uint64_t) * totalRow);
totalRow1 = colValueList.size() / totalColumn;
if (newColValueList.size() > 0)
{
totalRow2 = newColValueList.size() / newColStructList.size();
totalRow1 -= totalRow2;
}
else
{
totalRow2 = 0;
}
if (totalRow == 0)
valArray = malloc(sizeof(uint64_t) * totalRow1);
if (totalRow1 == 0)
return rc;
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
@ -4634,7 +4675,7 @@ StopWatch timer;
if (versioning)
{
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
colStructList[i].colWidth, totalRow, firstPart, rangeList);
colStructList[i].colWidth, totalRow1, firstPart, rangeList);
if (rc != NO_ERROR) {
if (colStructList[i].fCompressionType == 0)
{
@ -4652,9 +4693,9 @@ StopWatch timer;
uint8_t tmp8;
uint16_t tmp16;
uint32_t tmp32;
for (size_t j = 0; j < totalRow; j++)
for (size_t j = 0; j < totalRow1; j++)
{
uint64_t curValue = colValueList[(totalRow*i) + j];
uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j];
switch (colStructList[i].colType)
{
case WriteEngine::WR_VARBINARY : // treat same as char for now
@ -4692,7 +4733,7 @@ StopWatch timer;
#ifdef PROFILE
timer.start("writeRow ");
#endif
rc = colOp->writeRow(curCol, totalRow, firstPart, valArray);
rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
#ifdef PROFILE
timer.stop("writeRow ");
#endif
@ -4707,7 +4748,135 @@ timer.stop("writeRow ");
} // end of for (i = 0
if (valArray != NULL)
{
free(valArray);
valArray = NULL;
}
// MCOL-1176 - Write second extent
if (totalRow2)
{
valArray = malloc(sizeof(uint64_t) * totalRow2);
for (i = 0; i < newColStructList.size(); i++)
{
//@Bug 2205 Check if all rows go to the new extent
//Write the first batch
RID * secondPart = rowIdArray + totalRow1;
ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];
// set params
colOp->initColumn(curCol);
// need to pass real dbRoot, partition, and segment to setColParam
colOp->setColParam(curCol, 0, newColStructList[i].colWidth,
newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid,
newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
newColStructList[i].fColPartition, newColStructList[i].fColSegment);
ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
ColExtsInfo::iterator it = aColExtsInfo.begin();
while (it != aColExtsInfo.end())
{
if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
break;
it++;
}
if (it == aColExtsInfo.end()) //add this one to the list
{
ColExtInfo aExt;
aExt.dbRoot = newColStructList[i].fColDbRoot;
aExt.partNum = newColStructList[i].fColPartition;
aExt.segNum = newColStructList[i].fColSegment;
aExt.compType = newColStructList[i].fCompressionType;
aColExtsInfo.push_back(aExt);
aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
}
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
if (rc != NO_ERROR)
break;
// handling versioning
vector<LBIDRange> rangeList;
if (versioning)
{
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i],
newColStructList[i].colWidth, totalRow2, secondPart, rangeList);
if (rc != NO_ERROR) {
if (newColStructList[i].fCompressionType == 0)
{
curCol.dataFile.pFile->flush();
}
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
break;
}
}
//totalRow1 -= totalRow2;
// have to init the size here
// nullArray = (bool*) malloc(sizeof(bool) * totalRow);
uint8_t tmp8;
uint16_t tmp16;
uint32_t tmp32;
for (size_t j = 0; j < totalRow2; j++)
{
uint64_t curValue = newColValueList[(totalRow2*i) + j];
switch (newColStructList[i].colType)
{
case WriteEngine::WR_VARBINARY : // treat same as char for now
case WriteEngine::WR_CHAR:
case WriteEngine::WR_BLOB:
case WriteEngine::WR_TEXT:
((uint64_t*)valArray)[j] = curValue;
break;
case WriteEngine::WR_INT:
case WriteEngine::WR_UINT:
case WriteEngine::WR_FLOAT:
tmp32 = curValue;
((uint32_t*)valArray)[j] = tmp32;
break;
case WriteEngine::WR_ULONGLONG:
case WriteEngine::WR_LONGLONG:
case WriteEngine::WR_DOUBLE:
case WriteEngine::WR_TOKEN:
((uint64_t*)valArray)[j] = curValue;
break;
case WriteEngine::WR_BYTE:
case WriteEngine::WR_UBYTE:
tmp8 = curValue;
((uint8_t*)valArray)[j] = tmp8;
break;
case WriteEngine::WR_SHORT:
case WriteEngine::WR_USHORT:
tmp16 = curValue;
((uint16_t*)valArray)[j] = tmp16;
break;
}
}
#ifdef PROFILE
timer.start("writeRow ");
#endif
rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray);
#ifdef PROFILE
timer.stop("writeRow ");
#endif
colOp->closeColumnFile(curCol);
if (versioning)
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
// check error
if (rc != NO_ERROR)
break;
} // end of for (i = 0
}
if (valArray != NULL)
free(valArray);
#ifdef PROFILE
timer.finish();