You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-5021 Remove hard-coded values for data type, column width
and compression type for the AUX column, and replace them with constants defined in the execplan namespace.
This commit is contained in:
@ -607,14 +607,13 @@ keepGoing:
|
||||
++iter;
|
||||
}
|
||||
|
||||
// MCOL-5021
|
||||
// TODO compressionType is hardcoded to 2 (SNAPPY)
|
||||
// TODO MCOL-5021 compressionType is hardcoded to 2 (SNAPPY)
|
||||
bytestream << (fStartingColOID + size);
|
||||
bytestream << (uint8_t)datatypes::SystemCatalog::UTINYINT;
|
||||
bytestream << (uint8_t)execplan::AUX_COL_DATATYPE;
|
||||
bytestream << (uint8_t) false;
|
||||
bytestream << (uint32_t)1;
|
||||
bytestream << (uint32_t)execplan::AUX_COL_WIDTH;
|
||||
bytestream << (uint16_t)useDBRoot;
|
||||
bytestream << (uint32_t)2;
|
||||
bytestream << (uint32_t)execplan::AUX_COL_COMPRESSION_TYPE;
|
||||
|
||||
//@Bug 4176. save oids to a log file for cleanup after fail over.
|
||||
std::vector<CalpontSystemCatalog::OID> oidList;
|
||||
|
@ -918,6 +918,14 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
|
||||
static uint32_t fModuleID;
|
||||
};
|
||||
|
||||
// MCOL-5021
|
||||
const datatypes::SystemCatalog::ColDataType AUX_COL_DATATYPE = datatypes::SystemCatalog::UTINYINT;
|
||||
const int32_t AUX_COL_WIDTH = 1;
|
||||
const CalpontSystemCatalog::CompressionType AUX_COL_COMPRESSION_TYPE = CalpontSystemCatalog::COMPRESSION2;
|
||||
const std::string AUX_COL_DATATYPE_STRING = "unsigned-tinyint";
|
||||
const uint64_t AUX_COL_MINVALUE = MIN_UTINYINT;
|
||||
const uint64_t AUX_COL_MAXVALUE = MAX_UTINYINT;
|
||||
|
||||
/** convenience function to make a TableColName from 3 strings
|
||||
*/
|
||||
const CalpontSystemCatalog::TableColName make_tcn(const std::string& s, const std::string& t,
|
||||
|
@ -93,12 +93,13 @@ BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL()
|
||||
void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan,
|
||||
vector<BRM::LBID_t> lastScannedLBID,
|
||||
bool hasAuxCol,
|
||||
const std::vector<BRM::EMEntry>& extentsAux)
|
||||
const std::vector<BRM::EMEntry>& extentsAux,
|
||||
execplan::CalpontSystemCatalog::OID oidAux)
|
||||
{
|
||||
SCommand cc;
|
||||
|
||||
tableOID = scan.tableOid();
|
||||
cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux));
|
||||
cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux, oidAux));
|
||||
cc->setBatchPrimitiveProcessor(this);
|
||||
cc->setQueryUuid(scan.queryUuid());
|
||||
cc->setStepUuid(uuid);
|
||||
|
@ -116,7 +116,8 @@ class BatchPrimitiveProcessorJL
|
||||
}
|
||||
|
||||
void addFilterStep(const pColScanStep&, std::vector<BRM::LBID_t> lastScannedLBID,
|
||||
bool hasAuxCol, const std::vector<BRM::EMEntry>& extentsAux);
|
||||
bool hasAuxCol, const std::vector<BRM::EMEntry>& extentsAux,
|
||||
execplan::CalpontSystemCatalog::OID oidAux);
|
||||
void addFilterStep(const PseudoColStep&);
|
||||
void addFilterStep(const pColStep&);
|
||||
void addFilterStep(const pDictionaryStep&);
|
||||
|
@ -44,8 +44,10 @@ using namespace messageqcpp;
|
||||
namespace joblist
|
||||
{
|
||||
ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector<BRM::LBID_t> lastLBID,
|
||||
bool hasAuxCol_, const std::vector<BRM::EMEntry>& extentsAux_) :
|
||||
extentsAux(extentsAux_), hasAuxCol(hasAuxCol_)
|
||||
bool hasAuxCol_, const std::vector<BRM::EMEntry>& extentsAux_,
|
||||
execplan::CalpontSystemCatalog::OID oidAux) :
|
||||
extentsAux(extentsAux_), hasAuxCol(hasAuxCol_),
|
||||
fOidAux(oidAux)
|
||||
{
|
||||
BRM::DBRM dbrm;
|
||||
isScan = true;
|
||||
@ -163,6 +165,8 @@ ColumnCommandJL::ColumnCommandJL(const ColumnCommandJL& prevCmd, const DictStepJ
|
||||
BOP = prevCmd.BOP;
|
||||
}
|
||||
isScan = prevCmd.isScan;
|
||||
hasAuxCol = prevCmd.hasAuxCol;
|
||||
extentsAux = prevCmd.extentsAux;
|
||||
colType = prevCmd.colType;
|
||||
extents = prevCmd.extents;
|
||||
OID = prevCmd.OID;
|
||||
@ -375,6 +379,20 @@ void ColumnCommandJL::reloadExtents()
|
||||
}
|
||||
|
||||
sort(extents.begin(), extents.end(), BRM::ExtentSorter());
|
||||
|
||||
if (hasAuxCol)
|
||||
{
|
||||
err = dbrm.getExtents(fOidAux, extentsAux);
|
||||
|
||||
if (err)
|
||||
{
|
||||
ostringstream os;
|
||||
os << "BRM lookup error. Could not get extents for Aux OID " << fOidAux;
|
||||
throw runtime_error(os.str());
|
||||
}
|
||||
|
||||
sort(extentsAux.begin(), extentsAux.end(), BRM::ExtentSorter());
|
||||
}
|
||||
}
|
||||
|
||||
bool ColumnCommandJL::getIsDict()
|
||||
|
@ -41,7 +41,8 @@ class ColumnCommandJL : public CommandJL
|
||||
{
|
||||
public:
|
||||
ColumnCommandJL(const pColScanStep&, std::vector<BRM::LBID_t> lastLBID,
|
||||
bool hasAuxCol_, const std::vector<BRM::EMEntry>& extentsAux_);
|
||||
bool hasAuxCol_, const std::vector<BRM::EMEntry>& extentsAux_,
|
||||
execplan::CalpontSystemCatalog::OID oidAux);
|
||||
ColumnCommandJL(const pColStep&);
|
||||
ColumnCommandJL(const ColumnCommandJL&, const DictStepJL&);
|
||||
virtual ~ColumnCommandJL();
|
||||
@ -73,6 +74,10 @@ class ColumnCommandJL : public CommandJL
|
||||
{
|
||||
return extents;
|
||||
}
|
||||
const std::vector<struct BRM::EMEntry>& getExtentsAux()
|
||||
{
|
||||
return extentsAux;
|
||||
}
|
||||
const execplan::CalpontSystemCatalog::ColType& getColType() const
|
||||
{
|
||||
return colType;
|
||||
@ -90,6 +95,15 @@ class ColumnCommandJL : public CommandJL
|
||||
{
|
||||
return isScan;
|
||||
}
|
||||
bool auxCol() const
|
||||
{
|
||||
return hasAuxCol;
|
||||
}
|
||||
|
||||
execplan::CalpontSystemCatalog::OID getOIDAux() const
|
||||
{
|
||||
return fOidAux;
|
||||
}
|
||||
|
||||
void reloadExtents();
|
||||
|
||||
@ -127,6 +141,7 @@ class ColumnCommandJL : public CommandJL
|
||||
std::vector<struct BRM::EMEntry> extentsAux;
|
||||
bool hasAuxCol;
|
||||
uint64_t lbidAux;
|
||||
execplan::CalpontSystemCatalog::OID fOidAux;
|
||||
|
||||
static const unsigned DEFAULT_FILES_PER_COLUMN_PARTITION = 32;
|
||||
|
||||
|
@ -530,12 +530,45 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : BatchPrimi
|
||||
extentSize = rhs.extentSize;
|
||||
lbidRanges = rhs.lbidRanges;
|
||||
hasAuxCol = false;
|
||||
execplan::CalpontSystemCatalog::TableName tableName;
|
||||
|
||||
// TODO MCOL-5021 Add try-catch block
|
||||
if (fTableOid >= 3000)
|
||||
{
|
||||
execplan::CalpontSystemCatalog::TableName tableName = jobInfo.csc->tableName(fTableOid);
|
||||
fOidAux = jobInfo.csc->tableAUXColumnOID(tableName);
|
||||
try
|
||||
{
|
||||
tableName = jobInfo.csc->tableName(fTableOid);
|
||||
fOidAux = jobInfo.csc->tableAUXColumnOID(tableName);
|
||||
}
|
||||
catch (logging::IDBExcept& ie)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
|
||||
if (ie.errorCode() == logging::ERR_TABLE_NOT_IN_CATALOG)
|
||||
{
|
||||
oss << "Table " << tableName.toString();
|
||||
oss << " does not exist in the system catalog.";
|
||||
}
|
||||
else
|
||||
{
|
||||
oss << "Error getting AUX column OID for table " << tableName.toString();
|
||||
oss << " due to: " << ie.what();
|
||||
}
|
||||
|
||||
throw runtime_error(oss.str());
|
||||
}
|
||||
catch(std::exception& ex)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
oss << "Error getting AUX column OID for table " << tableName.toString();
|
||||
oss << " due to: " << ex.what();
|
||||
throw runtime_error(oss.str());
|
||||
}
|
||||
catch(...)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
oss << "Error getting AUX column OID for table " << tableName.toString();
|
||||
throw runtime_error(oss.str());
|
||||
}
|
||||
|
||||
if (fOidAux > 3000)
|
||||
{
|
||||
@ -546,6 +579,12 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : BatchPrimi
|
||||
|
||||
idbassert(!extentsAux.empty());
|
||||
sort(extentsAux.begin(), extentsAux.end(), BRM::ExtentSorter());
|
||||
|
||||
extentsMap[fOidAux] = tr1::unordered_map<int64_t, EMEntry>();
|
||||
tr1::unordered_map<int64_t, EMEntry>& refAux = extentsMap[fOidAux];
|
||||
|
||||
for (uint32_t z = 0; z < extentsAux.size(); z++)
|
||||
refAux[extentsAux[z].range.start] = extentsAux[z];
|
||||
}
|
||||
}
|
||||
|
||||
@ -849,7 +888,7 @@ void TupleBPS::setBPP(JobStep* jobStep)
|
||||
|
||||
if (pcss != 0)
|
||||
{
|
||||
fBPP->addFilterStep(*pcss, lastScannedLBID, hasAuxCol, extentsAux);
|
||||
fBPP->addFilterStep(*pcss, lastScannedLBID, hasAuxCol, extentsAux, fOidAux);
|
||||
|
||||
extentsMap[pcss->fOid] = tr1::unordered_map<int64_t, EMEntry>();
|
||||
tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcss->fOid];
|
||||
@ -1279,7 +1318,6 @@ void TupleBPS::initExtentMarkers()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO MCOL-5021 Add support here
|
||||
void TupleBPS::reloadExtentLists()
|
||||
{
|
||||
/*
|
||||
@ -1333,6 +1371,18 @@ void TupleBPS::reloadExtentLists()
|
||||
|
||||
for (j = 0; j < extents.size(); j++)
|
||||
mref[extents[j].range.start] = extents[j];
|
||||
|
||||
if (cc->auxCol())
|
||||
{
|
||||
const vector<EMEntry>& extentsAux = cc->getExtentsAux();
|
||||
oid = cc->getOIDAux();
|
||||
|
||||
extentsMap[oid] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
|
||||
tr1::unordered_map<int64_t, struct BRM::EMEntry>& mrefAux = extentsMap[oid];
|
||||
|
||||
for (j = 0; j < extentsAux.size(); j++)
|
||||
mrefAux[extentsAux[j].range.start] = extentsAux[j];
|
||||
}
|
||||
}
|
||||
|
||||
for (i = 0; i < projections.size(); i++)
|
||||
|
@ -1959,7 +1959,7 @@ void filterColumnData(NewColRequestHeader* in, ColResultHeader* out, uint16_t* r
|
||||
// Bit patterns in srcArray[i] representing EMPTY and NULL values
|
||||
T emptyValue = getEmptyValue<T>(dataType);
|
||||
T nullValue = getNullValue<T>(dataType);
|
||||
uint8_t emptyValueAux = getEmptyValue<uint8_t>(datatypes::SystemCatalog::UTINYINT);
|
||||
uint8_t emptyValueAux = getEmptyValue<uint8_t>(execplan::AUX_COL_DATATYPE);
|
||||
|
||||
// Precompute filter results for NULL values
|
||||
bool isNullValueMatches =
|
||||
|
@ -235,7 +235,7 @@ class BatchPrimitiveProcessor
|
||||
|
||||
/* Common space for primitive data */
|
||||
alignas(utils::MAXCOLUMNWIDTH) uint8_t blockData[BLOCK_SIZE * utils::MAXCOLUMNWIDTH];
|
||||
uint8_t blockDataAux[BLOCK_SIZE];
|
||||
uint8_t blockDataAux[BLOCK_SIZE * execplan::AUX_COL_WIDTH];
|
||||
boost::scoped_array<uint8_t> outputMsg;
|
||||
uint32_t outMsgSize;
|
||||
|
||||
|
@ -379,13 +379,13 @@ int BulkLoad::loadJobInfo(const string& fullName, bool bUseTempJobFile, int argc
|
||||
JobColumn curColumn;
|
||||
curColumn.colName = "aux";
|
||||
curColumn.mapOid = tableAUXColOid;
|
||||
curColumn.typeName = "unsigned-tinyint";
|
||||
curColumn.width = 1;
|
||||
curColumn.definedWidth = 1;
|
||||
curColumn.compressionType = 2;
|
||||
curColumn.dctnry.fCompressionType = 2;
|
||||
curColumn.fMinIntSat = MIN_UTINYINT;
|
||||
curColumn.fMaxIntSat = MAX_UTINYINT;
|
||||
curColumn.typeName = execplan::AUX_COL_DATATYPE_STRING;
|
||||
curColumn.width = execplan::AUX_COL_WIDTH;
|
||||
curColumn.definedWidth = execplan::AUX_COL_WIDTH;
|
||||
curColumn.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
|
||||
curColumn.dctnry.fCompressionType = execplan::AUX_COL_COMPRESSION_TYPE;
|
||||
curColumn.fMinIntSat = execplan::AUX_COL_MINVALUE;
|
||||
curColumn.fMaxIntSat = execplan::AUX_COL_MAXVALUE;
|
||||
curColumn.fWithDefault = true;
|
||||
curColumn.fDefaultUInt = 1;
|
||||
curJob.jobTableList[i].colList.push_back(curColumn);
|
||||
|
@ -213,9 +213,9 @@ uint8_t WE_DMLCommandProc::processSingleInsert(messageqcpp::ByteStream& bs, std:
|
||||
if (tableAUXColOid > 3000)
|
||||
{
|
||||
CalpontSystemCatalog::ColType colType;
|
||||
colType.compressionType = 2;
|
||||
colType.colWidth = 1;
|
||||
colType.colDataType = datatypes::SystemCatalog::UTINYINT;
|
||||
colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
|
||||
colType.colWidth = execplan::AUX_COL_WIDTH;
|
||||
colType.colDataType = execplan::AUX_COL_DATATYPE;
|
||||
WriteEngine::ColStruct colStruct;
|
||||
colStruct.fColDbRoot = dbroot;
|
||||
WriteEngine::DctnryStruct dctnryStruct;
|
||||
@ -996,10 +996,10 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
||||
|
||||
if ((i == ridList.size() - 1) && (tableAUXColOid > 3000)) // AUX column
|
||||
{
|
||||
aColumn.colWidth = 1;
|
||||
aColumn.colDataType = datatypes::SystemCatalog::UTINYINT;
|
||||
aColumn.colWidth = execplan::AUX_COL_WIDTH;
|
||||
aColumn.colDataType = execplan::AUX_COL_DATATYPE;
|
||||
// TODO MCOL-5021 compressionType for the AUX column is hard-coded to 2
|
||||
aColumn.compressionType = 2;
|
||||
aColumn.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1153,9 +1153,9 @@ uint8_t WE_DMLCommandProc::processBatchInsert(messageqcpp::ByteStream& bs, std::
|
||||
if (tableAUXColOid > 3000)
|
||||
{
|
||||
CalpontSystemCatalog::ColType colType;
|
||||
colType.compressionType = 2;
|
||||
colType.colWidth = 1;
|
||||
colType.colDataType = datatypes::SystemCatalog::UTINYINT;
|
||||
colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
|
||||
colType.colWidth = execplan::AUX_COL_WIDTH;
|
||||
colType.colDataType = execplan::AUX_COL_DATATYPE;
|
||||
WriteEngine::ColStruct colStruct;
|
||||
WriteEngine::DctnryStruct dctnryStruct;
|
||||
colStruct.dataOid = tableAUXColOid;
|
||||
@ -4305,9 +4305,9 @@ uint8_t WE_DMLCommandProc::processDelete(messageqcpp::ByteStream& bs, std::strin
|
||||
if (hasAUXCol)
|
||||
{
|
||||
CalpontSystemCatalog::ColType colType;
|
||||
colType.compressionType = 2;
|
||||
colType.colWidth = 1;
|
||||
colType.colDataType = datatypes::SystemCatalog::UTINYINT;
|
||||
colType.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
|
||||
colType.colWidth = execplan::AUX_COL_WIDTH;
|
||||
colType.colDataType = execplan::AUX_COL_DATATYPE;
|
||||
colStruct.dataOid = tableAUXColOid;
|
||||
colStruct.tokenFlag = false;
|
||||
colStruct.fCompressionType = colType.compressionType;
|
||||
|
Reference in New Issue
Block a user