You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2026-01-06 08:21:10 +03:00
MCOL-641 1. Minor refactoring of decimalToString for int128_t.
2. Update unit tests for decimalToString. 3. Allow support for wide decimal in TupleConstantStep::fillInConstants().
This commit is contained in:
committed by
Roman Nozdrin
parent
2e8e7d52c3
commit
9b714274db
@@ -994,7 +994,9 @@ void BulkLoadBuffer::convert(char* field, int fieldLength,
|
||||
}
|
||||
else
|
||||
{
|
||||
dataconvert::atoi128(string(field), bigllVal);
|
||||
bool saturate = false;
|
||||
bigllVal = dataconvert::string_to_ll<int128_t>(string(field), saturate);
|
||||
// TODO MCOL-641 check saturate
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
@@ -815,24 +815,19 @@ uint8_t WE_DMLCommandProc::rollbackVersion(ByteStream& bs, std::string& err)
|
||||
uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::string& err, ByteStream::quadbyte& PMId)
|
||||
{
|
||||
int rc = 0;
|
||||
//cout << "processBatchInsert received bytestream length " << bs.length() << endl;
|
||||
|
||||
InsertDMLPackage insertPkg;
|
||||
ByteStream::quadbyte tmp32;
|
||||
bs >> tmp32;
|
||||
//cout << "processBatchInsert got transaction id " << tmp32 << endl;
|
||||
bs >> PMId;
|
||||
//cout << "processBatchInsert gor PMId " << PMId << endl;
|
||||
insertPkg.read( bs);
|
||||
uint32_t sessionId = insertPkg.get_SessionID();
|
||||
//cout << " processBatchInsert for session " << sessionId << endl;
|
||||
DMLTable* tablePtr = insertPkg.get_Table();
|
||||
bool isAutocommitOn = insertPkg.get_isAutocommitOn();
|
||||
|
||||
if (idbdatafile::IDBPolicy::useHdfs())
|
||||
isAutocommitOn = true;
|
||||
|
||||
//cout << "This session isAutocommitOn is " << isAutocommitOn << endl;
|
||||
BRM::TxnID txnid;
|
||||
txnid.id = tmp32;
|
||||
txnid.valid = true;
|
||||
@@ -858,7 +853,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
||||
try
|
||||
{
|
||||
ridList = systemCatalogPtr->columnRIDs(tableName, true);
|
||||
roPair = systemCatalogPtr->tableRID( tableName);
|
||||
roPair = systemCatalogPtr->tableRID(tableName);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
@@ -867,7 +862,6 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
std::vector<OID> dctnryStoreOids(ridList.size()) ;
|
||||
std::vector<Column> columns;
|
||||
DctnryStructList dctnryList;
|
||||
@@ -919,14 +913,10 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
||||
if (i == 0)
|
||||
{
|
||||
rc = pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bFirstExtentOnThisPM, bEmptyPM, trkErrMsg);
|
||||
/* cout << "bEmptyPM = " << (int) bEmptyPM << " bFirstExtentOnThisPM= " << (int)bFirstExtentOnThisPM <<
|
||||
" oid:dbroot:hwm = " << ridList[i].objnum << ":"<<dbRootExtent.fDbRoot << ":"
|
||||
<<":"<<dbRootExtent.fLocalHwm << " err = " << trkErrMsg << endl; */
|
||||
}
|
||||
else
|
||||
pDBRootExtentTracker->assignFirstSegFile(*(dbRootExtTrackerVec[0].get()), dbRootExtent);
|
||||
|
||||
|
||||
colDBRootExtentInfo.push_back(dbRootExtent);
|
||||
|
||||
Column aColumn;
|
||||
@@ -984,7 +974,7 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
||||
std::vector<BRM::LBIDRange> rangeList;
|
||||
|
||||
// use of MetaFile for bulk rollback support
|
||||
if ( fIsFirstBatchPm && isAutocommitOn)
|
||||
if (fIsFirstBatchPm && isAutocommitOn)
|
||||
{
|
||||
//save meta data, version last block for each dbroot at the start of batch insert
|
||||
try
|
||||
@@ -992,10 +982,8 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
||||
fRBMetaWriter->init(tblOid, tableName.table);
|
||||
fRBMetaWriter->saveBulkRollbackMetaData(columns, dctnryStoreOids, dbRootHWMInfoColVec);
|
||||
|
||||
//cout << "Saved meta files" << endl;
|
||||
if (!bFirstExtentOnThisPM)
|
||||
{
|
||||
//cout << "Backing up hwm chunks" << endl;
|
||||
for (unsigned i = 0; i < dctnryList.size(); i++) //back up chunks for compressed dictionary
|
||||
{
|
||||
// @bug 5572 HDFS tmp file - Ignoring return flag, don't need in this context
|
||||
@@ -1310,7 +1298,6 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
||||
int error = NO_ERROR;
|
||||
|
||||
//fWriteEngine.setDebugLevel(WriteEngine::DEBUG_3);
|
||||
//cout << "Batch inserting a row with transaction id " << txnid.id << endl;
|
||||
if (colValuesList.size() > 0)
|
||||
{
|
||||
if (colValuesList[0].size() > 0)
|
||||
@@ -1361,7 +1348,6 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
||||
if ( isWarningSet && ( rc == NO_ERROR ) )
|
||||
{
|
||||
rc = dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING;
|
||||
//cout << "Got warning" << endl;
|
||||
Message::Args args;
|
||||
string cols = "'" + colNames[0] + "'";
|
||||
|
||||
|
||||
@@ -154,10 +154,7 @@ void DmlReadThread::operator()()
|
||||
|
||||
case WE_SVR_BATCH_INSERT:
|
||||
{
|
||||
//timer.start("processBatchInsert");
|
||||
rc = fWeDMLprocessor->processBatchInsert(ibs, errMsg, PMId);
|
||||
//timer.stop("processBatchInsert");
|
||||
//cout << "fWeDMLprocessor " << fWeDMLprocessor << " is processing batchinsert ..." << endl;
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
@@ -1129,7 +1129,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
//----------------------------------------------------------------------
|
||||
if (bFirstExtentOnThisPM)
|
||||
{
|
||||
//cout << "bFirstExtentOnThisPM is " << bFirstExtentOnThisPM << endl;
|
||||
std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
|
||||
BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;
|
||||
|
||||
@@ -1234,8 +1233,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
colStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
|
||||
colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
|
||||
colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
|
||||
//cout << "Load from dbrootExtenttracker oid:dbroot:part:seg = " <<colStructList[i].dataOid<<":"
|
||||
//<<colStructList[i].fColDbRoot<<":"<<colStructList[i].fColPartition<<":"<<colStructList[i].fColSegment<<endl;
|
||||
dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
|
||||
dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
|
||||
dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
|
||||
@@ -1271,7 +1268,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
{
|
||||
aExt.hwm = extents[i].startBlkOffset;
|
||||
aExt.isNewExt = true;
|
||||
//cout << "adding a ext to metadata" << endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1279,12 +1275,10 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
|
||||
aExt.isNewExt = false;
|
||||
aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
|
||||
//cout << "oid " << colStructList[i].dataOid << " gets hwm " << aExt.hwm << endl;
|
||||
}
|
||||
|
||||
aExt.current = true;
|
||||
aColExtsInfo.push_back(aExt);
|
||||
//cout << "get from extentinfo oid:hwm = " << colStructList[i].dataOid << ":" << aExt.hwm << endl;
|
||||
}
|
||||
|
||||
tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
|
||||
@@ -1387,7 +1381,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
if (it != aColExtsInfo.end())
|
||||
{
|
||||
hwm = it->hwm;
|
||||
//cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and seg is " << colStructList[0].fColSegment << endl;
|
||||
}
|
||||
|
||||
oldHwm = hwm; //Save this info for rollback
|
||||
@@ -1422,8 +1415,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent, rowsLeft, newHwm, newFile,
|
||||
newColStructList, newDctnryStructList, dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm);
|
||||
|
||||
//cout << "after allocrowid, total row = " <<totalRow << " newExtent is " << newExtent << endl;
|
||||
//cout << "column oid " << curColStruct.dataOid << " has hwm:newHwm = " << hwm <<":" << newHwm<< endl;
|
||||
if (rc != NO_ERROR) //Clean up is already done
|
||||
return rc;
|
||||
|
||||
@@ -1441,7 +1432,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
((totalRow - rowsLeft) > 0) &&
|
||||
(rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
|
||||
{
|
||||
for (unsigned k=0; k<colStructList.size(); k++)
|
||||
for (unsigned k=0; k<colStructList.size(); k++)
|
||||
{
|
||||
// Skip the selected column
|
||||
if (k == colId)
|
||||
@@ -1505,7 +1496,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
return rc;
|
||||
}
|
||||
|
||||
for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
|
||||
for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
|
||||
{
|
||||
if (dctStr_iter->length() == 0)
|
||||
{
|
||||
@@ -1560,7 +1551,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
if (rc != NO_ERROR)
|
||||
return rc;
|
||||
|
||||
for (uint32_t rows = 0; rows < rowsLeft; rows++)
|
||||
for (uint32_t rows = 0; rows < rowsLeft; rows++)
|
||||
{
|
||||
if (dctStr_iter->length() == 0)
|
||||
{
|
||||
@@ -1627,13 +1618,11 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
lastRidNew = rowIdArray[totalRow - 1];
|
||||
}
|
||||
|
||||
//cout << "rowid allocated is " << lastRid << endl;
|
||||
//if a new extent is created, all the columns in this table should have their own new extent
|
||||
//First column already processed
|
||||
|
||||
//@Bug 1701. Close the file (if uncompressed)
|
||||
m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
|
||||
//cout << "Saving hwm info for new ext batch" << endl;
|
||||
//Update hwm to set them in the end
|
||||
bool succFlag = false;
|
||||
unsigned colWidth = 0;
|
||||
@@ -1663,7 +1652,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
colWidth = colStructList[i].colWidth;
|
||||
succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);
|
||||
|
||||
//cout << "insertcolumnrec oid:rid:fbo:oldhwm = " << colStructList[i].dataOid << ":" << lastRid << ":" << curFbo << ":" << oldHwm << endl;
|
||||
if (succFlag)
|
||||
{
|
||||
if ((HWM)curFbo >= oldHwm)
|
||||
@@ -1677,8 +1665,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
it->current = false;
|
||||
}
|
||||
|
||||
//cout << "updated old ext info for oid " << colStructList[i].dataOid << " dbroot:part:seg:hwm:current = "
|
||||
//<< it->dbRoot<<":"<<it->partNum<<":"<<it->segNum<<":"<<it->hwm<<":"<< it->current<< " and newExtent is " << newExtent << endl;
|
||||
}
|
||||
else
|
||||
return ERR_INVALID_PARAM;
|
||||
@@ -1734,7 +1720,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
colNewValueList.push_back(newColTupleList);
|
||||
newColTupleList.clear();
|
||||
|
||||
//upate the oldvalue list for the old extent
|
||||
//update the oldvalue list for the old extent
|
||||
for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
|
||||
{
|
||||
firstPartTupleList.push_back(colTupleList[j]);
|
||||
@@ -1749,7 +1735,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
#ifdef PROFILE
|
||||
timer.start("writeColumnRec");
|
||||
#endif
|
||||
//cout << "Writing column record" << endl;
|
||||
|
||||
if (rc == NO_ERROR)
|
||||
{
|
||||
@@ -3979,7 +3964,7 @@ void WriteEngineWrapper::printInputValue(const ColStructList& colStructList,
|
||||
// WIP replace with a single call
|
||||
char buf[utils::MAXLENGTH16BYTES];
|
||||
int128_t val = boost::any_cast<int128_t>(curTuple.data);
|
||||
dataconvert::DataConvert::toString(&val, 0, buf, utils::MAXLENGTH16BYTES);
|
||||
dataconvert::DataConvert::decimalToString(&val, 0, buf, utils::MAXLENGTH16BYTES, curColStruct.colDataType);
|
||||
curStr.assign(buf);
|
||||
}
|
||||
else if (curTuple.data.type() == typeid(double))
|
||||
|
||||
Reference in New Issue
Block a user