You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-641 1. Templatized convertValueNum() function.
2. Allocate int128_t buffers in batchprimitiveprocessor if a query involves wide decimal columns.
This commit is contained in:
committed by
Roman Nozdrin
parent
9b714274db
commit
62d0c82d75
@ -44,6 +44,7 @@
|
||||
#include "bytestream.h"
|
||||
#include "joblisttypes.h"
|
||||
#include "stdexcept"
|
||||
#include "widedecimalutils.h"
|
||||
|
||||
#undef min
|
||||
#undef max
|
||||
@ -1046,6 +1047,11 @@ inline bool isSignedInteger(const execplan::CalpontSystemCatalog::ColDataType ty
|
||||
}
|
||||
}
|
||||
|
||||
inline bool isNull(int128_t val, const execplan::CalpontSystemCatalog::ColType& ct)
|
||||
{
|
||||
return utils::isWideDecimalNullValue(val);
|
||||
}
|
||||
|
||||
inline bool isNull(int64_t val, const execplan::CalpontSystemCatalog::ColType& ct)
|
||||
{
|
||||
bool ret = false;
|
||||
|
@ -60,6 +60,7 @@ BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm)
|
||||
baseRid(0),
|
||||
ridCount(0),
|
||||
needStrValues(false),
|
||||
hasWideDecimalType(false),
|
||||
filterCount(0),
|
||||
projectCount(0),
|
||||
needRidsAtDelivery(false),
|
||||
@ -100,6 +101,8 @@ void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan, vector<B
|
||||
filterSteps.push_back(cc);
|
||||
filterCount++;
|
||||
_hasScan = true;
|
||||
if (utils::isWide(cc->getWidth()))
|
||||
hasWideDecimalType = true;
|
||||
idbassert(sessionID == scan.sessionId());
|
||||
}
|
||||
|
||||
@ -114,6 +117,9 @@ void BatchPrimitiveProcessorJL::addFilterStep(const PseudoColStep& pcs)
|
||||
cc->setStepUuid(uuid);
|
||||
filterSteps.push_back(cc);
|
||||
filterCount++;
|
||||
// TODO MCOL-641 How do we get to this execution path?
|
||||
//if (utils::isWide(cc->getWidth()))
|
||||
// hasWideDecimalType = true;
|
||||
idbassert(sessionID == pcs.sessionId());
|
||||
}
|
||||
|
||||
@ -128,6 +134,8 @@ void BatchPrimitiveProcessorJL::addFilterStep(const pColStep& step)
|
||||
cc->setStepUuid(uuid);
|
||||
filterSteps.push_back(cc);
|
||||
filterCount++;
|
||||
if (utils::isWide(cc->getWidth()))
|
||||
hasWideDecimalType = true;
|
||||
idbassert(sessionID == step.sessionId());
|
||||
}
|
||||
|
||||
@ -182,6 +190,9 @@ void BatchPrimitiveProcessorJL::addProjectStep(const PseudoColStep& step)
|
||||
colWidths.push_back(cc->getWidth());
|
||||
tupleLength += cc->getWidth();
|
||||
projectCount++;
|
||||
// TODO MCOL-641 How do we get to this execution path?
|
||||
//if (utils::isWide(cc->getWidth()))
|
||||
// hasWideDecimalType = true;
|
||||
idbassert(sessionID == step.sessionId());
|
||||
}
|
||||
|
||||
@ -198,6 +209,8 @@ void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& step)
|
||||
colWidths.push_back(cc->getWidth());
|
||||
tupleLength += cc->getWidth();
|
||||
projectCount++;
|
||||
if (utils::isWide(cc->getWidth()))
|
||||
hasWideDecimalType = true;
|
||||
idbassert(sessionID == step.sessionId());
|
||||
}
|
||||
|
||||
@ -215,6 +228,9 @@ void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step)
|
||||
tupleLength += cc->getWidth();
|
||||
projectCount++;
|
||||
|
||||
if (utils::isWide(cc->getWidth()))
|
||||
hasWideDecimalType = true;
|
||||
|
||||
if (filterCount == 0 && !sendRowGroups)
|
||||
sendValues = true;
|
||||
|
||||
@ -958,7 +974,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
|
||||
{
|
||||
ISMPacketHeader ism;
|
||||
uint32_t i;
|
||||
uint8_t flags = 0;
|
||||
uint16_t flags = 0;
|
||||
|
||||
ism.Command = BATCH_PRIMITIVE_CREATE;
|
||||
|
||||
@ -994,6 +1010,9 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
|
||||
if (sendTupleJoinRowGroupData)
|
||||
flags |= JOIN_ROWGROUP_DATA;
|
||||
|
||||
if (hasWideDecimalType)
|
||||
flags |= HAS_WIDE_DECIMAL;
|
||||
|
||||
bs << flags;
|
||||
|
||||
bs << bop;
|
||||
|
@ -281,10 +281,15 @@ private:
|
||||
|
||||
uint16_t relRids[LOGICAL_BLOCK_RIDS];
|
||||
boost::scoped_array<uint64_t> absRids;
|
||||
// TODO MCOL-641 Do we need uint128_t buffers here?
|
||||
// When would sendValues=true, in which case values[]
|
||||
// is sent to primproc?
|
||||
uint64_t values[LOGICAL_BLOCK_RIDS];
|
||||
uint16_t ridCount;
|
||||
bool needStrValues;
|
||||
|
||||
bool hasWideDecimalType;
|
||||
|
||||
std::vector<SCommand> filterSteps;
|
||||
std::vector<SCommand> projectSteps;
|
||||
//@bug 1136
|
||||
|
@ -332,7 +332,7 @@ string extractTableAlias(const SSC& sc)
|
||||
//------------------------------------------------------------------------------
|
||||
CalpontSystemCatalog::OID isDictCol(const CalpontSystemCatalog::ColType& colType)
|
||||
{
|
||||
if (colType.colDataType == CalpontSystemCatalog::BINARY) return 0;
|
||||
if (utils::isWideDecimalType(colType)) return 0;
|
||||
|
||||
if (colType.colWidth > 8) return colType.ddn.dictOID;
|
||||
|
||||
|
@ -88,7 +88,7 @@ using namespace logging;
|
||||
#include "jlf_common.h"
|
||||
#include "jlf_subquery.h"
|
||||
#include "jlf_tuplejoblist.h"
|
||||
|
||||
#include "columnwidth.h"
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -312,11 +312,17 @@ int64_t valueNullNum(const CalpontSystemCatalog::ColType& ct, const string& time
|
||||
return n;
|
||||
}
|
||||
|
||||
int64_t convertValueNum(const string& str, const CalpontSystemCatalog::ColType& ct, bool isNull, uint8_t& rf, const string& timeZone)
|
||||
template <typename T>
|
||||
void convertValueNum(const string& str, const CalpontSystemCatalog::ColType& ct, bool isNull, uint8_t& rf, const string& timeZone, T& v)
|
||||
{
|
||||
if (str.size() == 0 || isNull ) return valueNullNum(ct, timeZone);
|
||||
if (str.size() == 0 || isNull )
|
||||
{
|
||||
v = valueNullNum(ct, timeZone);
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t v = 0;
|
||||
|
||||
v = 0;
|
||||
rf = 0;
|
||||
bool pushWarning = false;
|
||||
boost::any anyVal = DataConvert::convertColumnData(ct, str, pushWarning, timeZone, false, true, false);
|
||||
@ -450,8 +456,10 @@ int64_t convertValueNum(const string& str, const CalpontSystemCatalog::ColType&
|
||||
#else
|
||||
v = boost::any_cast<int32_t>(anyVal);
|
||||
#endif
|
||||
else
|
||||
else if (ct.colWidth == execplan::CalpontSystemCatalog::EIGHT_BYTE)
|
||||
v = boost::any_cast<long long>(anyVal);
|
||||
else
|
||||
v = boost::any_cast<int128_t>(anyVal);
|
||||
|
||||
break;
|
||||
|
||||
@ -485,8 +493,6 @@ int64_t convertValueNum(const string& str, const CalpontSystemCatalog::ColType&
|
||||
|
||||
rf = (data[0] == '-') ? ROUND_NEG : ROUND_POS;
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
//TODO: make this totaly case-insensitive
|
||||
@ -1840,8 +1846,8 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
|
||||
{
|
||||
// @bug 1151 string longer than colwidth of char/varchar.
|
||||
int64_t value = 0;
|
||||
int128_t value128 = 0;
|
||||
uint8_t rf = 0;
|
||||
unsigned __int128 val128 = 0;
|
||||
#ifdef FAILED_ATOI_IS_ZERO
|
||||
|
||||
//if cvn throws (because there's non-digit data in the string, treat that as zero rather than
|
||||
@ -1849,7 +1855,7 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
|
||||
try
|
||||
{
|
||||
bool isNull = ConstantColumn::NULLDATA == cc->type();
|
||||
value = convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone);
|
||||
convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone, value);
|
||||
|
||||
if (ct.colDataType == CalpontSystemCatalog::FLOAT && !isNull)
|
||||
{
|
||||
@ -1887,21 +1893,14 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
|
||||
}
|
||||
|
||||
#else
|
||||
bool isNull = ConstantColumn::NULLDATA == cc->type();
|
||||
// WIP MCOL-641 width check must be a f() not a literal
|
||||
// make a template from convertValueNum to avoid extra if
|
||||
// this condition doesn't support UDECIMAL
|
||||
if (ct.colDataType == CalpontSystemCatalog::DECIMAL &&
|
||||
ct.colWidth == 16)
|
||||
{
|
||||
bool saturate = false;
|
||||
val128 = dataconvert::string_to_ll<int128_t>(constval, saturate);
|
||||
// TODO MCOL-641 check saturate
|
||||
}
|
||||
if (utils::isWideDecimalType(ct))
|
||||
convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone, value128);
|
||||
else
|
||||
{
|
||||
bool isNull = ConstantColumn::NULLDATA == cc->type();
|
||||
value = convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone);
|
||||
}
|
||||
convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone, value);
|
||||
|
||||
if (ct.colDataType == CalpontSystemCatalog::FLOAT && !isNull)
|
||||
{
|
||||
@ -1935,10 +1934,8 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
|
||||
|
||||
if (sc->isColumnStore())
|
||||
{
|
||||
// WIP MCOL-641
|
||||
if (ct.colDataType == CalpontSystemCatalog::DECIMAL &&
|
||||
ct.colWidth == 16)
|
||||
pcs->addFilter(cop, val128, rf);
|
||||
if (utils::isWideDecimalType(ct))
|
||||
pcs->addFilter(cop, value128, rf);
|
||||
else
|
||||
pcs->addFilter(cop, value, rf);
|
||||
}
|
||||
@ -3008,12 +3005,17 @@ const JobStepVector doConstantFilter(const ConstantFilter* cf, JobInfo& jobInfo)
|
||||
//add each filter to pColStep
|
||||
int8_t cop = op2num(sop);
|
||||
int64_t value = 0;
|
||||
int128_t value128 = 0;
|
||||
string constval = cc->constval();
|
||||
|
||||
// @bug 1151 string longer than colwidth of char/varchar.
|
||||
uint8_t rf = 0;
|
||||
bool isNull = ConstantColumn::NULLDATA == cc->type();
|
||||
value = convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone);
|
||||
|
||||
if (utils::isWideDecimalType(ct))
|
||||
convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone, value128);
|
||||
else
|
||||
convertValueNum(constval, ct, isNull, rf, jobInfo.timeZone, value);
|
||||
|
||||
if (ct.colDataType == CalpontSystemCatalog::FLOAT && !isNull)
|
||||
{
|
||||
@ -3030,6 +3032,9 @@ const JobStepVector doConstantFilter(const ConstantFilter* cf, JobInfo& jobInfo)
|
||||
if (ConstantColumn::NULLDATA == cc->type() && (opeq == *sop || opne == *sop))
|
||||
cop = COMPARE_NIL;
|
||||
|
||||
if (utils::isWideDecimalType(ct))
|
||||
pcs->addFilter(cop, value128, rf);
|
||||
else
|
||||
pcs->addFilter(cop, value, rf);
|
||||
}
|
||||
}
|
||||
@ -3453,7 +3458,6 @@ JLF_ExecPlanToJobList::walkTree(execplan::ParseTree* n, JobInfo& jobInfo)
|
||||
break;
|
||||
|
||||
case CONSTANTFILTER:
|
||||
//cout << "ConstantFilter" << endl;
|
||||
jsv = doConstantFilter(dynamic_cast<const ConstantFilter*>(tn), jobInfo);
|
||||
JLF_ExecPlanToJobList::addJobSteps(jsv, jobInfo, false);
|
||||
break;
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include "brm.h"
|
||||
#include "brmtypes.h"
|
||||
#include "dataconvert.h"
|
||||
#include "columnwidth.h"
|
||||
|
||||
#define IS_VERBOSE (fDebug >= 4)
|
||||
#define IS_DETAIL (fDebug >= 3)
|
||||
@ -808,7 +809,12 @@ bool LBIDList::CasualPartitionPredicate(const BRM::EMCasualPartition_t& cpRange,
|
||||
|
||||
// Should we also check for empty here?
|
||||
// TODO MCOL-641
|
||||
if (isNull(value, ct)) // This will work even if the data column is unsigned.
|
||||
if (utils::isWideDecimalType(ct))
|
||||
{
|
||||
if (isNull(bigValue, ct))
|
||||
continue;
|
||||
}
|
||||
else if (isNull(value, ct)) // This will work even if the data column is unsigned.
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
@ -635,13 +635,13 @@ void pColStep::addFilter(int8_t COP, int64_t value, uint8_t roundFlag)
|
||||
}
|
||||
|
||||
// WIP MCOL-641
|
||||
void pColStep::addFilter(int8_t COP, unsigned __int128 value, uint8_t roundFlag)
|
||||
void pColStep::addFilter(int8_t COP, const int128_t& value, uint8_t roundFlag)
|
||||
{
|
||||
fFilterString << (uint8_t) COP;
|
||||
fFilterString << roundFlag;
|
||||
|
||||
// bitwise copies into the filter ByteStream
|
||||
fFilterString << value;
|
||||
fFilterString << *reinterpret_cast<const uint128_t*>(&value);
|
||||
|
||||
fFilterCount++;
|
||||
}
|
||||
|
@ -190,14 +190,15 @@ enum ISMPACKETCOMMAND
|
||||
#undef PRIM_DELIVERBASE
|
||||
|
||||
/* Flags for BPP messages */
|
||||
const uint8_t NEED_STR_VALUES = 0x01; //1;
|
||||
const uint8_t GOT_ABS_RIDS = 0x02; //2;
|
||||
const uint8_t GOT_VALUES = 0x04; //4;
|
||||
const uint8_t LBID_TRACE = 0x08; //8;
|
||||
const uint8_t HAS_JOINER = 0x10; //16;
|
||||
const uint8_t SEND_RIDS_AT_DELIVERY = 0x20; //32;
|
||||
const uint8_t HAS_ROWGROUP = 0x40; //64;
|
||||
const uint8_t JOIN_ROWGROUP_DATA = 0x80; //128
|
||||
const uint16_t NEED_STR_VALUES = 0x01; //1;
|
||||
const uint16_t GOT_ABS_RIDS = 0x02; //2;
|
||||
const uint16_t GOT_VALUES = 0x04; //4;
|
||||
const uint16_t LBID_TRACE = 0x08; //8;
|
||||
const uint16_t HAS_JOINER = 0x10; //16;
|
||||
const uint16_t SEND_RIDS_AT_DELIVERY = 0x20; //32;
|
||||
const uint16_t HAS_ROWGROUP = 0x40; //64;
|
||||
const uint16_t JOIN_ROWGROUP_DATA = 0x80; //128
|
||||
const uint16_t HAS_WIDE_DECIMAL = 0x100; //256;
|
||||
|
||||
//TODO: put this in a namespace to stop global ns pollution
|
||||
enum PrimFlags
|
||||
|
@ -196,7 +196,7 @@ public:
|
||||
void addFilter(int8_t COP, int64_t value, uint8_t roundFlag = 0);
|
||||
void addFilter(int8_t COP, float value);
|
||||
// WIP MCOL-641
|
||||
void addFilter(int8_t COP, unsigned __int128 value, uint8_t roundFlag = 0);
|
||||
void addFilter(int8_t COP, const int128_t& value, uint8_t roundFlag = 0);
|
||||
|
||||
/** @brief Sets the DataList to get RID values from.
|
||||
*
|
||||
|
@ -100,6 +100,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
|
||||
baseRid(0),
|
||||
ridCount(0),
|
||||
needStrValues(false),
|
||||
hasWideDecimalType(false),
|
||||
filterCount(0),
|
||||
projectCount(0),
|
||||
sendRidsAtDelivery(false),
|
||||
@ -145,6 +146,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
|
||||
baseRid(0),
|
||||
ridCount(0),
|
||||
needStrValues(false),
|
||||
hasWideDecimalType(false),
|
||||
filterCount(0),
|
||||
projectCount(0),
|
||||
sendRidsAtDelivery(false),
|
||||
@ -218,6 +220,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
|
||||
{
|
||||
uint32_t i;
|
||||
uint8_t tmp8;
|
||||
uint16_t tmp16;
|
||||
Command::CommandType type;
|
||||
|
||||
bs.advance(sizeof(ISMPacketHeader)); // skip the header
|
||||
@ -229,15 +232,16 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
|
||||
bs >> uniqueID;
|
||||
bs >> versionInfo;
|
||||
|
||||
bs >> tmp8;
|
||||
needStrValues = tmp8 & NEED_STR_VALUES;
|
||||
gotAbsRids = tmp8 & GOT_ABS_RIDS;
|
||||
gotValues = tmp8 & GOT_VALUES;
|
||||
LBIDTrace = tmp8 & LBID_TRACE;
|
||||
sendRidsAtDelivery = tmp8 & SEND_RIDS_AT_DELIVERY;
|
||||
doJoin = tmp8 & HAS_JOINER;
|
||||
hasRowGroup = tmp8 & HAS_ROWGROUP;
|
||||
getTupleJoinRowGroupData = tmp8 & JOIN_ROWGROUP_DATA;
|
||||
bs >> tmp16;
|
||||
needStrValues = tmp16 & NEED_STR_VALUES;
|
||||
gotAbsRids = tmp16 & GOT_ABS_RIDS;
|
||||
gotValues = tmp16 & GOT_VALUES;
|
||||
LBIDTrace = tmp16 & LBID_TRACE;
|
||||
sendRidsAtDelivery = tmp16 & SEND_RIDS_AT_DELIVERY;
|
||||
doJoin = tmp16 & HAS_JOINER;
|
||||
hasRowGroup = tmp16 & HAS_ROWGROUP;
|
||||
getTupleJoinRowGroupData = tmp16 & JOIN_ROWGROUP_DATA;
|
||||
hasWideDecimalType = tmp16 & HAS_WIDE_DECIMAL;
|
||||
|
||||
// This used to signify that there was input row data from previous jobsteps, and
|
||||
// it never quite worked right. No need to fix it or update it; all BPP's have started
|
||||
@ -1019,6 +1023,8 @@ void BatchPrimitiveProcessor::initProcessor()
|
||||
fFiltRidCount[i] = 0;
|
||||
fFiltCmdRids[i].reset(new uint16_t[LOGICAL_BLOCK_RIDS]);
|
||||
fFiltCmdValues[i].reset(new int64_t[LOGICAL_BLOCK_RIDS]);
|
||||
if (hasWideDecimalType)
|
||||
fFiltCmdBinaryValues[i].reset(new int128_t[LOGICAL_BLOCK_RIDS]);
|
||||
|
||||
if (filtOnString) fFiltStrValues[i].reset(new string[LOGICAL_BLOCK_RIDS]);
|
||||
}
|
||||
@ -1539,6 +1545,11 @@ void BatchPrimitiveProcessor::execute()
|
||||
projectSteps[j]->projectIntoRowGroup(fe1Input, projectForFE1[j]);
|
||||
|
||||
for (j = 0; j < ridCount; j++, fe1In.nextRow())
|
||||
// TODO MCOL-641
|
||||
// WHERE clause on a numeric and a non-numeric column
|
||||
// leads to this execution path:
|
||||
// SELECT a, b from t1 where a!=b
|
||||
// Here, a is e.g., decimal(38), b is varchar(15)
|
||||
if (fe1->evaluate(&fe1In))
|
||||
{
|
||||
applyMapping(fe1ToProjection, fe1In, &fe1Out);
|
||||
@ -2339,6 +2350,7 @@ SBPP BatchPrimitiveProcessor::duplicate()
|
||||
bpp->stepID = stepID;
|
||||
bpp->uniqueID = uniqueID;
|
||||
bpp->needStrValues = needStrValues;
|
||||
bpp->hasWideDecimalType = hasWideDecimalType;
|
||||
bpp->gotAbsRids = gotAbsRids;
|
||||
bpp->gotValues = gotValues;
|
||||
bpp->LBIDTrace = LBIDTrace;
|
||||
|
@ -205,10 +205,12 @@ private:
|
||||
|
||||
uint16_t relRids[LOGICAL_BLOCK_RIDS];
|
||||
int64_t values[LOGICAL_BLOCK_RIDS];
|
||||
int128_t binaryValues[LOGICAL_BLOCK_RIDS];
|
||||
boost::scoped_array<uint64_t> absRids;
|
||||
boost::scoped_array<std::string> strValues;
|
||||
uint16_t ridCount;
|
||||
bool needStrValues;
|
||||
bool hasWideDecimalType;
|
||||
|
||||
/* Common space for primitive data */
|
||||
static const uint32_t BUFFER_SIZE = 131072;
|
||||
@ -274,6 +276,7 @@ private:
|
||||
bool filtOnString;
|
||||
boost::scoped_array<uint16_t> fFiltCmdRids[2];
|
||||
boost::scoped_array<int64_t> fFiltCmdValues[2];
|
||||
boost::scoped_array<int128_t> fFiltCmdBinaryValues[2];
|
||||
boost::scoped_array<std::string> fFiltStrValues[2];
|
||||
uint64_t fFiltRidCount[2];
|
||||
|
||||
|
@ -71,7 +71,6 @@ ColumnCommand::~ColumnCommand() { }
|
||||
|
||||
void ColumnCommand::_execute()
|
||||
{
|
||||
// cout << "CC: executing" << endl;
|
||||
if (_isScan)
|
||||
makeScanMsg();
|
||||
else if (bpp->ridCount == 0) // this would cause a scan
|
||||
@ -93,11 +92,20 @@ void ColumnCommand::_execute()
|
||||
void ColumnCommand::execute()
|
||||
{
|
||||
if (fFilterFeeder == LEFT_FEEDER)
|
||||
{
|
||||
values = bpp->fFiltCmdValues[0].get();
|
||||
binaryValues = bpp->fFiltCmdBinaryValues[0].get();
|
||||
}
|
||||
else if (fFilterFeeder == RIGHT_FEEDER)
|
||||
{
|
||||
values = bpp->fFiltCmdValues[1].get();
|
||||
binaryValues = bpp->fFiltCmdBinaryValues[1].get();
|
||||
}
|
||||
else
|
||||
{
|
||||
values = bpp->values;
|
||||
binaryValues = bpp->binaryValues;
|
||||
}
|
||||
|
||||
_execute();
|
||||
}
|
||||
@ -258,7 +266,6 @@ void ColumnCommand::issuePrimitive()
|
||||
|
||||
loadData();
|
||||
|
||||
// cout << "issuing primitive for LBID " << primMsg->LBID << endl;
|
||||
if (!suppressFilter)
|
||||
bpp->pp.setParsedColumnFilter(parsedColumnFilter);
|
||||
else
|
||||
@ -295,7 +302,6 @@ void ColumnCommand::process_OT_BOTH()
|
||||
|
||||
bpp->ridCount = outMsg->NVALS;
|
||||
bpp->ridMap = outMsg->RidFlags;
|
||||
// cout << "rid Count is " << bpp->ridCount << endl;
|
||||
|
||||
/* this is verbose and repetative to minimize the work per row */
|
||||
switch (colType.colWidth)
|
||||
@ -308,24 +314,12 @@ void ColumnCommand::process_OT_BOTH()
|
||||
|
||||
bpp->relRids[i] = *((uint16_t*) &bpp->outputMsg[pos]);
|
||||
pos += 2;
|
||||
// WIP
|
||||
// values[i] is 8 Bytes wide so coping the pointer to bpp->outputMsg[pos] and crossing fingers
|
||||
// I dont know the liveness of bpp->outputMsg but also I dont know if there is other memory area I can use
|
||||
values[i] = (int64_t) &bpp->outputMsg[pos];
|
||||
|
||||
// cout<< "CC: BIN16 " << i << " "
|
||||
// << hex
|
||||
// << *((int64_t*)values[i])
|
||||
// << " "
|
||||
// << *(((int64_t*)values[i]) +1)
|
||||
// << endl;
|
||||
binaryValues[i] = *((int128_t*) &bpp->outputMsg[pos]);
|
||||
pos += 16;
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
|
||||
|
||||
case 8:
|
||||
for (i = 0, pos = sizeof(NewColResultHeader); i < outMsg->NVALS; ++i)
|
||||
{
|
||||
@ -389,28 +383,24 @@ void ColumnCommand::process_OT_RID()
|
||||
memcpy(bpp->relRids, outMsg + 1, outMsg->NVALS << 1);
|
||||
bpp->ridCount = outMsg->NVALS;
|
||||
bpp->ridMap = outMsg->RidFlags;
|
||||
// cout << "rid Count is " << bpp->ridCount << endl;
|
||||
}
|
||||
|
||||
void ColumnCommand::process_OT_DATAVALUE()
|
||||
{
|
||||
bpp->ridCount = outMsg->NVALS;
|
||||
|
||||
// cout << "rid Count is " << bpp->ridCount << endl;
|
||||
switch (colType.colWidth)
|
||||
{
|
||||
case 16:
|
||||
{
|
||||
memcpy(values, outMsg + 1, outMsg->NVALS << 3);
|
||||
memcpy(binaryValues, outMsg + 1, outMsg->NVALS << 4);
|
||||
cout << " CC: first value is " << values[0] << endl;
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
case 8:
|
||||
{
|
||||
memcpy(values, outMsg + 1, outMsg->NVALS << 3);
|
||||
// cout << " CC: first value is " << values[0] << endl;
|
||||
break;
|
||||
}
|
||||
|
||||
@ -488,8 +478,6 @@ void ColumnCommand::processResult()
|
||||
for (uint64_t i = 0; i < bpp->ridCount; i++)
|
||||
bpp->fFiltCmdRids[1][i] = bpp->relRids[i];
|
||||
}
|
||||
|
||||
// cout << "processed " << outMsg->NVALS << " rows" << endl;
|
||||
}
|
||||
|
||||
void ColumnCommand::createCommand(ByteStream& bs)
|
||||
@ -823,7 +811,7 @@ void ColumnCommand::projectResultRG(RowGroup& rg, uint32_t pos)
|
||||
cout << __FILE__<< ":" <<__LINE__ << " ColumnCommand::projectResultRG " << endl;
|
||||
for (i = 0; i < outMsg->NVALS; ++i, msg8 += gapSize)
|
||||
{
|
||||
r.setBinaryField_offset(msg8, colType.colWidth, offset);
|
||||
r.setBinaryField_offset((int128_t*)msg8, colType.colWidth, offset);
|
||||
r.nextRow(rowSize);
|
||||
}
|
||||
break;
|
||||
|
@ -147,6 +147,7 @@ private:
|
||||
uint16_t filterCount;
|
||||
bool makeAbsRids;
|
||||
int64_t* values; // this is usually bpp->values; RTSCommand needs to use a different container
|
||||
int128_t* binaryValues;
|
||||
|
||||
uint8_t mask, shift; // vars for the selective block loader
|
||||
|
||||
|
@ -174,7 +174,8 @@ Command* FilterCommand::makeFilterCommand(ByteStream& bs, vector<SCommand>& cmds
|
||||
}
|
||||
|
||||
|
||||
FilterCommand::FilterCommand() : Command(FILTER_COMMAND), fBOP(0)
|
||||
FilterCommand::FilterCommand() : Command(FILTER_COMMAND), fBOP(0),
|
||||
hasWideDecimalType(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -247,6 +248,9 @@ void FilterCommand::setColTypes(const execplan::CalpontSystemCatalog::ColType& l
|
||||
{
|
||||
leftColType = left;
|
||||
rightColType = right;
|
||||
|
||||
if (utils::isWideDecimalType(left) || utils::isWideDecimalType(right))
|
||||
hasWideDecimalType = true;
|
||||
}
|
||||
|
||||
|
||||
@ -255,6 +259,13 @@ void FilterCommand::doFilter()
|
||||
bpp->ridMap = 0;
|
||||
bpp->ridCount = 0;
|
||||
|
||||
bool (FilterCommand::*compareFunc)(uint64_t, uint64_t);
|
||||
|
||||
if (hasWideDecimalType)
|
||||
compareFunc = &FilterCommand::binaryCompare;
|
||||
else
|
||||
compareFunc = &FilterCommand::compare;
|
||||
|
||||
// rids in [0] is used for scan [1], so [1] is a subset of [0], and same order.
|
||||
// -- see makeFilterCommand() above.
|
||||
for (uint64_t i = 0, j = 0; j < bpp->fFiltRidCount[1]; )
|
||||
@ -265,9 +276,14 @@ void FilterCommand::doFilter()
|
||||
}
|
||||
else
|
||||
{
|
||||
if (compare(i, j) == true)
|
||||
if ((this->*compareFunc)(i, j) == true)
|
||||
{
|
||||
bpp->relRids[bpp->ridCount] = bpp->fFiltCmdRids[0][i];
|
||||
// WIP MCOL-641 How is bpp->(binary)values used given that
|
||||
// we are setting the relRids?
|
||||
if (utils::isWideDecimalType(leftColType))
|
||||
bpp->binaryValues[bpp->ridCount] = bpp->fFiltCmdBinaryValues[0][i];
|
||||
else
|
||||
bpp->values[bpp->ridCount] = bpp->fFiltCmdValues[0][i];
|
||||
bpp->ridMap |= 1 << (bpp->relRids[bpp->ridCount] >> 10);
|
||||
bpp->ridCount++;
|
||||
@ -321,6 +337,70 @@ bool FilterCommand::compare(uint64_t i, uint64_t j)
|
||||
}
|
||||
}
|
||||
|
||||
bool FilterCommand::binaryCompare(uint64_t i, uint64_t j)
|
||||
{
|
||||
// We type-promote to int128_t if either of the columns are
|
||||
// not int128_t
|
||||
int128_t leftVal, rightVal;
|
||||
|
||||
if (utils::isWideDecimalType(leftColType))
|
||||
{
|
||||
if (execplan::isNull(bpp->fFiltCmdBinaryValues[0][i], leftColType))
|
||||
return false;
|
||||
leftVal = bpp->fFiltCmdBinaryValues[0][i];
|
||||
}
|
||||
else
|
||||
{
|
||||
if (execplan::isNull(bpp->fFiltCmdValues[0][i], leftColType))
|
||||
return false;
|
||||
leftVal = bpp->fFiltCmdValues[0][i];
|
||||
}
|
||||
|
||||
if (utils::isWideDecimalType(rightColType))
|
||||
{
|
||||
if (execplan::isNull(bpp->fFiltCmdBinaryValues[1][j], rightColType))
|
||||
return false;
|
||||
rightVal = bpp->fFiltCmdBinaryValues[1][j];
|
||||
}
|
||||
else
|
||||
{
|
||||
if (execplan::isNull(bpp->fFiltCmdValues[1][j], rightColType))
|
||||
return false;
|
||||
rightVal = bpp->fFiltCmdValues[1][j];
|
||||
}
|
||||
|
||||
switch (fBOP)
|
||||
{
|
||||
case COMPARE_GT:
|
||||
return leftVal > rightVal;
|
||||
break;
|
||||
|
||||
case COMPARE_LT:
|
||||
return leftVal < rightVal;
|
||||
break;
|
||||
|
||||
case COMPARE_EQ:
|
||||
return leftVal == rightVal;
|
||||
break;
|
||||
|
||||
case COMPARE_GE:
|
||||
return leftVal >= rightVal;
|
||||
break;
|
||||
|
||||
case COMPARE_LE:
|
||||
return leftVal <= rightVal;
|
||||
break;
|
||||
|
||||
case COMPARE_NE:
|
||||
return leftVal != rightVal;
|
||||
break;
|
||||
|
||||
default:
|
||||
return false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool FilterCommand::operator==(const FilterCommand& c) const
|
||||
{
|
||||
|
@ -76,9 +76,14 @@ protected:
|
||||
// compare method, take the indices to the values array
|
||||
virtual bool compare(uint64_t, uint64_t);
|
||||
|
||||
// compare method, take the indices to the values array
|
||||
virtual bool binaryCompare(uint64_t, uint64_t);
|
||||
|
||||
// binary operator
|
||||
uint8_t fBOP;
|
||||
|
||||
bool hasWideDecimalType;
|
||||
|
||||
// column type for null check
|
||||
execplan::CalpontSystemCatalog::ColType leftColType;
|
||||
execplan::CalpontSystemCatalog::ColType rightColType;
|
||||
|
@ -79,6 +79,8 @@ void PassThruCommand::project()
|
||||
{
|
||||
case 16:
|
||||
cout << __FILE__<< ":" <<__LINE__ << " Fix for 16 Bytes ?" << endl;
|
||||
bpp->serialized->append((uint8_t*) bpp->binaryValues, bpp->ridCount << 4);
|
||||
break;
|
||||
|
||||
case 8:
|
||||
bpp->serialized->append((uint8_t*) bpp->values, bpp->ridCount << 3);
|
||||
@ -121,7 +123,6 @@ void PassThruCommand::projectIntoRowGroup(RowGroup& rg, uint32_t col)
|
||||
case 1:
|
||||
for (i = 0; i < bpp->ridCount; i++)
|
||||
{
|
||||
// cout << "PTC: " << bpp->values[i] << endl;
|
||||
r.setUintField_offset<1>(bpp->values[i], offset);
|
||||
r.nextRow(rowSize);
|
||||
}
|
||||
@ -131,7 +132,6 @@ void PassThruCommand::projectIntoRowGroup(RowGroup& rg, uint32_t col)
|
||||
case 2:
|
||||
for (i = 0; i < bpp->ridCount; i++)
|
||||
{
|
||||
// cout << "PTC: " << bpp->values[i] << endl;
|
||||
r.setUintField_offset<2>(bpp->values[i], offset);
|
||||
r.nextRow(rowSize);
|
||||
}
|
||||
@ -150,7 +150,6 @@ void PassThruCommand::projectIntoRowGroup(RowGroup& rg, uint32_t col)
|
||||
case 8:
|
||||
for (i = 0; i < bpp->ridCount; i++)
|
||||
{
|
||||
// cout << "PTC: " << bpp->values[i] << endl;
|
||||
r.setUintField_offset<8>(bpp->values[i], offset);
|
||||
r.nextRow(rowSize);
|
||||
}
|
||||
@ -160,15 +159,7 @@ void PassThruCommand::projectIntoRowGroup(RowGroup& rg, uint32_t col)
|
||||
cout << __FILE__ << ":" << __LINE__ << " PassThruCommand::projectIntoRowGroup" << " Addition for 16 Bytes" << endl;
|
||||
for (i = 0; i < bpp->ridCount; i++)
|
||||
{
|
||||
cout << "PTC: " << "BIN16 " << i << " "
|
||||
<< hex
|
||||
<< *((int64_t*) bpp->values[i])
|
||||
<< " "
|
||||
<< *(((int64_t*) bpp->values[i]) +1)
|
||||
<< endl;
|
||||
// values[i] is 8 bytes so it contains the pointer to bpp->outputMsg set by ColumnCommand::process_OT_BOTH()
|
||||
r.setBinaryField_offset((uint128_t*)bpp->values[i], 16, offset);
|
||||
|
||||
r.setBinaryField_offset(&bpp->binaryValues[i], 16, offset);
|
||||
r.nextRow(rowSize);
|
||||
}
|
||||
}
|
||||
|
@ -513,7 +513,9 @@ int ServicePrimProc::Child()
|
||||
// do not allow to read beyond the end of an extent
|
||||
const int MaxReadAheadSz = (extentRows) / BLOCK_SIZE;
|
||||
//defaultBufferSize = 512 * 1024; // @bug 2627 - changed default dict buffer from 256K to 512K, allows for cols w/ length of 61.
|
||||
defaultBufferSize = 100 * 1024; // 1/17/12 - made the dict buffer dynamic, max size for a numeric col is 80k + ovrhd
|
||||
// WIP MCOL-641 Check with Patrick on this. Changed it from 100*1024 to 128*1024
|
||||
// to match with BatchPrimitiveProcessor::BUFFER_SIZE
|
||||
defaultBufferSize = 128 * 1024; // 1/17/12 - made the dict buffer dynamic, max size for a numeric col is 80k + ovrhd
|
||||
|
||||
|
||||
// This parm controls whether we rotate through the output sockets
|
||||
|
@ -18,6 +18,7 @@
|
||||
#ifndef UTILS_COLWIDTH_H
|
||||
#define UTILS_COLWIDTH_H
|
||||
|
||||
#include "calpontsystemcatalog.h"
|
||||
#include "branchpred.h"
|
||||
|
||||
namespace utils
|
||||
@ -35,6 +36,13 @@ namespace utils
|
||||
return width <= MAXLEGACYWIDTH;
|
||||
}
|
||||
|
||||
inline bool isWideDecimalType(const execplan::CalpontSystemCatalog::ColType& ct)
|
||||
{
|
||||
return ((ct.colDataType == execplan::CalpontSystemCatalog::DECIMAL ||
|
||||
ct.colDataType == execplan::CalpontSystemCatalog::UDECIMAL) &&
|
||||
ct.colWidth == MAXCOLUMNWIDTH);
|
||||
}
|
||||
|
||||
/** @brief Map a DECIMAL precision to data width in bytes */
|
||||
inline uint8_t widthByPrecision(unsigned p)
|
||||
{
|
||||
|
@ -1147,7 +1147,7 @@ void TupleJoiner::updateCPData(const Row& r)
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (utils::isWide(r.getColumnWidth(colIdx))
|
||||
else if (r.getColumnWidth(colIdx) == utils::MAXCOLUMNWIDTH
|
||||
&& (r.getColType(colIdx) == CalpontSystemCatalog::DECIMAL
|
||||
|| r.getColType(colIdx) == CalpontSystemCatalog::UDECIMAL))
|
||||
{
|
||||
|
@ -236,7 +236,7 @@ ByteStream& ByteStream::operator<<(const uint64_t o)
|
||||
}
|
||||
|
||||
// WIP MCOL-641
|
||||
ByteStream& ByteStream::operator<<(const uint128_t o)
|
||||
ByteStream& ByteStream::operator<<(const uint128_t& o)
|
||||
{
|
||||
if (fBuf == 0 || (fCurInPtr - fBuf + 16U > fMaxLen + ISSOverhead))
|
||||
growBuf(fMaxLen + BlockSize);
|
||||
|
@ -148,9 +148,9 @@ public:
|
||||
EXPORT ByteStream& operator<<(const uint64_t o);
|
||||
// WIP MCOL-641
|
||||
/**
|
||||
* push an unsigned __int128 onto the end of the stream. The byte order is whatever the native byte order is.
|
||||
* push an uint128_t onto the end of the stream. The byte order is whatever the native byte order is.
|
||||
*/
|
||||
EXPORT ByteStream& operator<<(const uint128_t o);
|
||||
EXPORT ByteStream& operator<<(const uint128_t& o);
|
||||
/**
|
||||
* push a float onto the end of the stream. The byte order is
|
||||
* whatever the native byte order is.
|
||||
@ -217,7 +217,7 @@ public:
|
||||
EXPORT ByteStream& operator>>(uint64_t& o);
|
||||
// WIP MCOL-641
|
||||
/**
|
||||
* extract an unsigned __int128 from the front of the stream. The byte order is whatever the native byte order is.
|
||||
* extract an uint128_t from the front of the stream. The byte order is whatever the native byte order is.
|
||||
*/
|
||||
EXPORT ByteStream& operator>>(uint128_t& o);
|
||||
/**
|
||||
@ -292,7 +292,7 @@ public:
|
||||
EXPORT void peek(uint64_t& o) const;
|
||||
// WIP MCOL-641
|
||||
/**
|
||||
* Peek at an unsigned __int128 from the front of the stream. The byte order is whatever the native byte order is.
|
||||
* Peek at an uint128_t from the front of the stream. The byte order is whatever the native byte order is.
|
||||
*/
|
||||
EXPORT void peek(uint128_t& o) const;
|
||||
/**
|
||||
|
@ -639,7 +639,7 @@ string Row::toString() const
|
||||
break;
|
||||
case CalpontSystemCatalog::DECIMAL:
|
||||
case CalpontSystemCatalog::UDECIMAL:
|
||||
if (utils::isWide(colWidths[i]))
|
||||
if (colWidths[i] == utils::MAXCOLUMNWIDTH)
|
||||
{
|
||||
unsigned int buflen = precision[i] + 3;
|
||||
char *buf = (char*)alloca(buflen);
|
||||
|
@ -70,6 +70,9 @@ namespace WriteEngine
|
||||
{
|
||||
StopWatch timer;
|
||||
|
||||
using dataconvert::int128_t;
|
||||
using dataconvert::uint128_t;
|
||||
|
||||
/**@brief WriteEngineWrapper Constructor
|
||||
*/
|
||||
WriteEngineWrapper::WriteEngineWrapper() : m_opType(NOOP)
|
||||
@ -219,10 +222,6 @@ void WriteEngineWrapper::findSmallestColumn(uint32_t& colId, ColStructList colSt
|
||||
}
|
||||
}
|
||||
|
||||
// MCOL-641 WIP
|
||||
using int128_t = __int128;
|
||||
using uint128_t = unsigned __int128;
|
||||
|
||||
/*@convertValArray - Convert interface values to internal values
|
||||
*/
|
||||
/***********************************************************
|
||||
|
Reference in New Issue
Block a user