Mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git (synced 2025-07-04 04:42:30 +03:00)

Merge branch 'develop-1.1' into MCOL-1160

This commit is contained in:
David.Hall
2018-01-30 15:32:25 -06:00
committed by GitHub
47 changed files with 1013 additions and 140 deletions

View File

@ -1475,6 +1475,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
Column curCol;
ColStruct curColStruct;
ColStructList newColStructList;
std::vector<uint64_t> colNewValueList;
DctnryStructList newDctnryStructList;
HWM hwm = 0;
HWM oldHwm = 0;
@ -2058,6 +2059,19 @@ timer.stop("tokenize");
tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
}
//--------------------------------------------------------------------------
//Prepare the valuelist for the new extent
//--------------------------------------------------------------------------
for (unsigned i=1; i <= totalColumns; i++)
{
// Copy values to second value list
for (uint64_t j=rowsLeft; j > 0; j--)
{
colNewValueList.push_back(colValueList[(totalRow*i)-j]);
}
}
// end of allocate row id
#ifdef PROFILE
@ -2094,6 +2108,22 @@ timer.start("writeColumnRec");
}
}
}
// If we create a new extent for this batch
for (unsigned i = 0; i < newColStructList.size(); i++)
{
colOp = m_colOp[op(newColStructList[i].fCompressionType)];
width = newColStructList[i].colWidth;
successFlag = colOp->calculateRowId(lastRidNew , BYTE_PER_BLOCK/width, width, curFbo, curBio);
if (successFlag) {
if (curFbo != lastFbo) {
RETURN_ON_ERROR(AddLBIDtoList(txnid,
lbids,
colDataTypes,
newColStructList[i],
curFbo));
}
}
}
}
if (lbids.size() > 0)
@ -2104,7 +2134,7 @@ timer.start("writeColumnRec");
//----------------------------------------------------------------------
bool versioning = !(isAutoCommitOn && insertSelect);
AddDictToList(txnid, dictLbids);
rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file
rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file
}
return rc;
}
@ -4568,6 +4598,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
std::vector<uint64_t>& colValueList,
RID* rowIdArray,
const ColStructList& newColStructList,
std::vector<uint64_t>& newColValueList,
const int32_t tableOid,
bool useTmpSuffix,
bool versioning)
@ -4578,7 +4609,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
Column curCol;
ColStructList::size_type totalColumn;
ColStructList::size_type i;
size_t totalRow;
size_t totalRow1, totalRow2;
setTransId(txnid);
@ -4586,11 +4617,21 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
#ifdef PROFILE
StopWatch timer;
#endif
totalRow = colValueList.size() / totalColumn;
valArray = malloc(sizeof(uint64_t) * totalRow);
totalRow1 = colValueList.size() / totalColumn;
if (newColValueList.size() > 0)
{
totalRow2 = newColValueList.size() / newColStructList.size();
totalRow1 -= totalRow2;
}
else
{
totalRow2 = 0;
}
if (totalRow == 0)
valArray = malloc(sizeof(uint64_t) * totalRow1);
if (totalRow1 == 0)
return rc;
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
@ -4638,7 +4679,7 @@ StopWatch timer;
if (versioning)
{
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
colStructList[i].colWidth, totalRow, firstPart, rangeList);
colStructList[i].colWidth, totalRow1, firstPart, rangeList);
if (rc != NO_ERROR) {
if (colStructList[i].fCompressionType == 0)
{
@ -4656,9 +4697,9 @@ StopWatch timer;
uint8_t tmp8;
uint16_t tmp16;
uint32_t tmp32;
for (size_t j = 0; j < totalRow; j++)
for (size_t j = 0; j < totalRow1; j++)
{
uint64_t curValue = colValueList[(totalRow*i) + j];
uint64_t curValue = colValueList[((totalRow1 + totalRow2)*i) + j];
switch (colStructList[i].colType)
{
case WriteEngine::WR_VARBINARY : // treat same as char for now
@ -4696,7 +4737,7 @@ StopWatch timer;
#ifdef PROFILE
timer.start("writeRow ");
#endif
rc = colOp->writeRow(curCol, totalRow, firstPart, valArray);
rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
#ifdef PROFILE
timer.stop("writeRow ");
#endif
@ -4711,7 +4752,135 @@ timer.stop("writeRow ");
} // end of for (i = 0
if (valArray != NULL)
{
free(valArray);
valArray = NULL;
}
// MCOL-1176 - Write second extent
if (totalRow2)
{
valArray = malloc(sizeof(uint64_t) * totalRow2);
for (i = 0; i < newColStructList.size(); i++)
{
//@Bug 2205 Check if all rows go to the new extent
//Write the first batch
RID * secondPart = rowIdArray + totalRow1;
ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];
// set params
colOp->initColumn(curCol);
// need to pass real dbRoot, partition, and segment to setColParam
colOp->setColParam(curCol, 0, newColStructList[i].colWidth,
newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid,
newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
newColStructList[i].fColPartition, newColStructList[i].fColSegment);
ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
ColExtsInfo::iterator it = aColExtsInfo.begin();
while (it != aColExtsInfo.end())
{
if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
break;
it++;
}
if (it == aColExtsInfo.end()) //add this one to the list
{
ColExtInfo aExt;
aExt.dbRoot = newColStructList[i].fColDbRoot;
aExt.partNum = newColStructList[i].fColPartition;
aExt.segNum = newColStructList[i].fColSegment;
aExt.compType = newColStructList[i].fCompressionType;
aColExtsInfo.push_back(aExt);
aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
}
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
if (rc != NO_ERROR)
break;
// handling versioning
vector<LBIDRange> rangeList;
if (versioning)
{
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i],
newColStructList[i].colWidth, totalRow2, secondPart, rangeList);
if (rc != NO_ERROR) {
if (newColStructList[i].fCompressionType == 0)
{
curCol.dataFile.pFile->flush();
}
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
break;
}
}
//totalRow1 -= totalRow2;
// have to init the size here
// nullArray = (bool*) malloc(sizeof(bool) * totalRow);
uint8_t tmp8;
uint16_t tmp16;
uint32_t tmp32;
for (size_t j = 0; j < totalRow2; j++)
{
uint64_t curValue = newColValueList[(totalRow2*i) + j];
switch (newColStructList[i].colType)
{
case WriteEngine::WR_VARBINARY : // treat same as char for now
case WriteEngine::WR_CHAR:
case WriteEngine::WR_BLOB:
case WriteEngine::WR_TEXT:
((uint64_t*)valArray)[j] = curValue;
break;
case WriteEngine::WR_INT:
case WriteEngine::WR_UINT:
case WriteEngine::WR_FLOAT:
tmp32 = curValue;
((uint32_t*)valArray)[j] = tmp32;
break;
case WriteEngine::WR_ULONGLONG:
case WriteEngine::WR_LONGLONG:
case WriteEngine::WR_DOUBLE:
case WriteEngine::WR_TOKEN:
((uint64_t*)valArray)[j] = curValue;
break;
case WriteEngine::WR_BYTE:
case WriteEngine::WR_UBYTE:
tmp8 = curValue;
((uint8_t*)valArray)[j] = tmp8;
break;
case WriteEngine::WR_SHORT:
case WriteEngine::WR_USHORT:
tmp16 = curValue;
((uint16_t*)valArray)[j] = tmp16;
break;
}
}
#ifdef PROFILE
timer.start("writeRow ");
#endif
rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray);
#ifdef PROFILE
timer.stop("writeRow ");
#endif
colOp->closeColumnFile(curCol);
if (versioning)
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
// check error
if (rc != NO_ERROR)
break;
} // end of for (i = 0
}
if (valArray != NULL)
free(valArray);
#ifdef PROFILE
timer.finish();