1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Revert "Merge pull request #2022 from mariadb-corporation/bar-develop-MCOL-4791"

This reverts commit 4016e25e5b, reversing
changes made to 85435f6b1e.
This commit is contained in:
Gagan Goel
2021-07-13 11:06:56 +00:00
parent 90e5218c71
commit b3a560300c
22 changed files with 318 additions and 227 deletions

View File

@ -108,44 +108,6 @@ namespace
namespace execplan
{
ColumnCommandDataType::ColumnCommandDataType(const CalpontSystemCatalog::DataType &rhs,
bool fudge)
:DataType(rhs),
mIsDict(false)
{
if (!fudge)
return;
//If this is a dictionary column, fudge the numbers...
if (colDataType == CalpontSystemCatalog::VARCHAR )
colWidth++;
//If this is a dictionary column, fudge the numbers...
if ((colDataType == CalpontSystemCatalog::VARBINARY)
|| (colDataType == CalpontSystemCatalog::BLOB)
|| (colDataType == CalpontSystemCatalog::TEXT))
{
colWidth = 8;
mIsDict = true;
}
// TODO MCOL-641
else if (colWidth > 8
&& colDataType != CalpontSystemCatalog::DECIMAL
&& colDataType != CalpontSystemCatalog::UDECIMAL)
{
colWidth = 8;
mIsDict = true;
}
//Round colWidth up
if (colWidth == 3)
colWidth = 4;
else if (colWidth == 5 || colWidth == 6 || colWidth == 7)
colWidth = 8;
}
const SOP opeq(new Operator("="));
const string colDataTypeToString(CalpontSystemCatalog::ColDataType cdt)
@ -6128,34 +6090,43 @@ CalpontSystemCatalog::ColType::ColType() :
constraintType(NO_CONSTRAINT),
defaultValue(""),
colPosition(-1),
compressionType(NO_COMPRESSION),
columnOID(0),
autoincrement(0),
nextvalue(0)
nextvalue(0),
cs(NULL)
{
charsetNumber = default_charset_info->number;
}
CalpontSystemCatalog::ColType::ColType(const ColType& rhs)
:DataType(rhs)
:TypeHolderStd(rhs)
{
constraintType = rhs.constraintType;
ddn = rhs.ddn;
defaultValue = rhs.defaultValue;
colPosition = rhs.colPosition;
compressionType = rhs.compressionType;
columnOID = rhs.columnOID;
autoincrement = rhs.autoincrement;
nextvalue = rhs.nextvalue;
charsetNumber = rhs.charsetNumber;
cs = rhs.cs;
}
CalpontSystemCatalog::ColType& CalpontSystemCatalog::ColType::operator=(const ColType& rhs)
{
DataType::operator=(rhs);
TypeHolderStd::operator=(rhs);
constraintType = rhs.constraintType;
ddn = rhs.ddn;
defaultValue = rhs.defaultValue;
colPosition = rhs.colPosition;
compressionType = rhs.compressionType;
columnOID = rhs.columnOID;
autoincrement = rhs.autoincrement;
nextvalue = rhs.nextvalue;
charsetNumber = rhs.charsetNumber;
cs = rhs.cs;
return *this;
}
@ -6163,11 +6134,11 @@ CalpontSystemCatalog::ColType& CalpontSystemCatalog::ColType::operator=(const Co
CHARSET_INFO* CalpontSystemCatalog::DataType::getCharset()
CHARSET_INFO* CalpontSystemCatalog::ColType::getCharset()
{
if (!mCharset)
mCharset= & datatypes::Charset(charsetNumber).getCharset();
return mCharset;
if (!cs)
cs= & datatypes::Charset(charsetNumber).getCharset();
return cs;
}
const string CalpontSystemCatalog::ColType::toString() const

View File

@ -198,19 +198,26 @@ public:
typedef std::vector<DictOID> DictOIDList;
class DataType: public datatypes::SystemCatalog::TypeHolderStd
/** the structure returned by colType
*
* defaultValue is only meaningful when constraintType == DEFAULT_CONSTRAINT
*/
struct ColType: public datatypes::SystemCatalog::TypeHolderStd
{
public:
ColType();
ConstraintType constraintType;
DictOID ddn;
std::string defaultValue;
int32_t colPosition; // temporally put here. may need to have ColInfo struct later
int32_t compressionType;
OID columnOID;
bool autoincrement; //set to true if SYSCOLUMN autoincrement is <20>y<EFBFBD>
uint64_t nextvalue; //next autoincrement value
uint32_t charsetNumber;
protected:
const CHARSET_INFO* mCharset;
public:
DataType()
:compressionType(NO_COMPRESSION),
charsetNumber(63), // &my_charset_bin
mCharset(nullptr)
{ }
const CHARSET_INFO* cs;
ColType(const ColType& rhs);
ColType& operator=(const ColType& rhs);
CHARSET_INFO* getCharset();
// for F&E use. only serialize necessary info for now
@ -235,27 +242,6 @@ public:
b >> (uint32_t&)compressionType;
b >> charsetNumber;
}
};
/** the structure returned by colType
*
* defaultValue is only meaningful when constraintType == DEFAULT_CONSTRAINT
*/
struct ColType: public DataType
{
ColType();
ConstraintType constraintType;
DictOID ddn;
std::string defaultValue;
int32_t colPosition; // temporally put here. may need to have ColInfo struct later
OID columnOID;
bool autoincrement; //set to true if SYSCOLUMN autoincrement is <20>y<EFBFBD>
uint64_t nextvalue; //next autoincrement value
ColType(const ColType& rhs);
ColType& operator=(const ColType& rhs);
/**
* @brief convert a columns data, represnted as a string,
@ -899,51 +885,6 @@ private:
static uint32_t fModuleID;
};
class ColumnCommandDataType: public CalpontSystemCatalog::DataType
{
bool mIsDict;
public:
ColumnCommandDataType()
:DataType(),
mIsDict(false)
{ }
explicit ColumnCommandDataType(const DataType &rhs, bool fudge);
bool isDict() const
{
return mIsDict;
}
void serialize(messageqcpp::ByteStream& bs) const
{
bs << (uint8_t) colDataType;
bs << (uint8_t) colWidth;
bs << (uint8_t) scale;
bs << (uint8_t) compressionType;
bs << (uint32_t) charsetNumber;
bs << (uint8_t) mIsDict;
}
void unserialize(messageqcpp::ByteStream& bs)
{
uint8_t tmp8;
uint32_t tmp32;
bs >> tmp8;
colDataType = (execplan::CalpontSystemCatalog::ColDataType) tmp8;
bs >> tmp8;
colWidth = tmp8;
bs >> tmp8;
scale = tmp8;
bs >> tmp8;
compressionType = tmp8;
bs >> tmp32;
charsetNumber = tmp32;
mCharset = nullptr;
bs >> tmp8;
mIsDict = static_cast<bool>(tmp8);
}
};
/** convenience function to make a TableColName from 3 strings
*/
const CalpontSystemCatalog::TableColName make_tcn(const std::string& s, const std::string& t, const std::string& c, int lower_case_table_names=0);
@ -956,12 +897,12 @@ const CalpontSystemCatalog::TableAliasName make_aliastable(const std::string& s,
const CalpontSystemCatalog::TableAliasName make_aliasview(const std::string& s, const std::string& t, const std::string& a, const std::string& v,
const bool fisColumnStore = true, int lower_case_table_names=0);
inline bool isNull(int128_t val, const execplan::CalpontSystemCatalog::DataType& ct)
inline bool isNull(int128_t val, const execplan::CalpontSystemCatalog::ColType& ct)
{
return datatypes::Decimal::isWideDecimalNullValue(val);
}
inline bool isNull(int64_t val, const execplan::CalpontSystemCatalog::DataType& ct)
inline bool isNull(int64_t val, const execplan::CalpontSystemCatalog::ColType& ct)
{
bool ret = false;

View File

@ -757,8 +757,7 @@ void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in,
void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>* out,
bool* validCPData, uint64_t* lbid, int128_t* min, int128_t* max,
uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks, bool* countThis,
uint32_t threadID, bool* hasWideColumn,
const ColumnCommandDataType& colType) const
uint32_t threadID, bool* hasWideColumn, const execplan::CalpontSystemCatalog::ColType& colType) const
{
uint64_t tmp64;
int128_t tmp128;

View File

@ -171,8 +171,7 @@ public:
void getRowGroupData(messageqcpp::ByteStream& in, std::vector<rowgroup::RGData>* out,
bool* validCPData, uint64_t* lbid, int128_t* min, int128_t* max,
uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks, bool* countThis,
uint32_t threadID, bool* hasBinaryColumn,
const execplan::ColumnCommandDataType & colType) const;
uint32_t threadID, bool* hasBinaryColumn, const execplan::CalpontSystemCatalog::ColType& colType) const;
void deserializeAggregateResult(messageqcpp::ByteStream* in,
std::vector<rowgroup::RGData>* out) const;
bool countThisMsg(messageqcpp::ByteStream& in) const;

View File

@ -45,7 +45,6 @@ namespace joblist
{
ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector<BRM::LBID_t> lastLBID)
:colType(scan.fColType)
{
BRM::DBRM dbrm;
isScan = true;
@ -54,11 +53,13 @@ ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector<BRM::LBID_t> l
traceFlags = scan.fTraceFlags;
filterString = scan.fFilterString;
filterCount = scan.fFilterCount;
colType = scan.fColType;
BOP = scan.fBOP;
extents = scan.extents;
OID = scan.fOid;
colName = scan.fName;
rpbShift = scan.rpbShift;
fIsDict = scan.fIsDict;
fLastLbid = lastLBID;
//cout << "CCJL inherited lastlbids: ";
@ -83,7 +84,6 @@ ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector<BRM::LBID_t> l
}
ColumnCommandJL::ColumnCommandJL(const pColStep& step)
:colType(step.fColType)
{
BRM::DBRM dbrm;
@ -93,6 +93,7 @@ ColumnCommandJL::ColumnCommandJL(const pColStep& step)
traceFlags = step.fTraceFlags;
filterString = step.fFilterString;
filterCount = step.fFilterCount;
colType = step.fColType;
BOP = step.fBOP;
extents = step.extents;
divShift = step.divShift;
@ -100,6 +101,7 @@ ColumnCommandJL::ColumnCommandJL(const pColStep& step)
rpbShift = step.rpbShift;
OID = step.fOid;
colName = step.fName;
fIsDict = step.fIsDict;
ResourceManager* rm = ResourceManager::instance();
numDBRoots = rm->getDBRootCount();
@ -146,7 +148,11 @@ void ColumnCommandJL::createCommand(ByteStream& bs) const
cout << endl;
#endif
colType.serialize(bs);
bs << (uint8_t) colType.colDataType;
bs << (uint8_t) colType.colWidth;
bs << (uint8_t) colType.scale;
bs << (uint8_t) colType.compressionType;
bs << (uint32_t) colType.charsetNumber;
bs << BOP;
bs << filterCount;
serializeInlineVector(bs, fLastLbid);

View File

@ -72,13 +72,13 @@ public:
{
return extents;
}
const execplan::ColumnCommandDataType& getColType() const
const execplan::CalpontSystemCatalog::ColType& getColType() const
{
return colType;
}
bool isDict() const
{
return colType.isDict();
return fIsDict;
}
void scan(bool b)
@ -96,7 +96,7 @@ protected:
uint32_t currentExtentIndex;
messageqcpp::ByteStream filterString;
std::vector<struct BRM::EMEntry> extents;
execplan::ColumnCommandDataType colType;
execplan::CalpontSystemCatalog::ColType colType;
private:
ColumnCommandJL();
@ -112,6 +112,8 @@ private:
uint16_t filterCount;
std::vector<BRM::LBID_t> fLastLbid;
bool fIsDict;
// @Bug 2889. Added two members below for drop partition enhancement.
// RJD: make sure that we keep enough significant digits around for partition math
uint64_t fFilesPerColumnPartition;

View File

@ -351,7 +351,7 @@ int LBIDList::getMinMaxFromEntries(T& min, T& max, int32_t& seq,
template <typename T>
void LBIDList::UpdateMinMax(T min, T max, int64_t lbid,
const ColumnCommandDataType & type,
const CalpontSystemCatalog::ColType & type,
bool validData)
{
MinMaxPartition* mmp = NULL;
@ -392,7 +392,7 @@ void LBIDList::UpdateMinMax(T min, T max, int64_t lbid,
{
if (datatypes::isCharType(type.colDataType))
{
datatypes::Charset cs(const_cast<execplan::ColumnCommandDataType &>(type).getCharset());
datatypes::Charset cs(const_cast<CalpontSystemCatalog::ColType &>(type).getCharset());
if (datatypes::TCharShort::strnncollsp(cs, min, mmp->min, type.colWidth) < 0 ||
mmp->min == numeric_limits<int64_t>::max())
mmp->min = min;
@ -450,7 +450,7 @@ void LBIDList::UpdateMinMax(T min, T max, int64_t lbid,
}
}
void LBIDList::UpdateAllPartitionInfo(const ColumnCommandDataType & colType)
void LBIDList::UpdateAllPartitionInfo(const execplan::CalpontSystemCatalog::ColType& colType)
{
MinMaxPartition* mmp = NULL;
#ifdef DEBUG
@ -693,13 +693,13 @@ static inline bool compareStr(const datatypes::Charset &cs,
template<typename T>
bool LBIDList::checkSingleValue(T min, T max, T value,
const ColumnCommandDataType & type)
const execplan::CalpontSystemCatalog::ColType & type)
{
if (isCharType(type.colDataType))
{
// MCOL-641 LBIDList::CasualPartitionDataType() returns false if
// width > 8 for a character type, so T cannot be int128_t here
datatypes::Charset cs(const_cast<execplan::ColumnCommandDataType&>(type).getCharset());
datatypes::Charset cs(const_cast<execplan::CalpontSystemCatalog::ColType&>(type).getCharset());
return datatypes::TCharShort::strnncollsp(cs, value, min, type.colWidth) >= 0 &&
datatypes::TCharShort::strnncollsp(cs, value, max, type.colWidth) <= 0;
}
@ -716,13 +716,13 @@ bool LBIDList::checkSingleValue(T min, T max, T value,
template<typename T>
bool LBIDList::checkRangeOverlap(T min, T max, T tmin, T tmax,
const ColumnCommandDataType & type)
const execplan::CalpontSystemCatalog::ColType & type)
{
if (isCharType(type.colDataType))
{
// MCOL-641 LBIDList::CasualPartitionDataType() returns false if
// width > 8 for a character type, so T cannot be int128_t here
datatypes::Charset cs(const_cast<execplan::ColumnCommandDataType&>(type).getCharset());
datatypes::Charset cs(const_cast<execplan::CalpontSystemCatalog::ColType&>(type).getCharset());
return datatypes::TCharShort::strnncollsp(cs, tmin, max, type.colWidth) <= 0 &&
datatypes::TCharShort::strnncollsp(cs, tmax, min, type.colWidth) >= 0;
}
@ -740,7 +740,7 @@ bool LBIDList::checkRangeOverlap(T min, T max, T tmin, T tmax,
bool LBIDList::CasualPartitionPredicate(const BRM::EMCasualPartition_t& cpRange,
const messageqcpp::ByteStream* bs,
const uint16_t NOPS,
const ColumnCommandDataType& ct,
const execplan::CalpontSystemCatalog::ColType& ct,
const uint8_t BOP)
{
@ -952,27 +952,27 @@ bool LBIDList::GetMinMax<int64_t>(int64_t* min, int64_t* max, int64_t* seq,
template
void LBIDList::UpdateMinMax<int128_t>(int128_t min, int128_t max, int64_t lbid,
const ColumnCommandDataType & type, bool validData = true);
const execplan::CalpontSystemCatalog::ColType & type, bool validData = true);
template
void LBIDList::UpdateMinMax<int64_t>(int64_t min, int64_t max, int64_t lbid,
const ColumnCommandDataType & type, bool validData = true);
const execplan::CalpontSystemCatalog::ColType & type, bool validData = true);
template
bool LBIDList::checkSingleValue<int128_t>(int128_t min, int128_t max, int128_t value,
const ColumnCommandDataType & type);
const execplan::CalpontSystemCatalog::ColType & type);
template
bool LBIDList::checkSingleValue<int64_t>(int64_t min, int64_t max, int64_t value,
const ColumnCommandDataType & type);
const execplan::CalpontSystemCatalog::ColType & type);
template
bool LBIDList::checkRangeOverlap<int128_t>(int128_t min, int128_t max, int128_t tmin, int128_t tmax,
const ColumnCommandDataType &type);
const execplan::CalpontSystemCatalog::ColType & type);
template
bool LBIDList::checkRangeOverlap<int64_t>(int64_t min, int64_t max, int64_t tmin, int64_t tmax,
const ColumnCommandDataType &type);
const execplan::CalpontSystemCatalog::ColType & type);
} //namespace joblist

View File

@ -104,25 +104,25 @@ public:
template <typename T>
void UpdateMinMax(T min, T max, int64_t lbid,
const execplan::ColumnCommandDataType & type, bool validData = true);
const execplan::CalpontSystemCatalog::ColType & type, bool validData = true);
void UpdateAllPartitionInfo(const execplan::ColumnCommandDataType & colType);
void UpdateAllPartitionInfo(const execplan::CalpontSystemCatalog::ColType& colType);
bool IsRangeBoundary(uint64_t lbid);
bool CasualPartitionPredicate(const BRM::EMCasualPartition_t& cpRange,
const messageqcpp::ByteStream* MsgDataPtr,
const uint16_t NOPS,
const execplan::ColumnCommandDataType& ct,
const execplan::CalpontSystemCatalog::ColType& ct,
const uint8_t BOP);
template<typename T>
bool checkSingleValue(T min, T max, T value,
const execplan::ColumnCommandDataType & type);
const execplan::CalpontSystemCatalog::ColType & type);
template<typename T>
bool checkRangeOverlap(T min, T max, T tmin, T tmax,
const execplan::ColumnCommandDataType & type);
const execplan::CalpontSystemCatalog::ColType & type);
// check the column data type and the column size to determine if it
// is a data type to apply casual paritioning.

View File

@ -49,11 +49,11 @@ PassThruCommandJL::PassThruCommandJL(const PassThruStep& p)
{
OID = p.oid();
colName = p.name();
colWidth = p.colType().colWidth;
colWidth = p.colWidth;
// cout << "PassThru col width = " << (int) colWidth << " for OID " << OID << endl;
/* Is this ever a dictionary column? */
if (p.isDictCol())
if (p.isDictColumn)
tableColumnType = TableColumn::STRING;
else
switch (colWidth)

View File

@ -208,17 +208,40 @@ using namespace execplan;
namespace joblist
{
PassThruStep::PassThruStep(const pColStep& rhs)
:JobStep(rhs),
fColType(rhs.colType()),
fRm(rhs.fRm)
PassThruStep::PassThruStep(
execplan::CalpontSystemCatalog::OID oid,
execplan::CalpontSystemCatalog::OID tableOid,
const execplan::CalpontSystemCatalog::ColType& colType,
const JobInfo& jobInfo) :
JobStep(jobInfo),
fOid(oid),
fTableOid(tableOid),
isEM(jobInfo.isExeMgr),
fSwallowRows(false),
fRm(jobInfo.rm)
{
colWidth = colType.colWidth;
realWidth = colType.colWidth;
isDictColumn = ((colType.colDataType == CalpontSystemCatalog::VARCHAR && colType.colWidth > 7)
|| (colType.colDataType == CalpontSystemCatalog::CHAR && colType.colWidth > 8)
|| (colType.colDataType == CalpontSystemCatalog::TEXT)
|| (colType.colDataType == CalpontSystemCatalog::BLOB));
fColType = colType;
fPseudoType = 0;
}
PassThruStep::PassThruStep(const pColStep& rhs) : JobStep(rhs), fRm(rhs.fRm)
{
fInputJobStepAssociation = rhs.inputAssociation();
fOutputJobStepAssociation = rhs.outputAssociation();
colWidth = rhs.fColType.colWidth;
realWidth = rhs.realWidth;
fOid = rhs.oid();
fTableOid = rhs.tableOid();
fSwallowRows = rhs.getSwallowRows();
isDictColumn = rhs.isDictCol();
fColType = rhs.colType();
isEM = rhs.isExeMgr();
const PseudoColStep* pcs = dynamic_cast<const PseudoColStep*>(&rhs);
@ -228,17 +251,17 @@ PassThruStep::PassThruStep(const pColStep& rhs)
}
PassThruStep::PassThruStep(const PseudoColStep& rhs)
:JobStep(rhs),
fColType(rhs.colType()),
fRm(rhs.fRm)
PassThruStep::PassThruStep(const PseudoColStep& rhs) : JobStep(rhs), fRm(rhs.fRm)
{
fInputJobStepAssociation = rhs.inputAssociation();
fOutputJobStepAssociation = rhs.outputAssociation();
colWidth = rhs.fColType.colWidth;
realWidth = rhs.realWidth;
fOid = rhs.oid();
fTableOid = rhs.tableOid();
fSwallowRows = rhs.getSwallowRows();
isDictColumn = rhs.isDictCol();
fColType = rhs.colType();
fPseudoType = rhs.pseudoColumnId();
isEM = rhs.isExeMgr();
}

View File

@ -119,7 +119,7 @@ namespace joblist
pColScanStep::pColScanStep(
CalpontSystemCatalog::OID o,
CalpontSystemCatalog::OID t,
const CalpontSystemCatalog::DataType& ct,
const CalpontSystemCatalog::ColType& ct,
const JobInfo& jobInfo) :
JobStep(jobInfo),
fRm(jobInfo.rm),
@ -128,7 +128,7 @@ pColScanStep::pColScanStep(
fFilterCount(0),
fOid(o),
fTableOid(t),
fColType(ct, t != 0 /* For cross-engine we preserve the exact data type */),
fColType(ct),
fBOP(BOP_OR),
sentCount(0),
recvCount(0),
@ -153,6 +153,41 @@ pColScanStep::pColScanStep(
recvWaiting = 0;
recvExited = 0;
rDoNothing = false;
fIsDict = false;
//If this is a dictionary column, fudge the numbers...
if ( fColType.colDataType == CalpontSystemCatalog::VARCHAR )
{
if (8 > fColType.colWidth && 4 <= fColType.colWidth )
fColType.colDataType = CalpontSystemCatalog::CHAR;
fColType.colWidth++;
}
//If this is a dictionary column, fudge the numbers...
if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY)
|| (fColType.colDataType == CalpontSystemCatalog::BLOB)
|| (fColType.colDataType == CalpontSystemCatalog::TEXT))
{
fColType.colWidth = 8;
fIsDict = true;
}
// MCOL-641 WIP
else if (fColType.colWidth > 8
&& fColType.colDataType != CalpontSystemCatalog::DECIMAL
&& fColType.colDataType != CalpontSystemCatalog::UDECIMAL)
{
fColType.colWidth = 8;
fIsDict = true;
//TODO: is this right?
fColType.colDataType = CalpontSystemCatalog::VARCHAR;
}
//Round colWidth up
if (fColType.colWidth == 3)
fColType.colWidth = 4;
else if (fColType.colWidth == 5 || fColType.colWidth == 6 || fColType.colWidth == 7)
fColType.colWidth = 8;
err = dbrm.lookup(fOid, lbidRanges);
@ -973,8 +1008,7 @@ uint64_t pColScanStep::getFBO(uint64_t lbid)
pColScanStep::pColScanStep(const pColStep& rhs) :
JobStep(rhs),
fRm(rhs.resourceManager()),
fMsgHeader(),
fColType(rhs.colType())
fMsgHeader()
{
fNumThreads = fRm->getJlNumScanReceiveThreads();
fFilterCount = rhs.filterCount();
@ -982,7 +1016,9 @@ pColScanStep::pColScanStep(const pColStep& rhs) :
isFilterFeeder = rhs.getFeederFlag();
fOid = rhs.oid();
fTableOid = rhs.tableOid();
fColType = rhs.colType();
fBOP = rhs.BOP();
fIsDict = rhs.isDictCol();
sentCount = 0;
recvCount = 0;
fScanLbidReqLimit = fRm->getJlScanLbidReqLimit();

View File

@ -102,14 +102,14 @@ struct pColStepAggregator
pColStep::pColStep(
CalpontSystemCatalog::OID o,
CalpontSystemCatalog::OID t,
const CalpontSystemCatalog::DataType& ct,
const CalpontSystemCatalog::ColType& ct,
const JobInfo& jobInfo) :
JobStep(jobInfo),
fRm(jobInfo.rm),
sysCat(jobInfo.csc),
fOid(o),
fTableOid(t),
fColType(ct, t != 0 /* For cross-engine we preserve the exact data type */),
fColType(ct),
fFilterCount(0),
fBOP(BOP_NONE),
ridList(0),
@ -117,6 +117,7 @@ pColStep::pColStep(
msgsRecvd(0),
finishedSending(false),
recvWaiting(false),
fIsDict(false),
isEM(jobInfo.isExeMgr),
ridCount(0),
fFlushInterval(jobInfo.flushInterval),
@ -156,7 +157,40 @@ pColStep::pColStep(
throw runtime_error(oss.str());
}
realWidth = ct.colWidth;
realWidth = fColType.colWidth;
if ( fColType.colDataType == CalpontSystemCatalog::VARCHAR )
{
if (8 > fColType.colWidth && 4 <= fColType.colWidth )
fColType.colDataType = CalpontSystemCatalog::CHAR;
fColType.colWidth++;
}
//If this is a dictionary column, fudge the numbers...
if ((fColType.colDataType == CalpontSystemCatalog::VARBINARY)
|| (fColType.colDataType == CalpontSystemCatalog::BLOB)
|| (fColType.colDataType == CalpontSystemCatalog::TEXT))
{
fColType.colWidth = 8;
fIsDict = true;
}
// WIP MCOL-641
else if (fColType.colWidth > 8
&& fColType.colDataType != CalpontSystemCatalog::DECIMAL
&& fColType.colDataType != CalpontSystemCatalog::UDECIMAL)
{
fColType.colWidth = 8;
fIsDict = true;
//TODO: is this right?
fColType.colDataType = CalpontSystemCatalog::VARCHAR;
}
//Round colWidth up
if (fColType.colWidth == 3)
fColType.colWidth = 4;
else if (fColType.colWidth == 5 || fColType.colWidth == 6 || fColType.colWidth == 7)
fColType.colWidth = 8;
idbassert(fColType.colWidth > 0);
ridsPerBlock = BLOCK_SIZE / fColType.colWidth;
@ -247,6 +281,7 @@ pColStep::pColStep(const pColScanStep& rhs) :
msgsRecvd(0),
finishedSending(false),
recvWaiting(false),
fIsDict(rhs.isDictCol()),
ridCount(0),
// Per Cindy, it's save to put fFlushInterval to be 0
fFlushInterval(0),
@ -356,6 +391,7 @@ pColStep::pColStep(const PassThruStep& rhs) :
msgsRecvd(0),
finishedSending(false),
recvWaiting(false),
fIsDict(rhs.isDictCol()),
ridCount(0),
// Per Cindy, it's save to put fFlushInterval to be 0
fFlushInterval(0),

View File

@ -343,27 +343,18 @@ struct ColRequestHeaderDataType: public datatypes::Charset
int32_t CompType;
uint16_t DataSize;
uint8_t DataType; // enum ColDataType defined in calpont system catalog header file
private:
bool mIsDict;
public:
ColRequestHeaderDataType()
:Charset(my_charset_bin),
CompType(0),
DataSize(0),
DataType(0),
mIsDict(false)
DataType(0)
{ }
ColRequestHeaderDataType(const execplan::ColumnCommandDataType &rhs)
ColRequestHeaderDataType(const execplan::CalpontSystemCatalog::ColType &rhs)
:Charset(rhs.charsetNumber),
CompType(rhs.compressionType),
DataSize(rhs.colWidth),
DataType(rhs.colDataType),
mIsDict(rhs.isDict())
DataType(rhs.colDataType)
{ }
bool isDict() const
{
return mIsDict;
}
};

View File

@ -141,7 +141,7 @@ public:
pColStep(
execplan::CalpontSystemCatalog::OID oid,
execplan::CalpontSystemCatalog::OID tableOid,
const execplan::CalpontSystemCatalog::DataType& ct,
const execplan::CalpontSystemCatalog::ColType& ct,
const JobInfo& jobInfo);
pColStep(const pColScanStep& rhs);
@ -165,7 +165,7 @@ public:
virtual bool isDictCol() const
{
return fColType.isDict();
return fIsDict;
};
bool isExeMgr() const
{
@ -265,7 +265,7 @@ public:
{
return fBOP;
}
const execplan::ColumnCommandDataType& colType() const
const execplan::CalpontSystemCatalog::ColType& colType() const
{
return fColType;
}
@ -352,7 +352,7 @@ private:
boost::shared_ptr<execplan::CalpontSystemCatalog> sysCat;
execplan::CalpontSystemCatalog::OID fOid;
execplan::CalpontSystemCatalog::OID fTableOid;
execplan::ColumnCommandDataType fColType;
execplan::CalpontSystemCatalog::ColType fColType;
uint32_t fFilterCount;
int8_t fBOP;
int8_t fOutputType;
@ -364,7 +364,7 @@ private:
uint32_t extentSize, divShift, modMask, ridsPerBlock, rpbShift, blockSizeShift, numExtents;
uint64_t rpbMask;
uint64_t msgsSent, msgsRecvd;
bool finishedSending, recvWaiting;
bool finishedSending, recvWaiting, fIsDict;
bool isEM;
int64_t ridCount;
uint32_t fFlushInterval;
@ -429,7 +429,7 @@ public:
pColScanStep(
execplan::CalpontSystemCatalog::OID oid,
execplan::CalpontSystemCatalog::OID tableOid,
const execplan::CalpontSystemCatalog::DataType& ct,
const execplan::CalpontSystemCatalog::ColType& ct,
const JobInfo& jobInfo);
pColScanStep(const pColStep& rhs);
@ -449,7 +449,7 @@ public:
virtual bool isDictCol() const
{
return fColType.isDict();
return fIsDict;
};
/** @brief The main loop for the send-side thread
@ -534,7 +534,7 @@ public:
{
return fTableOid;
}
const execplan::ColumnCommandDataType& colType() const
const execplan::CalpontSystemCatalog::ColType& colType() const
{
return fColType;
}
@ -631,7 +631,7 @@ private:
uint32_t fFilterCount;
execplan::CalpontSystemCatalog::OID fOid;
execplan::CalpontSystemCatalog::OID fTableOid;
execplan::ColumnCommandDataType fColType;
execplan::CalpontSystemCatalog::ColType fColType;
int8_t fBOP;
int8_t fOutputType;
uint32_t sentCount;
@ -645,7 +645,7 @@ private:
boost::mutex cpMutex;
boost::condition condvar;
boost::condition condvarWakeupProducer;
bool finishedSending, sendWaiting, rDoNothing;
bool finishedSending, sendWaiting, rDoNothing, fIsDict;
uint32_t recvWaiting, recvExited;
std::vector<struct BRM::EMEntry> extents;
@ -1084,6 +1084,8 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
public:
TupleBPS(const pColStep& rhs, const JobInfo& jobInfo);
TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo);
TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo);
TupleBPS(const pDictionaryScan& rhs, const JobInfo& jobInfo);
TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo);
virtual ~TupleBPS();
@ -1198,7 +1200,7 @@ public:
{
return fTableOid;
}
const execplan::ColumnCommandDataType & colType() const
const execplan::CalpontSystemCatalog::ColType& colType() const
{
return fColType;
}
@ -1373,7 +1375,7 @@ private:
std::vector<uint64_t> fProducerThreads; // thread pool handles
messageqcpp::ByteStream fFilterString;
uint32_t fFilterCount;
execplan::ColumnCommandDataType fColType;
execplan::CalpontSystemCatalog::ColType fColType;
execplan::CalpontSystemCatalog::OID fOid;
execplan::CalpontSystemCatalog::OID fTableOid;
uint64_t fLastTupleId;
@ -1606,6 +1608,12 @@ class PassThruStep : public JobStep, public PrimitiveMsg
public:
/** @brief PassThruStep constructor
*/
PassThruStep(
execplan::CalpontSystemCatalog::OID oid,
execplan::CalpontSystemCatalog::OID tableOid,
const execplan::CalpontSystemCatalog::ColType& colType,
const JobInfo& jobInfo);
PassThruStep(const pColStep& rhs);
PassThruStep(const PseudoColStep& rhs);
@ -1634,15 +1642,20 @@ public:
{
return fTableOid;
}
uint8_t getColWidth() const
{
return colWidth;
}
bool isDictCol() const
{
return fColType.isDict();
return isDictColumn;
}
bool isExeMgr() const
{
return isEM;
}
const execplan::ColumnCommandDataType & colType() const
const execplan::CalpontSystemCatalog::ColType& colType() const
{
return fColType;
}
@ -1674,9 +1687,11 @@ private:
boost::shared_ptr<execplan::CalpontSystemCatalog> catalog;
execplan::CalpontSystemCatalog::OID fOid;
execplan::CalpontSystemCatalog::OID fTableOid;
uint8_t colWidth;
uint16_t realWidth;
uint32_t fPseudoType;
execplan::ColumnCommandDataType fColType;
execplan::CalpontSystemCatalog::ColType fColType;
bool isDictColumn;
bool isEM;
// boost::thread* fPTThd;

View File

@ -96,7 +96,7 @@ uint32_t RowEstimator::daysThroughMonth(uint32_t mth)
// This function takes a column value and if necessary adjusts it based on rules defined in the requirements.
// The values are adjusted so that one can be subtracted from another to find a range, compare, etc.
uint64_t RowEstimator::adjustValue(const ColumnCommandDataType & ct,
uint64_t RowEstimator::adjustValue(const execplan::CalpontSystemCatalog::ColType& ct,
const uint64_t& value)
{
switch (ct.colDataType)
@ -141,7 +141,7 @@ uint64_t RowEstimator::adjustValue(const ColumnCommandDataType & ct,
// Estimates the number of distinct values given a min/max range. When the range has not been set,
// rules from the requirements are used based on the column type.
template<typename T>
uint32_t RowEstimator::estimateDistinctValues(const ColumnCommandDataType &ct,
uint32_t RowEstimator::estimateDistinctValues(const execplan::CalpontSystemCatalog::ColType& ct,
const T& min,
const T& max,
const char cpStatus)
@ -214,7 +214,7 @@ template<class T>
float RowEstimator::estimateOpFactor(const T& min, const T& max, const T& value,
char op, uint8_t lcf,
uint32_t distinctValues, char cpStatus,
const ColumnCommandDataType& ct)
const execplan::CalpontSystemCatalog::ColType& ct)
{
float factor = 1.0;
@ -295,7 +295,7 @@ float RowEstimator::estimateOpFactor(const T& min, const T& max, const T& value,
float RowEstimator::estimateRowReturnFactor(const BRM::EMEntry& emEntry,
const messageqcpp::ByteStream* bs,
const uint16_t NOPS,
const ColumnCommandDataType& ct,
const execplan::CalpontSystemCatalog::ColType& ct,
const uint8_t BOP,
const uint32_t& rowsInExtent)
{

View File

@ -84,13 +84,13 @@ private:
* @param value The column value.
*
*/
uint64_t adjustValue(const execplan::ColumnCommandDataType & ct,
uint64_t adjustValue(const execplan::CalpontSystemCatalog::ColType& ct,
const uint64_t& value);
uint32_t daysThroughMonth(uint32_t mth);
template<typename T>
uint32_t estimateDistinctValues(const execplan::ColumnCommandDataType & ct,
uint32_t estimateDistinctValues(const execplan::CalpontSystemCatalog::ColType& ct,
const T& min,
const T& max,
const char cpStatus);
@ -108,7 +108,7 @@ private:
template<class T>
float estimateOpFactor(const T& min, const T& max, const T& value, char op, uint8_t lcf,
uint32_t distinctValues, char cpStatus,
const execplan::ColumnCommandDataType & ct);
const execplan::CalpontSystemCatalog::ColType& ct);
/** @brief returns a factor between 0 and 1 for the estimate of rows that will qualify
* the given operation(s).
@ -126,7 +126,7 @@ private:
float estimateRowReturnFactor(const BRM::EMEntry& emEntry,
const messageqcpp::ByteStream* msgDataPtr,
const uint16_t NOPS,
const execplan::ColumnCommandDataType & ct,
const execplan::CalpontSystemCatalog::ColType& ct,
const uint8_t BOP,
const uint32_t& rowsInExtent);

View File

@ -195,10 +195,7 @@ void TupleBPS::initializeConfigParms()
}
TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) :
BatchPrimitive(jobInfo),
fColType(rhs.colType()),
pThread(0),
fRm(jobInfo.rm)
BatchPrimitive(jobInfo), pThread(0), fRm(jobInfo.rm)
{
fInputJobStepAssociation = rhs.inputAssociation();
fOutputJobStepAssociation = rhs.outputAssociation();
@ -237,6 +234,7 @@ TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) :
fStepCount = 1;
fCPEvaluated = false;
fEstimatedRows = 0;
fColType = rhs.colType();
alias(rhs.alias());
view(rhs.view());
name(rhs.name());
@ -282,9 +280,7 @@ TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo) :
}
TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) :
BatchPrimitive(jobInfo),
fColType(rhs.colType()),
fRm(jobInfo.rm)
BatchPrimitive(jobInfo), fRm(jobInfo.rm)
{
fInputJobStepAssociation = rhs.inputAssociation();
fOutputJobStepAssociation = rhs.outputAssociation();
@ -321,6 +317,7 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) :
fStepCount = 1;
fCPEvaluated = false;
fEstimatedRows = 0;
fColType = rhs.colType();
alias(rhs.alias());
view(rhs.view());
name(rhs.name());
@ -367,9 +364,7 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) :
}
TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) :
BatchPrimitive(jobInfo),
fColType(rhs.colType()),
fRm(jobInfo.rm)
BatchPrimitive(jobInfo), fRm(jobInfo.rm)
{
fInputJobStepAssociation = rhs.inputAssociation();
fOutputJobStepAssociation = rhs.outputAssociation();
@ -391,6 +386,7 @@ TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) :
fStepCount = 1;
fCPEvaluated = false;
fEstimatedRows = 0;
fColType = rhs.colType();
alias(rhs.alias());
view(rhs.view());
name(rhs.name());
@ -435,6 +431,70 @@ TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) :
}
TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo) :
BatchPrimitive(jobInfo), fRm(jobInfo.rm)
{
fInputJobStepAssociation = rhs.inputAssociation();
fOutputJobStepAssociation = rhs.outputAssociation();
fDec = 0;
fOid = rhs.oid();
fTableOid = rhs.tableOid();
totalMsgs = 0;
msgsSent = 0;
msgsRecvd = 0;
ridsReturned = 0;
ridsRequested = 0;
fNumBlksSkipped = 0;
fBlockTouched = 0;
fMsgBytesIn = 0;
fMsgBytesOut = 0;
fExtentsPerSegFile = DEFAULT_EXTENTS_PER_SEG_FILE;
recvWaiting = 0;
fSwallowRows = false;
fStepCount = 1;
fCPEvaluated = false;
fEstimatedRows = 0;
alias(rhs.alias());
view(rhs.view());
name(rhs.name());
finishedSending = sendWaiting = false;
recvExited = 0;
fBPP.reset(new BatchPrimitiveProcessorJL(fRm));
initializeConfigParms();
fBPP->setSessionID(fSessionId);
fBPP->setStepID(fStepId);
fBPP->setQueryContext(fVerId);
fBPP->setTxnID(fTxnId);
fTraceFlags = rhs.fTraceFlags;
fBPP->setTraceFlags(fTraceFlags);
fBPP->setOutputType(ROW_GROUP);
fPhysicalIO = 0;
fCacheIO = 0;
BPPIsAllocated = false;
uniqueID = UniqueNumberGenerator::instance()->getUnique32();
fBPP->setUniqueID(uniqueID);
fBPP->setUuid(fStepUuid);
fCardinality = rhs.cardinality();
doJoin = false;
hasPMJoin = false;
hasUMJoin = false;
fRunExecuted = false;
isFilterFeeder = false;
smallOuterJoiner = -1;
// @1098 initialize scanFlags to be true
scanFlags.assign(numExtents, true);
runtimeCPFlags.assign(numExtents, true);
bop = BOP_AND;
runRan = joinRan = false;
fDelivery = false;
fExtendedInfo = "TBPS: ";
fQtc.stepParms().stepType = StepTeleStats::T_BPS;
hasPCFilter = hasPMFilter = hasRIDFilter = hasSegmentFilter = hasDBRootFilter = hasSegmentDirFilter =
hasPartitionFilter = hasMaxFilter = hasMinFilter = hasLBIDFilter = hasExtentIDFilter = false;
}
TupleBPS::~TupleBPS()
{
@ -3182,7 +3242,7 @@ void TupleBPS::addCPPredicates(uint32_t OID, const vector<int128_t>& vals, bool
if (cmd != NULL && cmd->getOID() == OID)
{
const ColumnCommandDataType &colType = cmd->getColType();
const execplan::CalpontSystemCatalog::ColType& colType = cmd->getColType();
if (!ll.CasualPartitionDataType(colType.colDataType, colType.colWidth)
|| cmd->isDict())