You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
Merge branch 'develop-1.1' into dev-merge-up-20180202
This commit is contained in:
@ -1650,6 +1650,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
Column curCol;
|
||||
ColStruct curColStruct;
|
||||
ColStructList newColStructList;
|
||||
std::vector<uint64_t> colNewValueList;
|
||||
DctnryStructList newDctnryStructList;
|
||||
HWM hwm = 0;
|
||||
HWM oldHwm = 0;
|
||||
@ -1660,6 +1661,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
bool newExtent = false;
|
||||
RIDList ridList;
|
||||
ColumnOp* colOp = NULL;
|
||||
std::vector<BRM::LBID_t> dictLbids;
|
||||
|
||||
// Set tmp file suffix to modify HDFS db file
|
||||
bool useTmpSuffix = false;
|
||||
@ -2115,6 +2117,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
timer.stop("tokenize");
|
||||
#endif
|
||||
memcpy(colValPtr, &dctTuple.token, 8);
|
||||
dictLbids.push_back(dctTuple.token.fbo);
|
||||
}
|
||||
|
||||
dctStr_iter++;
|
||||
@ -2171,6 +2174,7 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
timer.stop("tokenize");
|
||||
#endif
|
||||
memcpy(colValPtr, &dctTuple.token, 8);
|
||||
dictLbids.push_back(dctTuple.token.fbo);
|
||||
}
|
||||
|
||||
dctStr_iter++;
|
||||
@ -2297,6 +2301,19 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
//Prepare the valuelist for the new extent
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
for (unsigned i = 1; i <= totalColumns; i++)
|
||||
{
|
||||
// Copy values to second value list
|
||||
for (uint64_t j = rowsLeft; j > 0; j--)
|
||||
{
|
||||
colNewValueList.push_back(colValueList[(totalRow * i) - j]);
|
||||
}
|
||||
}
|
||||
|
||||
// end of allocate row id
|
||||
|
||||
#ifdef PROFILE
|
||||
@ -2337,6 +2354,26 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we create a new extent for this batch
|
||||
for (unsigned i = 0; i < newColStructList.size(); i++)
|
||||
{
|
||||
colOp = m_colOp[op(newColStructList[i].fCompressionType)];
|
||||
width = newColStructList[i].colWidth;
|
||||
successFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / width, width, curFbo, curBio);
|
||||
|
||||
if (successFlag)
|
||||
{
|
||||
if (curFbo != lastFbo)
|
||||
{
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
lbids,
|
||||
colDataTypes,
|
||||
newColStructList[i],
|
||||
curFbo));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (lbids.size() > 0)
|
||||
@ -2346,7 +2383,8 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
// Write row(s) to database file(s)
|
||||
//----------------------------------------------------------------------
|
||||
bool versioning = !(isAutoCommitOn && insertSelect);
|
||||
rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file
|
||||
AddDictToList(txnid, dictLbids);
|
||||
rc = writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList, colNewValueList, tableOid, useTmpSuffix, versioning); // @bug 5572 HDFS tmp file
|
||||
}
|
||||
|
||||
return rc;
|
||||
@ -5085,6 +5123,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
|
||||
std::vector<uint64_t>& colValueList,
|
||||
RID* rowIdArray,
|
||||
const ColStructList& newColStructList,
|
||||
std::vector<uint64_t>& newColValueList,
|
||||
const int32_t tableOid,
|
||||
bool useTmpSuffix,
|
||||
bool versioning)
|
||||
@ -5095,7 +5134,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
|
||||
Column curCol;
|
||||
ColStructList::size_type totalColumn;
|
||||
ColStructList::size_type i;
|
||||
size_t totalRow;
|
||||
size_t totalRow1, totalRow2;
|
||||
|
||||
setTransId(txnid);
|
||||
|
||||
@ -5103,11 +5142,22 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
|
||||
#ifdef PROFILE
|
||||
StopWatch timer;
|
||||
#endif
|
||||
totalRow = colValueList.size() / totalColumn;
|
||||
|
||||
valArray = malloc(sizeof(uint64_t) * totalRow);
|
||||
totalRow1 = colValueList.size() / totalColumn;
|
||||
|
||||
if (totalRow == 0)
|
||||
if (newColValueList.size() > 0)
|
||||
{
|
||||
totalRow2 = newColValueList.size() / newColStructList.size();
|
||||
totalRow1 -= totalRow2;
|
||||
}
|
||||
else
|
||||
{
|
||||
totalRow2 = 0;
|
||||
}
|
||||
|
||||
valArray = malloc(sizeof(uint64_t) * totalRow1);
|
||||
|
||||
if (totalRow1 == 0)
|
||||
return rc;
|
||||
|
||||
TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);
|
||||
@ -5160,7 +5210,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
|
||||
if (versioning)
|
||||
{
|
||||
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i],
|
||||
colStructList[i].colWidth, totalRow, firstPart, rangeList);
|
||||
colStructList[i].colWidth, totalRow1, firstPart, rangeList);
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
@ -5181,9 +5231,9 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
|
||||
uint16_t tmp16;
|
||||
uint32_t tmp32;
|
||||
|
||||
for (size_t j = 0; j < totalRow; j++)
|
||||
for (size_t j = 0; j < totalRow1; j++)
|
||||
{
|
||||
uint64_t curValue = colValueList[(totalRow * i) + j];
|
||||
uint64_t curValue = colValueList[((totalRow1 + totalRow2) * i) + j];
|
||||
|
||||
switch (colStructList[i].colType)
|
||||
{
|
||||
@ -5226,7 +5276,7 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
|
||||
#ifdef PROFILE
|
||||
timer.start("writeRow ");
|
||||
#endif
|
||||
rc = colOp->writeRow(curCol, totalRow, firstPart, valArray);
|
||||
rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
|
||||
#ifdef PROFILE
|
||||
timer.stop("writeRow ");
|
||||
#endif
|
||||
@ -5242,7 +5292,149 @@ int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid,
|
||||
} // end of for (i = 0
|
||||
|
||||
if (valArray != NULL)
|
||||
{
|
||||
free(valArray);
|
||||
valArray = NULL;
|
||||
}
|
||||
|
||||
// MCOL-1176 - Write second extent
|
||||
if (totalRow2)
|
||||
{
|
||||
valArray = malloc(sizeof(uint64_t) * totalRow2);
|
||||
|
||||
for (i = 0; i < newColStructList.size(); i++)
|
||||
{
|
||||
//@Bug 2205 Check if all rows go to the new extent
|
||||
//Write the first batch
|
||||
RID* secondPart = rowIdArray + totalRow1;
|
||||
ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];
|
||||
|
||||
// set params
|
||||
colOp->initColumn(curCol);
|
||||
// need to pass real dbRoot, partition, and segment to setColParam
|
||||
colOp->setColParam(curCol, 0, newColStructList[i].colWidth,
|
||||
newColStructList[i].colDataType, newColStructList[i].colType, newColStructList[i].dataOid,
|
||||
newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
|
||||
newColStructList[i].fColPartition, newColStructList[i].fColSegment);
|
||||
|
||||
ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
|
||||
ColExtsInfo::iterator it = aColExtsInfo.begin();
|
||||
|
||||
while (it != aColExtsInfo.end())
|
||||
{
|
||||
if ((it->dbRoot == newColStructList[i].fColDbRoot) && (it->partNum == newColStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
|
||||
break;
|
||||
|
||||
it++;
|
||||
}
|
||||
|
||||
if (it == aColExtsInfo.end()) //add this one to the list
|
||||
{
|
||||
ColExtInfo aExt;
|
||||
aExt.dbRoot = newColStructList[i].fColDbRoot;
|
||||
aExt.partNum = newColStructList[i].fColPartition;
|
||||
aExt.segNum = newColStructList[i].fColSegment;
|
||||
aExt.compType = newColStructList[i].fCompressionType;
|
||||
aColExtsInfo.push_back(aExt);
|
||||
aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
|
||||
}
|
||||
|
||||
rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
break;
|
||||
|
||||
// handling versioning
|
||||
vector<LBIDRange> rangeList;
|
||||
|
||||
if (versioning)
|
||||
{
|
||||
rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i],
|
||||
newColStructList[i].colWidth, totalRow2, secondPart, rangeList);
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
if (newColStructList[i].fCompressionType == 0)
|
||||
{
|
||||
curCol.dataFile.pFile->flush();
|
||||
}
|
||||
|
||||
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//totalRow1 -= totalRow2;
|
||||
// have to init the size here
|
||||
// nullArray = (bool*) malloc(sizeof(bool) * totalRow);
|
||||
uint8_t tmp8;
|
||||
uint16_t tmp16;
|
||||
uint32_t tmp32;
|
||||
|
||||
for (size_t j = 0; j < totalRow2; j++)
|
||||
{
|
||||
uint64_t curValue = newColValueList[(totalRow2 * i) + j];
|
||||
|
||||
switch (newColStructList[i].colType)
|
||||
{
|
||||
case WriteEngine::WR_VARBINARY : // treat same as char for now
|
||||
case WriteEngine::WR_CHAR:
|
||||
case WriteEngine::WR_BLOB:
|
||||
case WriteEngine::WR_TEXT:
|
||||
((uint64_t*)valArray)[j] = curValue;
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_INT:
|
||||
case WriteEngine::WR_UINT:
|
||||
case WriteEngine::WR_FLOAT:
|
||||
tmp32 = curValue;
|
||||
((uint32_t*)valArray)[j] = tmp32;
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_ULONGLONG:
|
||||
case WriteEngine::WR_LONGLONG:
|
||||
case WriteEngine::WR_DOUBLE:
|
||||
case WriteEngine::WR_TOKEN:
|
||||
((uint64_t*)valArray)[j] = curValue;
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_BYTE:
|
||||
case WriteEngine::WR_UBYTE:
|
||||
tmp8 = curValue;
|
||||
((uint8_t*)valArray)[j] = tmp8;
|
||||
break;
|
||||
|
||||
case WriteEngine::WR_SHORT:
|
||||
case WriteEngine::WR_USHORT:
|
||||
tmp16 = curValue;
|
||||
((uint16_t*)valArray)[j] = tmp16;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#ifdef PROFILE
|
||||
timer.start("writeRow ");
|
||||
#endif
|
||||
rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray);
|
||||
#ifdef PROFILE
|
||||
timer.stop("writeRow ");
|
||||
#endif
|
||||
colOp->closeColumnFile(curCol);
|
||||
|
||||
if (versioning)
|
||||
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
|
||||
|
||||
// check error
|
||||
if (rc != NO_ERROR)
|
||||
break;
|
||||
|
||||
} // end of for (i = 0
|
||||
}
|
||||
|
||||
if (valArray != NULL)
|
||||
free(valArray);
|
||||
|
||||
|
||||
#ifdef PROFILE
|
||||
timer.finish();
|
||||
@ -5766,6 +5958,7 @@ int WriteEngineWrapper::rollbackCommon(const TxnID& txnid, int sessionId)
|
||||
|
||||
// BUG 4312
|
||||
RemoveTxnFromLBIDMap(txnid);
|
||||
RemoveTxnFromDictMap(txnid);
|
||||
|
||||
config::Config* config = config::Config::makeConfig();
|
||||
prefix = config->getConfig("SystemConfig", "DBRMRoot");
|
||||
@ -6013,6 +6206,7 @@ int WriteEngineWrapper::rollbackVersion(const TxnID& txnid, int sessionId)
|
||||
{
|
||||
// BUG 4312
|
||||
RemoveTxnFromLBIDMap(txnid);
|
||||
RemoveTxnFromDictMap(txnid);
|
||||
|
||||
return BRMWrapper::getInstance()->rollBackVersion(txnid, sessionId);
|
||||
}
|
||||
@ -6114,6 +6308,7 @@ int WriteEngineWrapper::updateNextValue(const TxnID txnId, const OID& columnoid,
|
||||
int WriteEngineWrapper::flushDataFiles(int rc, const TxnID txnId, std::map<FID, FID>& columnOids)
|
||||
{
|
||||
RemoveTxnFromLBIDMap(txnId);
|
||||
RemoveTxnFromDictMap(txnId);
|
||||
|
||||
for (int i = 0; i < TOTAL_COMPRESS_OP; i++)
|
||||
{
|
||||
@ -6129,6 +6324,28 @@ int WriteEngineWrapper::flushDataFiles(int rc, const TxnID txnId, std::map<FID,
|
||||
return rc;
|
||||
}
|
||||
|
||||
void WriteEngineWrapper::AddDictToList(const TxnID txnid,
|
||||
std::vector<BRM::LBID_t>& lbids)
|
||||
{
|
||||
std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;
|
||||
|
||||
mapIter = m_dictLBIDMap.find(txnid);
|
||||
|
||||
if (mapIter == m_dictLBIDMap.end())
|
||||
{
|
||||
dictLBIDRec_t tempRecord;
|
||||
tempRecord.insert(lbids.begin(), lbids.end());
|
||||
m_dictLBIDMap[txnid] = tempRecord;
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
dictLBIDRec_t& txnRecord = mapIter->second;
|
||||
txnRecord.insert(lbids.begin(), lbids.end());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Add an lbid to a list of lbids for sending to markExtentsInvalid.
|
||||
@ -6225,6 +6442,17 @@ int WriteEngineWrapper::AddLBIDtoList(const TxnID txnid,
|
||||
return rtn;
|
||||
}
|
||||
|
||||
void WriteEngineWrapper::RemoveTxnFromDictMap(const TxnID txnid)
|
||||
{
|
||||
std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;
|
||||
|
||||
mapIter = m_dictLBIDMap.find(txnid);
|
||||
|
||||
if (mapIter != m_dictLBIDMap.end())
|
||||
{
|
||||
m_dictLBIDMap.erase(txnid);
|
||||
}
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
|
Reference in New Issue
Block a user