1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-641 Clean up primitives code

Add int128_t support into ByteStream

Fixed UTs broken after collation patch
This commit is contained in:
Roman Nozdrin
2020-07-17 14:56:59 +00:00
parent 638202417f
commit 1588ebe439
23 changed files with 225 additions and 197 deletions

View File

@ -68,6 +68,8 @@ const uint64_t mcs_pow_10[20] =
constexpr uint32_t maxPowOf10 = sizeof(mcs_pow_10)/sizeof(mcs_pow_10[0])-1;
constexpr int128_t Decimal128Null = int128_t(0x8000000000000000LL) << 64;
constexpr int128_t Decimal128Empty = (int128_t(0x8000000000000000LL) << 64) + 1;
/**
@brief The function to produce scale multiplier/divisor for

View File

@ -1023,7 +1023,6 @@ inline bool isUnsigned(const execplan::CalpontSystemCatalog::ColDataType type)
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
case execplan::CalpontSystemCatalog::UBIGINT:
// TODO MCOL-641 add decimal here
return true;
default:

View File

@ -60,7 +60,7 @@ BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm)
baseRid(0),
ridCount(0),
needStrValues(false),
hasWideDecimalType(false),
wideColumnsWidths(0),
filterCount(0),
projectCount(0),
needRidsAtDelivery(false),
@ -102,7 +102,7 @@ void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan, vector<B
filterCount++;
_hasScan = true;
if (utils::isWide(cc->getWidth()))
hasWideDecimalType = true;
wideColumnsWidths |= cc->getWidth();
idbassert(sessionID == scan.sessionId());
}
@ -117,9 +117,6 @@ void BatchPrimitiveProcessorJL::addFilterStep(const PseudoColStep& pcs)
cc->setStepUuid(uuid);
filterSteps.push_back(cc);
filterCount++;
// TODO MCOL-641 How do we get to this execution path?
//if (utils::isWide(cc->getWidth()))
// hasWideDecimalType = true;
idbassert(sessionID == pcs.sessionId());
}
@ -135,7 +132,7 @@ void BatchPrimitiveProcessorJL::addFilterStep(const pColStep& step)
filterSteps.push_back(cc);
filterCount++;
if (utils::isWide(cc->getWidth()))
hasWideDecimalType = true;
wideColumnsWidths |= cc->getWidth();
idbassert(sessionID == step.sessionId());
}
@ -190,9 +187,6 @@ void BatchPrimitiveProcessorJL::addProjectStep(const PseudoColStep& step)
colWidths.push_back(cc->getWidth());
tupleLength += cc->getWidth();
projectCount++;
// TODO MCOL-641 How do we get to this execution path?
//if (utils::isWide(cc->getWidth()))
// hasWideDecimalType = true;
idbassert(sessionID == step.sessionId());
}
@ -210,7 +204,7 @@ void BatchPrimitiveProcessorJL::addProjectStep(const pColStep& step)
tupleLength += cc->getWidth();
projectCount++;
if (utils::isWide(cc->getWidth()))
hasWideDecimalType = true;
wideColumnsWidths |= cc->getWidth();
idbassert(sessionID == step.sessionId());
}
@ -229,7 +223,7 @@ void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step)
projectCount++;
if (utils::isWide(cc->getWidth()))
hasWideDecimalType = true;
wideColumnsWidths |= cc->getWidth();
if (filterCount == 0 && !sendRowGroups)
sendValues = true;
@ -739,7 +733,7 @@ void BatchPrimitiveProcessorJL::deserializeAggregateResult(ByteStream* in,
void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>* out,
bool* validCPData, uint64_t* lbid, __int128* min, __int128* max,
uint32_t* cachedIO, uint32_t* physIO, uint32_t* touchedBlocks, bool* countThis,
uint32_t threadID, bool* hasBinaryColumn, const execplan::CalpontSystemCatalog::ColType& colType) const
uint32_t threadID, bool* hasWideColumn, const execplan::CalpontSystemCatalog::ColType& colType) const
{
uint64_t tmp64;
unsigned __int128 tmp128;
@ -775,21 +769,32 @@ void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>*
{
in >> *lbid;
in >> tmp8;
*hasBinaryColumn = (tmp8 > 8);
if (*hasBinaryColumn)
*hasWideColumn = (tmp8 > utils::MAXLEGACYWIDTH);
if (UNLIKELY(*hasWideColumn))
{
idbassert(colType.colWidth > 8);
in >> tmp128;
*min = tmp128;
in >> tmp128;
*max = tmp128;
idbassert(colType.colWidth > utils::MAXLEGACYWIDTH);
if (LIKELY(datatypes::Decimal::isWideDecimalType(colType)))
{
in >> tmp128;
*min = tmp128;
in >> tmp128;
*max = tmp128;
}
else
{
std::ostringstream oss;
oss << __func__ << " WARNING!!! Not implemented for the data type ";
oss << colType.colDataType << std::endl;
std::cout << oss.str();
idbassert(false);
}
}
else
{
in >> tmp64;
*min = static_cast<__int128>(tmp64);
*min = static_cast<int128_t>(tmp64);
in >> tmp64;
*max = static_cast<__int128>(tmp64);
*max = static_cast<int128_t>(tmp64);
}
}
else
@ -1010,11 +1015,14 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const
if (sendTupleJoinRowGroupData)
flags |= JOIN_ROWGROUP_DATA;
if (hasWideDecimalType)
flags |= HAS_WIDE_DECIMAL;
if (wideColumnsWidths)
flags |= HAS_WIDE_COLUMNS;
bs << flags;
if (wideColumnsWidths)
bs << wideColumnsWidths;
bs << bop;
bs << (uint8_t) (forHJ ? 1 : 0);

View File

@ -288,7 +288,7 @@ private:
uint16_t ridCount;
bool needStrValues;
bool hasWideDecimalType;
uint16_t wideColumnsWidths;
std::vector<SCommand> filterSteps;
std::vector<SCommand> projectSteps;

View File

@ -174,6 +174,7 @@ private:
// create two columns RG. 1st is the sorting key, second is the data column
std::vector<uint32_t> offsets, roids, tkeys, cscale, cprecision;
std::vector<uint32_t> charSetNumVec;
std::vector<execplan::CalpontSystemCatalog::ColDataType> types;
offsets.push_back(2); offsets.push_back(10); offsets.push_back(18);
roids.push_back(oid); roids.push_back(oid);
@ -182,11 +183,13 @@ private:
types.push_back(execplan::CalpontSystemCatalog::UBIGINT);
cscale.push_back(0); cscale.push_back(0);
cprecision.push_back(20); cprecision.push_back(20);
charSetNumVec.push_back(8); charSetNumVec.push_back(8);
rowgroup::RowGroup inRG(2, //column count
offsets, //oldOffset
roids, // column oids
tkeys, //keys
types, // types
charSetNumVec, // charset numbers
cscale, //scale
cprecision, // precision
20, // sTableThreshold

View File

@ -198,7 +198,7 @@ const uint16_t HAS_JOINER = 0x10; //16;
const uint16_t SEND_RIDS_AT_DELIVERY = 0x20; //32;
const uint16_t HAS_ROWGROUP = 0x40; //64;
const uint16_t JOIN_ROWGROUP_DATA = 0x80; //128
const uint16_t HAS_WIDE_DECIMAL = 0x100; //256;
const uint16_t HAS_WIDE_COLUMNS = 0x100; //256;
//TODO: put this in a namespace to stop global ns pollution
enum PrimFlags
@ -705,8 +705,8 @@ struct NewColResultHeader
uint16_t NVALS;
uint16_t ValidMinMax; // 1 if Min/Max are valid, otherwise 0
uint32_t OutputType;
__int128 Min; // Minimum value in this block for signed data types
__int128 Max; // Maximum value in this block for signed data types
int128_t Min; // Minimum value in this block for signed data types
int128_t Max; // Maximum value in this block for signed data types
uint32_t CacheIO; // I/O count from buffer cache
uint32_t PhysicalIO; // Physical I/O count from disk
// if OutputType was OT_DATAVALUE, what follows is DataType[NVALS]

View File

@ -16,3 +16,9 @@ target_link_libraries(processor ${NETSNMP_LIBRARIES})
INSTALL (TARGETS processor DESTINATION ${ENGINE_LIBDIR})
#if (WITH_PP_SCAN_UT)
# add_executable(pp_scan_unittest pp-scan-unittest.cpp)
# target_link_libraries(pp_scan_unittest ${ENGINE_LDFLAGS} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit)
# install(TARGETS pp_scan_unittest DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-engine)
#endif()

View File

@ -1,4 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016-2020 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -40,7 +41,6 @@ using namespace boost;
#include "stats.h"
#include "primproc.h"
#include "dataconvert.h"
#include "widedecimalutils.h"
#include "mcs_decimal.h"
using namespace logging;
using namespace dbbc;
@ -278,18 +278,17 @@ inline bool isEmptyVal(uint8_t type, const uint8_t* val8);
template<>
inline bool isEmptyVal<32>(uint8_t type, const uint8_t* ival) // For BINARY
{
const uint64_t* val = reinterpret_cast<const uint64_t*>(ival);
return ((val[0] == joblist::BINARYEMPTYVALUELOW)
&& (val[1] == joblist::BINARYNULLVALUELOW)
&& (val[2] == joblist::BINARYNULLVALUELOW)
&& (val[3] == joblist::BINARYEMPTYVALUEHIGH));
std::cout << __func__ << " WARNING!!! Not implemented for 32 byte data types." << std::endl;
return false;
}
template<>
inline bool isEmptyVal<16>(uint8_t type, const uint8_t* ival) // For BINARY
{
const int128_t* val = reinterpret_cast<const int128_t*>(ival);
return utils::isWideDecimalEmptyValue (*val);
// Wide-DECIMAL supplies universal NULL/EMPTY magic values for all 16-byte
// data types.
return *val == datatypes::Decimal128Empty;
}
template<>
@ -413,22 +412,22 @@ inline bool isEmptyVal<1>(uint8_t type, const uint8_t* ival)
template<int>
inline bool isNullVal(uint8_t type, const uint8_t* val8);
// WIP This method only works for wide DECIMAL so far.
template<>
inline bool isNullVal<16>(uint8_t type, const uint8_t* ival) // For BINARY
inline bool isNullVal<16>(uint8_t type, const uint8_t* ival)
{
const int128_t* val = reinterpret_cast<const int128_t*>(ival);
return utils::isWideDecimalNullValue (*val);
// Wide-DECIMAL supplies universal NULL/EMPTY magic values for all 16-byte
// data types.
return *val == datatypes::Decimal128Null;
}
template<>
inline bool isNullVal<32>(uint8_t type, const uint8_t* ival) // For BINARY
inline bool isNullVal<32>(uint8_t type, const uint8_t* ival)
{
const uint64_t* val = reinterpret_cast<const uint64_t*>(ival);
return ((val[0] == joblist::BINARYNULLVALUELOW)
&& (val[1] == joblist::BINARYNULLVALUELOW)
&& (val[2] == joblist::BINARYNULLVALUELOW)
&& (val[3] == joblist::BINARYNULLVALUEHIGH));
std::cout << __func__ << " WARNING!!! Not implemented for 32 byte data types."
<< std::endl;
return false;
}
template<>
@ -558,9 +557,6 @@ inline bool isNullVal(uint32_t length, uint8_t type, const uint8_t* val8)
{
switch (length)
{
case 32:
return isNullVal<32>(type, val8);
case 16:
return isNullVal<16>(type, val8);
@ -576,6 +572,7 @@ inline bool isNullVal(uint32_t length, uint8_t type, const uint8_t* val8)
case 1:
return isNullVal<1>(type, val8);
};
std::cout << __func__ << " WARNING!!! Not implemented for " << length << " bytes data types." << std::endl;
return false;
}
@ -619,7 +616,7 @@ inline bool isMinMaxValid(const NewColRequestHeader* in)
case CalpontSystemCatalog::DECIMAL:
case CalpontSystemCatalog::UDECIMAL:
return (in->DataSize <= 16 );
return (in->DataSize <= datatypes::MAXDECIMALWIDTH);
default:
return false;
@ -703,7 +700,7 @@ inline bool colCompare(int128_t val1, int128_t val2, uint8_t COP, uint8_t rf, in
inline bool colCompareUnsigned(uint64_t val1, uint64_t val2, uint8_t COP, uint8_t rf, int type, uint8_t width, const idb_regex_t& regex, bool isNull = false)
{
// cout << "comparing unsigned" << hex << val1 << " to " << val2 << endl;
// cout << "comparing unsigned" << hex << val1 << " to " << val2 << endl;
if (COMPARE_NIL == COP) return false;
@ -758,8 +755,7 @@ inline void store(const NewColRequestHeader* in,
switch (in->DataSize)
{
case 32:
ptr2 += (rid << 5);
memcpy(ptr1, ptr2, 32);
std::cout << __func__ << " WARNING!!! Not implemented for 32 byte data types." << std::endl;
break;
case 16:
@ -768,6 +764,7 @@ inline void store(const NewColRequestHeader* in,
break;
default:
std::cout << __func__ << " WARNING!!! unspecified column width." << std::endl;
case 8:
ptr2 += (rid << 3);
memcpy(ptr1, ptr2, 8);
@ -904,8 +901,6 @@ inline uint8_t* nextBinColValue(int type,
}
else
{
//FIXME: not complete nor tested . How make execution flow pass here
// whe is ridArray not NULL ? fidn by id? how?
while (*index < NVALS &&
isEmptyVal<W>(type, &val8[ridArray[*index] * W]))
{
@ -920,20 +915,22 @@ inline uint8_t* nextBinColValue(int type,
*rid = ridArray[(*index)++];
}
*isNull = isNullVal<W>(type, &val8[*rid * W]);
*isEmpty = isEmptyVal<W>(type, &val8[*rid * W]);
//cout << "nextUnsignedColValue index " << *index << " rowid " << *rid << endl;
uint32_t curValueOffset = *rid * W;
*isNull = isNullVal<W>(type, &val8[curValueOffset]);
*isEmpty = isEmptyVal<W>(type, &val8[curValueOffset]);
//cout << "nextColBinValue " << *index << " rowid " << *rid << endl;
// at this point, nextRid is the index to return, and index is...
// if RIDs are not specified, nextRid + 1,
// if RIDs are specified, it's the next index in the rid array.
return &val8[*rid * W];
return &val8[curValueOffset];
#ifdef PRIM_DEBUG
throw logic_error("PrimitiveProcessor::nextColBinValue() bad width");
#endif
return NULL;
}
}
template<int W>
inline int64_t nextColValue(int type,
@ -1549,8 +1546,11 @@ inline void p_Col_ridArray(NewColRequestHeader* in,
#endif
}
// for BINARY
template<int W>
// There are a number of hardcoded type-dependent objects
// that effectively make this template int128-based only.
// Use a type-based template method for Min/Max values.
// prestored_set_128 must be a template with a type argument.
template<int W, typename T>
inline void p_Col_bin_ridArray(NewColRequestHeader* in,
NewColResultHeader* out,
unsigned outSize,
@ -1563,11 +1563,6 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
idb_regex_t placeholderRegex;
placeholderRegex.used = false;
//FIXME: pCol is setting it to 8192 cause logicalBlockMode is true
/*if(itemsPerBlk == BLOCK_SIZE){
itemsPerBlk = BLOCK_SIZE/W;
}*/
if (in->NVALS > 0)
ridArray = reinterpret_cast<uint16_t*>(&in8[sizeof(NewColRequestHeader) +
(in->NOPS * filterSize)]);
@ -1590,6 +1585,7 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
if (out->ValidMinMax)
{
// Assume that isUnsigned returns true for 8-byte data types only
if (isUnsigned((CalpontSystemCatalog::ColDataType)in->DataType))
{
out->Min = -1;
@ -1597,8 +1593,8 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
}
else
{
utils::int128Max(out->Min);
utils::int128Min(out->Max);
out->Min = datatypes::Decimal::maxInt128;
out->Max = datatypes::Decimal::minInt128;
}
}
else
@ -1625,7 +1621,7 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
scoped_array<idb_regex_t> std_regex;
idb_regex_t* regex = NULL;
// no pre-parsed column filter is set, parse the filter in the message
// no pre-parsed column filter is set, parse the filter in the message
if (parsedColumnFilter.get() == NULL) {
std_regex.reset(new idb_regex_t[in->NOPS]);
regex = &(std_regex[0]);
@ -1643,7 +1639,6 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
regex[argIndex].used = false;
}
}
// WIP MCOL-641
// we have a pre-parsed filter, and it's in the form of op and value arrays
else if (parsedColumnFilter->columnFilterMode == TWO_ARRAYS)
{
@ -1658,11 +1653,11 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
bval = (binWtype*)nextBinColValue<W>(in->DataType, ridArray, in->NVALS, &nextRidIndex, &done, &isNull,
&isEmpty, &rid, in->OutputType, reinterpret_cast<uint8_t*>(block), itemsPerBlk);
int128_t val;
T val;
while (!done)
{
val = *reinterpret_cast<int128_t*>(bval);
val = *reinterpret_cast<T*>(bval);
if (cops == NULL) // implies parsedColumnFilter && columnFilterMode == SET
{
@ -1695,8 +1690,7 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
{
for (argIndex = 0; argIndex < in->NOPS; argIndex++)
{
// WIP MCOL-641
int128_t filterVal = *reinterpret_cast<int128_t*>(argVals[argIndex]);
T filterVal = *reinterpret_cast<T*>(argVals[argIndex]);
cmp = colCompare(val, filterVal, cops[argIndex],
rfs[argIndex], in->DataType, W, isNull);
@ -1731,20 +1725,13 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
{
if (in->DataType == CalpontSystemCatalog::CHAR || in->DataType == CalpontSystemCatalog::VARCHAR)
{
// !!! colCompare is overloaded with int128_t only yet.
if (colCompare(out->Min, val, COMPARE_GT, false, in->DataType, W, placeholderRegex))
out->Min = val;
if (colCompare(out->Max, val, COMPARE_LT, false, in->DataType, W, placeholderRegex))
out->Max = val;
}
else if (isUnsigned((CalpontSystemCatalog::ColDataType)in->DataType))
{
if (static_cast<uint128_t>(out->Min) > static_cast<uint128_t>(val))
out->Min = val;
if (static_cast<uint128_t>(out->Max) < static_cast<uint128_t>(val))
out->Max = val;
}
else
{
if (out->Min > val)
@ -1767,6 +1754,7 @@ inline void p_Col_bin_ridArray(NewColRequestHeader* in,
#else
fStatsPtr->markEvent(in->LBID, pthread_self(), in->hdr.SessionID, 'K');
#endif
}
} //namespace anon
@ -1818,14 +1806,6 @@ void PrimitiveProcessor::p_Col(NewColRequestHeader* in, NewColResultHeader* out,
switch (in->DataSize)
{
case 32:
p_Col_bin_ridArray<32>(in, out, outSize, written, block, fStatsPtr, itemsPerBlk, parsedColumnFilter);
break;
case 16:
p_Col_bin_ridArray<16>(in, out, outSize, written, block, fStatsPtr, itemsPerBlk, parsedColumnFilter);
break;
case 8:
p_Col_ridArray<8>(in, out, outSize, written, block, fStatsPtr, itemsPerBlk, parsedColumnFilter);
break;
@ -1842,6 +1822,13 @@ void PrimitiveProcessor::p_Col(NewColRequestHeader* in, NewColResultHeader* out,
p_Col_ridArray<1>(in, out, outSize, written, block, fStatsPtr, itemsPerBlk, parsedColumnFilter);
break;
case 16:
p_Col_bin_ridArray<16, int128_t>(in, out, outSize, written, block, fStatsPtr, itemsPerBlk, parsedColumnFilter);
break;
case 32:
std::cout << __func__ << " WARNING!!! Not implemented for 32 byte data types." << std::endl;
default:
idbassert(0);
break;
@ -1932,8 +1919,6 @@ boost::shared_ptr<ParsedColumnFilter> parseColumnFilter
case 8:
ret->prestored_argVals[argIndex] = *reinterpret_cast<const uint64_t*>(args->val);
break;
case 16:
cout << __FILE__<< ":" <<__LINE__ << " Fix for 16 Bytes ?" << endl;
}
}
else

View File

@ -329,6 +329,7 @@ private:
int dict_OffsetIndex, currentOffsetIndex; // used by p_dictionary
int fDebugLevel;
dbbc::Stats* fStatsPtr; // pointer for pmstats
// To be removed b/c always true
bool logicalBlockMode;
boost::shared_ptr<ParsedColumnFilter> parsedColumnFilter;

View File

@ -1,5 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016, 2017 MariaDB Corporation
Copyright (C) 2016-2020 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
@ -101,7 +101,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
baseRid(0),
ridCount(0),
needStrValues(false),
hasWideDecimalType(false),
wideColumnsWidths(0),
filterCount(0),
projectCount(0),
sendRidsAtDelivery(false),
@ -113,7 +113,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
minVal(MAX64),
maxVal(MIN64),
lbidForCP(0),
hasBinaryColumn(false),
hasWideColumnOut(false),
busyLoaderCount(0),
physIO(0),
cachedIO(0),
@ -147,7 +147,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
baseRid(0),
ridCount(0),
needStrValues(false),
hasWideDecimalType(false),
wideColumnsWidths(0),
filterCount(0),
projectCount(0),
sendRidsAtDelivery(false),
@ -159,7 +159,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
minVal(MAX64),
maxVal(MIN64),
lbidForCP(0),
hasBinaryColumn(false),
hasWideColumnOut(false),
busyLoaderCount(0),
physIO(0),
cachedIO(0),
@ -242,13 +242,16 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
doJoin = tmp16 & HAS_JOINER;
hasRowGroup = tmp16 & HAS_ROWGROUP;
getTupleJoinRowGroupData = tmp16 & JOIN_ROWGROUP_DATA;
hasWideDecimalType = tmp16 & HAS_WIDE_DECIMAL;
bool hasWideColumnsIn = tmp16 & HAS_WIDE_COLUMNS;
// This used to signify that there was input row data from previous jobsteps, and
// it never quite worked right. No need to fix it or update it; all BPP's have started
// with a scan for years. Took it out.
assert(!hasRowGroup);
if (hasWideColumnsIn)
bs >> wideColumnsWidths;
bs >> bop;
bs >> forHJ;
@ -581,7 +584,6 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
/* skip the header */
bs.advance(sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t));
// TODO MCOL-641
bs >> count;
bs >> startPos;
@ -1024,7 +1026,7 @@ void BatchPrimitiveProcessor::initProcessor()
fFiltRidCount[i] = 0;
fFiltCmdRids[i].reset(new uint16_t[LOGICAL_BLOCK_RIDS]);
fFiltCmdValues[i].reset(new int64_t[LOGICAL_BLOCK_RIDS]);
if (hasWideDecimalType)
// wideColumnsWidths is a bit mask built by OR-ing the widths of the wide
// columns in use, so membership must be tested with '&'. OR-ing it with the
// non-zero MAXDECIMALWIDTH constant is always true and would allocate the
// 128-bit filter buffers even when no wide column takes part in the filter.
if (wideColumnsWidths & datatypes::MAXDECIMALWIDTH)
    fFiltCmdBinaryValues[i].reset(new int128_t[LOGICAL_BLOCK_RIDS]);
if (filtOnString) fFiltStrValues[i].reset(new string[LOGICAL_BLOCK_RIDS]);
@ -1092,15 +1094,15 @@ void BatchPrimitiveProcessor::initProcessor()
fAggregator->setInputOutput(fe2 ? fe2Output : outputRG, &fAggregateRG);
}
if (!hasBinaryColumn)
if (LIKELY(!hasWideColumnOut))
{
minVal = MAX64;
maxVal = MIN64;
}
else
{
utils::int128Min(bigMaxVal);
utils::int128Max(bigMinVal);
max128Val = datatypes::Decimal::minInt128;
min128Val = datatypes::Decimal::maxInt128;
}
// @bug 1269, initialize data used by execute() for async loading blocks
@ -1546,11 +1548,6 @@ void BatchPrimitiveProcessor::execute()
projectSteps[j]->projectIntoRowGroup(fe1Input, projectForFE1[j]);
for (j = 0; j < ridCount; j++, fe1In.nextRow())
// TODO MCOL-641
// WHERE clause on a numeric and a non-numeric column
// leads to this execution path:
// SELECT a, b from t1 where a!=b
// Here, a is e.g., decimal(38), b is varchar(15)
if (fe1->evaluate(&fe1In))
{
applyMapping(fe1ToProjection, fe1In, &fe1Out);
@ -1993,15 +1990,16 @@ void BatchPrimitiveProcessor::writeProjectionPreamble()
{
*serialized << (uint8_t) 1;
*serialized << lbidForCP;
if (hasBinaryColumn)
if (UNLIKELY(hasWideColumnOut))
{
*serialized << (uint8_t) 16; // width of min/max value
*serialized << (unsigned __int128) bigMinVal;
*serialized << (unsigned __int128) bigMaxVal;
// PSA width
*serialized << (uint8_t) wideColumnWidthOut;
*serialized << min128Val;
*serialized << max128Val;
}
else
{
*serialized << (uint8_t) 8; // width of min/max value
*serialized << (uint8_t) utils::MAXLEGACYWIDTH; // width of min/max value
*serialized << (uint64_t) minVal;
*serialized << (uint64_t) maxVal;
}
@ -2093,15 +2091,19 @@ void BatchPrimitiveProcessor::makeResponse()
{
*serialized << (uint8_t) 1;
*serialized << lbidForCP;
if (hasBinaryColumn)
if (UNLIKELY(hasWideColumnOut))
{
*serialized << (uint8_t) 16; // width of min/max value
*serialized << (unsigned __int128) bigMinVal;
*serialized << (unsigned __int128) bigMaxVal;
// PSA width
// Remove the assert for >16 bytes DTs.
assert(wideColumnWidthOut == datatypes::MAXDECIMALWIDTH);
*serialized << (uint8_t) wideColumnWidthOut;
*serialized << min128Val;
*serialized << max128Val;
}
else
{
*serialized << (uint8_t) 8; // width of min/max value
*serialized << (uint8_t) utils::MAXLEGACYWIDTH; // width of min/max value
*serialized << (uint64_t) minVal;
*serialized << (uint64_t) maxVal;
}
@ -2207,16 +2209,18 @@ int BatchPrimitiveProcessor::operator()()
}
allocLargeBuffers();
if (!hasBinaryColumn)
if (LIKELY(!hasWideColumnOut))
{
minVal = MAX64;
maxVal = MIN64;
}
else
{
utils::int128Min(bigMaxVal);
utils::int128Max(bigMinVal);
max128Val = datatypes::Decimal::minInt128;
min128Val = datatypes::Decimal::maxInt128;
}
validCPData = false;
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("BPP() execute");
@ -2351,7 +2355,7 @@ SBPP BatchPrimitiveProcessor::duplicate()
bpp->stepID = stepID;
bpp->uniqueID = uniqueID;
bpp->needStrValues = needStrValues;
bpp->hasWideDecimalType = hasWideDecimalType;
bpp->wideColumnsWidths = wideColumnsWidths;
bpp->gotAbsRids = gotAbsRids;
bpp->gotValues = gotValues;
bpp->LBIDTrace = LBIDTrace;

View File

@ -52,6 +52,7 @@
#include "rowaggregation.h"
#include "funcexpwrapper.h"
#include "bppsendthread.h"
#include "columnwidth.h"
namespace primitiveprocessor
{
@ -205,16 +206,15 @@ private:
uint16_t relRids[LOGICAL_BLOCK_RIDS];
int64_t values[LOGICAL_BLOCK_RIDS];
int128_t binaryValues[LOGICAL_BLOCK_RIDS];
int128_t wide128Values[LOGICAL_BLOCK_RIDS];
boost::scoped_array<uint64_t> absRids;
boost::scoped_array<std::string> strValues;
uint16_t ridCount;
bool needStrValues;
bool hasWideDecimalType;
uint16_t wideColumnsWidths;
/* Common space for primitive data */
static const uint32_t BUFFER_SIZE = 131072;
uint8_t blockData[BLOCK_SIZE * 16];
uint8_t blockData[BLOCK_SIZE * utils::MAXCOLUMNWIDTH];
boost::scoped_array<uint8_t> outputMsg;
uint32_t outMsgSize;
@ -233,18 +233,18 @@ private:
// CP data from a scanned column
union
{
__int128 bigMinVal;
int128_t min128Val;
int64_t minVal;
};
union
{
__int128 bigMaxVal;
int128_t max128Val;
int64_t maxVal;
};
uint64_t lbidForCP;
// MCOL-641
bool hasBinaryColumn;
bool hasWideColumnOut;
uint8_t wideColumnWidthOut;
// IO counters
boost::mutex counterLock;
uint32_t busyLoaderCount;

View File

@ -40,7 +40,6 @@ using namespace std;
#include "primitiveserver.h"
#include "primproc.h"
#include "stats.h"
#include "widedecimalutils.h"
using namespace messageqcpp;
using namespace rowgroup;
@ -94,17 +93,17 @@ void ColumnCommand::execute()
if (fFilterFeeder == LEFT_FEEDER)
{
values = bpp->fFiltCmdValues[0].get();
binaryValues = bpp->fFiltCmdBinaryValues[0].get();
wide128Values = bpp->fFiltCmdBinaryValues[0].get();
}
else if (fFilterFeeder == RIGHT_FEEDER)
{
values = bpp->fFiltCmdValues[1].get();
binaryValues = bpp->fFiltCmdBinaryValues[1].get();
wide128Values = bpp->fFiltCmdBinaryValues[1].get();
}
else
{
values = bpp->values;
binaryValues = bpp->binaryValues;
wide128Values = bpp->wide128Values;
}
_execute();
@ -153,9 +152,10 @@ void ColumnCommand::loadData()
bool lastBlockReached = false;
oidLastLbid = getLastLbid();
uint32_t blocksToLoad = 0;
// WIP MCOL-641
BRM::LBID_t* lbids = (BRM::LBID_t*) alloca(16 * sizeof(BRM::LBID_t));
uint8_t** blockPtrs = (uint8_t**) alloca(16 * sizeof(uint8_t*));
// The number of elements allocated equals the number of
// iterations of the first loop here.
BRM::LBID_t* lbids = (BRM::LBID_t*) alloca(colType.colWidth * sizeof(BRM::LBID_t));
uint8_t** blockPtrs = (uint8_t**) alloca(colType.colWidth * sizeof(uint8_t*));
int i;
@ -282,11 +282,24 @@ void ColumnCommand::issuePrimitive()
//if (wasVersioned && outMsg->ValidMinMax)
// cout << "CC: versioning overriding min max data\n";
bpp->lbidForCP = lbid;
if (primMsg->DataSize > 8)
if (UNLIKELY(utils::isWide(colType.colWidth)))
{
bpp->hasBinaryColumn = true;
bpp->bigMaxVal = outMsg->Max;
bpp->bigMinVal = outMsg->Min;
if (datatypes::Decimal::isWideDecimalType(colType))
{
bpp->hasWideColumnOut = true;
// colWidth is int32 and wideColumnWidthOut's
// value is expected to be at most uint8.
bpp->wideColumnWidthOut = colType.colWidth;
bpp->max128Val = outMsg->Max;
bpp->min128Val = outMsg->Min;
}
else
{
ostringstream os;
os << " WARNING!!! Not implemented for ";
os << primMsg->DataSize << " column.";
throw PrimitiveColumnProjectResultExcept(os.str());
}
}
else
{
@ -315,7 +328,7 @@ void ColumnCommand::process_OT_BOTH()
bpp->relRids[i] = *((uint16_t*) &bpp->outputMsg[pos]);
pos += 2;
binaryValues[i] = *((int128_t*) &bpp->outputMsg[pos]);
wide128Values[i] = *((int128_t*) &bpp->outputMsg[pos]);
pos += 16;
}
@ -394,8 +407,7 @@ void ColumnCommand::process_OT_DATAVALUE()
{
case 16:
{
memcpy(binaryValues, outMsg + 1, outMsg->NVALS << 4);
cout << " CC: first value is " << values[0] << endl;
memcpy(wide128Values, outMsg + 1, outMsg->NVALS << 4);
break;
}
@ -586,6 +598,8 @@ void ColumnCommand::prep(int8_t outputType, bool absRids)
// JFYI: the results of this switch are used by index scan code that is
// unused as of 1.5.
switch (colType.colWidth)
{
case 1:
@ -608,8 +622,6 @@ void ColumnCommand::prep(int8_t outputType, bool absRids)
mask = 0x01;
break;
case 16:
// WIP MCOL-641
cout << __FILE__<< ":" <<__LINE__ << " Set shift and mask for 16 Bytes"<< endl;
shift = 1;
mask = 0x01;
break;
@ -809,7 +821,6 @@ void ColumnCommand::projectResultRG(RowGroup& rg, uint32_t pos)
}
case 16:
{
cout << __FILE__<< ":" <<__LINE__ << " ColumnCommand::projectResultRG " << endl;
for (i = 0; i < outMsg->NVALS; ++i, msg8 += gapSize)
{
r.setBinaryField_offset((int128_t*)msg8, colType.colWidth, offset);

View File

@ -147,7 +147,7 @@ private:
uint16_t filterCount;
bool makeAbsRids;
int64_t* values; // this is usually bpp->values; RTSCommand needs to use a different container
int128_t* binaryValues;
int128_t* wide128Values;
uint8_t mask, shift; // vars for the selective block loader

View File

@ -176,7 +176,7 @@ Command* FilterCommand::makeFilterCommand(ByteStream& bs, vector<SCommand>& cmds
FilterCommand::FilterCommand() : Command(FILTER_COMMAND), fBOP(0),
hasWideDecimalType(false)
hasWideColumns(false)
{
}
@ -251,7 +251,7 @@ void FilterCommand::setColTypes(const execplan::CalpontSystemCatalog::ColType& l
rightColType = right;
if (datatypes::Decimal::isWideDecimalType(left) || datatypes::Decimal::isWideDecimalType(right))
hasWideDecimalType = true;
hasWideColumns = true;
}
@ -262,7 +262,7 @@ void FilterCommand::doFilter()
bool (FilterCommand::*compareFunc)(uint64_t, uint64_t);
if (hasWideDecimalType)
if (hasWideColumns)
compareFunc = &FilterCommand::binaryCompare;
else
compareFunc = &FilterCommand::compare;
@ -280,10 +280,8 @@ void FilterCommand::doFilter()
if ((this->*compareFunc)(i, j) == true)
{
bpp->relRids[bpp->ridCount] = bpp->fFiltCmdRids[0][i];
// WIP MCOL-641 How is bpp->(binary)values used given that
// we are setting the relRids?
if (datatypes::Decimal::isWideDecimalType(leftColType))
bpp->binaryValues[bpp->ridCount] = bpp->fFiltCmdBinaryValues[0][i];
bpp->wide128Values[bpp->ridCount] = bpp->fFiltCmdBinaryValues[0][i];
else
bpp->values[bpp->ridCount] = bpp->fFiltCmdValues[0][i];
bpp->ridMap |= 1 << (bpp->relRids[bpp->ridCount] >> 10);

View File

@ -82,7 +82,7 @@ protected:
// binary operator
uint8_t fBOP;
bool hasWideDecimalType;
bool hasWideColumns;
// column type for null check
execplan::CalpontSystemCatalog::ColType leftColType;

View File

@ -78,8 +78,7 @@ void PassThruCommand::project()
switch (colWidth)
{
case 16:
cout << __FILE__<< ":" <<__LINE__ << " Fix for 16 Bytes ?" << endl;
bpp->serialized->append((uint8_t*) bpp->binaryValues, bpp->ridCount << 4);
bpp->serialized->append((uint8_t*) bpp->wide128Values, bpp->ridCount << 4);
break;
case 8:
@ -156,10 +155,9 @@ void PassThruCommand::projectIntoRowGroup(RowGroup& rg, uint32_t col)
break;
case 16:
cout << __FILE__ << ":" << __LINE__ << " PassThruCommand::projectIntoRowGroup" << " Addition for 16 Bytes" << endl;
for (i = 0; i < bpp->ridCount; i++)
{
r.setBinaryField_offset(&bpp->binaryValues[i], 16, offset);
r.setBinaryField_offset(&bpp->wide128Values[i], 16, offset);
r.nextRow(rowSize);
}
}

View File

@ -512,10 +512,9 @@ int ServicePrimProc::Child()
// set to smallest extent size
// do not allow to read beyond the end of an extent
const int MaxReadAheadSz = (extentRows) / BLOCK_SIZE;
//defaultBufferSize = 512 * 1024; // @bug 2627 - changed default dict buffer from 256K to 512K, allows for cols w/ length of 61.
// WIP MCOL-641 Check with Patrick on this. Changed it from 100*1024 to 128*1024
// to match with BatchPrimitiveProcessor::BUFFER_SIZE
defaultBufferSize = 128 * 1024; // 1/17/12 - made the dict buffer dynamic, max size for a numeric col is 80k + ovrhd
// Max size for a primitive message column buffer. Must be big enough
// to accommodate 8192 values of the widest non-dict column + 8192 RIDs + overhead.
defaultBufferSize = 150 * 1024;
// This parm controls whether we rotate through the output sockets

View File

@ -141,6 +141,7 @@ private:
std::cout << std::endl << "------------------------------------------------------------" << std::endl;
uint32_t oid =3001;
std::vector<uint32_t> offsets, roids, tkeys, cscale, cprecision;
std::vector<uint32_t> charSetNumVec;
std::vector<execplan::CalpontSystemCatalog::ColDataType> types;
offsets.push_back(2); offsets.push_back(2+width);
roids.push_back(oid);
@ -148,11 +149,13 @@ private:
types.push_back(cscDt);
cscale.push_back(0);
cprecision.push_back(precision);
charSetNumVec.push_back(8);
rowgroup::RowGroup inRG(1, //column count
offsets, //oldOffset
roids, // column oids
tkeys, //keys
types, // types
charSetNumVec, // charset numbers
cscale, //scale
cprecision, // precision
20, // sTableThreshold

View File

@ -43,6 +43,7 @@ protected:
std::vector<decltype(precision)> precisionVec;
std::vector<uint32_t> roids, tkeys, cscale;
std::vector<uint32_t> widthVec;
std::vector<uint32_t> charSetNumVec;
types.push_back(execplan::CalpontSystemCatalog::DECIMAL);
types.push_back(execplan::CalpontSystemCatalog::UDECIMAL);
@ -68,6 +69,7 @@ protected:
roids.push_back(oid + i);
tkeys.push_back(i + 1);
cscale.push_back(0);
charSetNumVec.push_back(8);
}
rowgroup::RowGroup inRG(roids.size(), // column count
@ -75,6 +77,7 @@ protected:
roids, // column oids
tkeys, // keys
types, // types
charSetNumVec, // charset numbers
cscale, // scale
precisionVec, // precision
20, // sTableThreshold

View File

@ -23,6 +23,7 @@
namespace utils
{
// Column width limits used by the width helpers in this namespace.
// NOTE(review): presumably byte widths - 8 for the widest legacy column and
// 16 for the widest (128-bit) column; confirm against callers.
constexpr uint8_t MAXLEGACYWIDTH = 8;
constexpr uint8_t MAXCOLUMNWIDTH = 16;
inline bool isWide(uint8_t width)
{

View File

@ -235,18 +235,6 @@ ByteStream& ByteStream::operator<<(const uint64_t o)
return *this;
}
// WIP MCOL-641
/**
 * Append a 128-bit signed integer to the end of the stream in native byte order.
 *
 * @param o value whose 16 bytes are copied to the write cursor
 * @return *this, so insertions can be chained
 */
ByteStream& ByteStream::operator<<(const int128_t& o)
{
    // Make room first: no buffer yet, or the 16 new bytes would not fit.
    if (fBuf == 0 || (fCurInPtr - fBuf + 16U > fMaxLen + ISSOverhead))
        growBuf(fMaxLen + BlockSize);

    // Nothing here guarantees that fCurInPtr is 16-byte aligned, and storing
    // through a cast int128_t* lets the compiler emit an aligned 128-bit store
    // that faults on a misaligned address. Copy the bytes instead.
    // __builtin_memcpy is a compiler builtin (this code already relies on the
    // GCC/Clang __int128 extension), so no additional #include is required.
    __builtin_memcpy(fCurInPtr, &o, sizeof(o));
    fCurInPtr += sizeof(o);
    return *this;
}
ByteStream& ByteStream::operator<<(const uint128_t& o)
{
if (fBuf == 0 || (fCurInPtr - fBuf + 16U > fMaxLen + ISSOverhead))
@ -258,6 +246,17 @@ ByteStream& ByteStream::operator<<(const uint128_t& o)
return *this;
}
/**
 * Append a 128-bit signed integer to the end of the stream in native byte order.
 *
 * @param o value whose 16 bytes are copied to the write cursor
 * @return *this, so insertions can be chained
 */
ByteStream& ByteStream::operator<<(const int128_t& o)
{
    // Make room first: no buffer yet, or the 16 new bytes would not fit.
    if (fBuf == 0 || (fCurInPtr - fBuf + 16U > fMaxLen + ISSOverhead))
        growBuf(fMaxLen + BlockSize);

    // Nothing here guarantees that fCurInPtr is 16-byte aligned, and storing
    // through a cast int128_t* lets the compiler emit an aligned 128-bit store
    // that faults on a misaligned address. Copy the bytes instead.
    // __builtin_memcpy is a compiler builtin (this code already relies on the
    // GCC/Clang __int128 extension), so no additional #include is required.
    __builtin_memcpy(fCurInPtr, &o, sizeof(o));
    fCurInPtr += sizeof(o);
    return *this;
}
ByteStream& ByteStream::operator<<(const string& s)
{
int32_t len = s.size();
@ -342,15 +341,14 @@ ByteStream& ByteStream::operator>>(uint64_t& o)
return *this;
}
// WIP MCOL-641
ByteStream& ByteStream::operator>>(int128_t& o)
ByteStream& ByteStream::operator>>(uint128_t& o)
{
peek(o);
fCurOutPtr += 16;
return *this;
}
ByteStream& ByteStream::operator>>(uint128_t& o)
ByteStream& ByteStream::operator>>(int128_t& o)
{
peek(o);
fCurOutPtr += 16;
@ -437,16 +435,6 @@ void ByteStream::peek(uint64_t& o) const
o = *((uint64_t*) fCurOutPtr);
}
// WIP MCOL-641
/**
 * Read a 128-bit signed integer from the front of the stream, in native byte
 * order, without moving the read cursor.
 *
 * @param o receives the 16 bytes found at the read cursor
 * @throws underflow_error when length() reports fewer than 16 bytes
 */
void ByteStream::peek(int128_t& o) const
{
    if (length() < 16)
        throw underflow_error("ByteStream>int128_t: not enough data in stream to fill datatype");

    // fCurOutPtr is not guaranteed to be 16-byte aligned; loading through a
    // cast int128_t* may compile to an aligned 128-bit load that faults on a
    // misaligned address. Copy the bytes instead. __builtin_memcpy is a
    // compiler builtin, so no additional #include is required.
    __builtin_memcpy(&o, fCurOutPtr, sizeof(o));
}
void ByteStream::peek(uint128_t& o) const
{
@ -456,6 +444,15 @@ void ByteStream::peek(uint128_t& o) const
o = *((uint128_t*) fCurOutPtr);
}
/**
 * Read a 128-bit signed integer from the front of the stream, in native byte
 * order, without moving the read cursor.
 *
 * @param o receives the 16 bytes found at the read cursor
 * @throws underflow_error when length() reports fewer than 16 bytes
 */
void ByteStream::peek(int128_t& o) const
{
    if (length() < 16)
        throw underflow_error("ByteStream>int128_t: not enough data in stream to fill datatype");

    // fCurOutPtr is not guaranteed to be 16-byte aligned; loading through a
    // cast int128_t* may compile to an aligned 128-bit load that faults on a
    // misaligned address. Copy the bytes instead. __builtin_memcpy is a
    // compiler builtin, so no additional #include is required.
    __builtin_memcpy(&o, fCurOutPtr, sizeof(o));
}
void ByteStream::peek(string& s) const
{
int32_t len;

View File

@ -47,6 +47,7 @@ class ByteStreamTestSuite;
// 128-bit integer aliases built on the compiler's __int128 extension.
// Each alias is declared exactly once.
using int128_t = __int128;
using uint128_t = unsigned __int128;
namespace messageqcpp
{
@ -77,7 +78,7 @@ public:
typedef uint16_t doublebyte;
typedef uint32_t quadbyte;
typedef uint64_t octbyte;
typedef uint128_t hexbyte;
typedef int128_t hexbyte;
typedef boost::uuids::uuid uuid;
/**
@ -147,7 +148,6 @@ public:
* push an uint64_t onto the end of the stream. The byte order is whatever the native byte order is.
*/
EXPORT ByteStream& operator<<(const uint64_t o);
// WIP MCOL-641
/**
* push an uint128_t onto the end of the stream. The byte order is whatever the native byte order is.
*/
@ -156,6 +156,10 @@ public:
* push an uint128_t onto the end of the stream. The byte order is whatever the native byte order is.
*/
EXPORT ByteStream& operator<<(const uint128_t& o);
/**
* push an int128_t onto the end of the stream. The byte order is whatever the native byte order is.
* The value occupies 16 bytes in the stream.
* @param o the value to append
* @return a reference to this stream, so insertions can be chained
*/
EXPORT ByteStream& operator<<(const int128_t& o);
/**
* push a float onto the end of the stream. The byte order is
* whatever the native byte order is.
@ -220,7 +224,6 @@ public:
* extract an uint64_t from the front of the stream. The byte order is whatever the native byte order is.
*/
EXPORT ByteStream& operator>>(uint64_t& o);
// WIP MCOL-641
/**
* extract an uint128_t from the front of the stream. The byte order is whatever the native byte order is.
*/
@ -229,6 +232,10 @@ public:
* extract an uint128_t from the front of the stream. The byte order is whatever the native byte order is.
*/
EXPORT ByteStream& operator>>(uint128_t& o);
/**
* extract an int128_t from the front of the stream. The byte order is whatever the native byte order is.
* The value is read with peek() and the read position then moves forward by 16 bytes.
* @param o receives the extracted value
* @return a reference to this stream, so extractions can be chained
*/
EXPORT ByteStream& operator>>(int128_t& o);
/**
* extract a float from the front of the stream. The byte
* order is whatever the native byte order is.
@ -299,7 +306,6 @@ public:
* Peek at an uint64_t from the front of the stream. The byte order is whatever the native byte order is.
*/
EXPORT void peek(uint64_t& o) const;
// WIP MCOL-641
/**
* Peek at an uint128_t from the front of the stream. The byte order is whatever the native byte order is.
*/
@ -308,6 +314,10 @@ public:
* Peek at an uint128_t from the front of the stream. The byte order is whatever the native byte order is.
*/
EXPORT void peek(uint128_t& o) const;
/**
* Peek at an int128_t from the front of the stream. The byte order is whatever the native byte order is.
* The read position is left unchanged.
* @param o receives the 16-byte value found at the front of the stream
* @throws underflow_error if the stream holds fewer than 16 bytes
*/
EXPORT void peek(int128_t& o) const;
/**
* Peek at a float from the front of the stream. The byte order
* is whatever the native byte order is.