You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-5021 Add support for the AUX column in ExeMgr and PrimProc.
In the joblist code, in addition to sending the lbid of the SCAN column, we also send the corresponding lbid of the AUX column to PrimProc. In the primitives processor code in PrimProc, we load the AUX column block (8192 rows since the AUX column is implemented as a 1-byte UNSIGNED TINYINT) into memory and then pass it down to the low-level scanning (vectorized scanning as applicable) routine to build a non-Empty mask for the block being processed to filter out DELETED rows based on comparison of the AUX block row to the empty magic value for the AUX column.
This commit is contained in:
@ -90,12 +90,15 @@ BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL()
|
||||
{
|
||||
}
|
||||
|
||||
void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan, vector<BRM::LBID_t> lastScannedLBID)
|
||||
void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan,
|
||||
vector<BRM::LBID_t> lastScannedLBID,
|
||||
bool hasAuxCol,
|
||||
const std::vector<BRM::EMEntry>& extentsAux)
|
||||
{
|
||||
SCommand cc;
|
||||
|
||||
tableOID = scan.tableOid();
|
||||
cc.reset(new ColumnCommandJL(scan, lastScannedLBID));
|
||||
cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux));
|
||||
cc->setBatchPrimitiveProcessor(this);
|
||||
cc->setQueryUuid(scan.queryUuid());
|
||||
cc->setStepUuid(uuid);
|
||||
|
@ -115,7 +115,8 @@ class BatchPrimitiveProcessorJL
|
||||
threadCount = tc;
|
||||
}
|
||||
|
||||
void addFilterStep(const pColScanStep&, std::vector<BRM::LBID_t> lastScannedLBID);
|
||||
void addFilterStep(const pColScanStep&, std::vector<BRM::LBID_t> lastScannedLBID,
|
||||
bool hasAuxCol, const std::vector<BRM::EMEntry>& extentsAux);
|
||||
void addFilterStep(const PseudoColStep&);
|
||||
void addFilterStep(const pColStep&);
|
||||
void addFilterStep(const pDictionaryStep&);
|
||||
|
@ -43,7 +43,9 @@ using namespace messageqcpp;
|
||||
|
||||
namespace joblist
|
||||
{
|
||||
ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector<BRM::LBID_t> lastLBID)
|
||||
ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector<BRM::LBID_t> lastLBID,
|
||||
bool hasAuxCol_, const std::vector<BRM::EMEntry>& extentsAux_) :
|
||||
extentsAux(extentsAux_), hasAuxCol(hasAuxCol_)
|
||||
{
|
||||
BRM::DBRM dbrm;
|
||||
isScan = true;
|
||||
@ -88,6 +90,7 @@ ColumnCommandJL::ColumnCommandJL(const pColStep& step)
|
||||
BRM::DBRM dbrm;
|
||||
|
||||
isScan = false;
|
||||
hasAuxCol = false;
|
||||
|
||||
/* grab necessary vars from step */
|
||||
traceFlags = step.fTraceFlags;
|
||||
@ -210,6 +213,10 @@ void ColumnCommandJL::createCommand(ByteStream& bs) const
|
||||
bs << BOP;
|
||||
bs << filterCount;
|
||||
}
|
||||
if (hasAuxCol)
|
||||
bs << (uint8_t)1;
|
||||
else
|
||||
bs << (uint8_t)0;
|
||||
serializeInlineVector(bs, fLastLbid);
|
||||
|
||||
CommandJL::createCommand(bs);
|
||||
@ -218,6 +225,9 @@ void ColumnCommandJL::createCommand(ByteStream& bs) const
|
||||
void ColumnCommandJL::runCommand(ByteStream& bs) const
|
||||
{
|
||||
bs << lbid;
|
||||
|
||||
if (hasAuxCol)
|
||||
bs << lbidAux;
|
||||
}
|
||||
|
||||
void ColumnCommandJL::setLBID(uint64_t rid, uint32_t dbRoot)
|
||||
@ -247,11 +257,26 @@ void ColumnCommandJL::setLBID(uint64_t rid, uint32_t dbRoot)
|
||||
"; blockNum = " << blockNum << "; OID=" << OID << " LBID=" << lbid;
|
||||
cout << os.str() << endl;
|
||||
*/
|
||||
return;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
throw logic_error("ColumnCommandJL: setLBID didn't find the extent for the rid.");
|
||||
uint32_t j;
|
||||
|
||||
for (j = 0; j < extentsAux.size(); j++)
|
||||
{
|
||||
if (extentsAux[j].dbRoot == dbRoot && extentsAux[j].partitionNum == partNum &&
|
||||
extentsAux[j].segmentNum == segNum && extentsAux[j].blockOffset == (extentNum * 1 * 1024))
|
||||
{
|
||||
lbidAux = extentsAux[j].range.start + (blockNum * 1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (i == extents.size() || (hasAuxCol && j == extentsAux.size()))
|
||||
{
|
||||
throw logic_error("ColumnCommandJL: setLBID didn't find the extent for the rid.");
|
||||
}
|
||||
|
||||
// ostringstream os;
|
||||
// os << "CCJL: rid=" << rid << "; dbroot=" << dbRoot << "; partitionNum=" << partitionNum << ";
|
||||
|
@ -40,7 +40,8 @@ namespace joblist
|
||||
class ColumnCommandJL : public CommandJL
|
||||
{
|
||||
public:
|
||||
ColumnCommandJL(const pColScanStep&, std::vector<BRM::LBID_t> lastLBID);
|
||||
ColumnCommandJL(const pColScanStep&, std::vector<BRM::LBID_t> lastLBID,
|
||||
bool hasAuxCol_, const std::vector<BRM::EMEntry>& extentsAux_);
|
||||
ColumnCommandJL(const pColStep&);
|
||||
ColumnCommandJL(const ColumnCommandJL&, const DictStepJL&);
|
||||
virtual ~ColumnCommandJL();
|
||||
@ -123,6 +124,10 @@ class ColumnCommandJL : public CommandJL
|
||||
uint32_t numDBRoots;
|
||||
uint32_t dbroot;
|
||||
|
||||
std::vector<struct BRM::EMEntry> extentsAux;
|
||||
bool hasAuxCol;
|
||||
uint64_t lbidAux;
|
||||
|
||||
static const unsigned DEFAULT_FILES_PER_COLUMN_PARTITION = 32;
|
||||
|
||||
public:
|
||||
|
@ -2313,7 +2313,6 @@ SJLP JobListFactory::makeJobList(CalpontExecutionPlan* cplan, ResourceManager* r
|
||||
ret->errorInfo(errorInfo);
|
||||
}
|
||||
|
||||
std::cout<<ret->toString()<<std::endl;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -1244,6 +1244,9 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
|
||||
execplan::CalpontSystemCatalog::ColType fColType;
|
||||
execplan::CalpontSystemCatalog::OID fOid;
|
||||
execplan::CalpontSystemCatalog::OID fTableOid;
|
||||
execplan::CalpontSystemCatalog::OID fOidAux;
|
||||
bool hasAuxCol;
|
||||
std::vector<BRM::EMEntry> extentsAux;
|
||||
uint64_t fLastTupleId;
|
||||
BRM::LBIDRange_v lbidRanges;
|
||||
std::vector<int32_t> lastExtent;
|
||||
|
@ -501,6 +501,7 @@ TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo)
|
||||
fRunExecuted = false;
|
||||
fSwallowRows = false;
|
||||
smallOuterJoiner = -1;
|
||||
hasAuxCol = false;
|
||||
|
||||
// @1098 initialize scanFlags to be true
|
||||
scanFlags.assign(numExtents, true);
|
||||
@ -528,6 +529,25 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : BatchPrimi
|
||||
fTableOid = rhs.tableOid();
|
||||
extentSize = rhs.extentSize;
|
||||
lbidRanges = rhs.lbidRanges;
|
||||
hasAuxCol = false;
|
||||
|
||||
// TODO MCOL-5021 Add try-catch block
|
||||
if (fTableOid >= 3000)
|
||||
{
|
||||
execplan::CalpontSystemCatalog::TableName tableName = jobInfo.csc->tableName(fTableOid);
|
||||
fOidAux = jobInfo.csc->tableAUXColumnOID(tableName);
|
||||
|
||||
if (fOidAux > 3000)
|
||||
{
|
||||
hasAuxCol = true;
|
||||
|
||||
if (dbrm.getExtents(fOidAux, extentsAux))
|
||||
throw runtime_error("TupleBPS::TupleBPS BRM extent lookup failure (1)");
|
||||
|
||||
idbassert(!extentsAux.empty());
|
||||
sort(extentsAux.begin(), extentsAux.end(), BRM::ExtentSorter());
|
||||
}
|
||||
}
|
||||
|
||||
/* These lines are obsoleted by initExtentMarkers. Need to remove & retest. */
|
||||
scannedExtents = rhs.extents;
|
||||
@ -650,6 +670,7 @@ TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) : BatchPrimi
|
||||
fRunExecuted = false;
|
||||
isFilterFeeder = false;
|
||||
smallOuterJoiner = -1;
|
||||
hasAuxCol = false;
|
||||
|
||||
// @1098 initialize scanFlags to be true
|
||||
scanFlags.assign(numExtents, true);
|
||||
@ -719,6 +740,7 @@ TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo)
|
||||
scanFlags.assign(numExtents, true);
|
||||
runtimeCPFlags.assign(numExtents, true);
|
||||
bop = BOP_AND;
|
||||
hasAuxCol = false;
|
||||
|
||||
runRan = joinRan = false;
|
||||
fDelivery = false;
|
||||
@ -827,7 +849,7 @@ void TupleBPS::setBPP(JobStep* jobStep)
|
||||
|
||||
if (pcss != 0)
|
||||
{
|
||||
fBPP->addFilterStep(*pcss, lastScannedLBID);
|
||||
fBPP->addFilterStep(*pcss, lastScannedLBID, hasAuxCol, extentsAux);
|
||||
|
||||
extentsMap[pcss->fOid] = tr1::unordered_map<int64_t, EMEntry>();
|
||||
tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcss->fOid];
|
||||
@ -1257,6 +1279,7 @@ void TupleBPS::initExtentMarkers()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO MCOL-5021 Add support here
|
||||
void TupleBPS::reloadExtentLists()
|
||||
{
|
||||
/*
|
||||
|
Reference in New Issue
Block a user