mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Merge pull request #1990 from drrtuy/MCOL-4173_9

MCOL-4173 This patch adds support for wide-DECIMAL INNER, OUTER, SEMI…
Roman Nozdrin
2021-06-24 16:15:07 +03:00
committed by GitHub
22 changed files with 2347 additions and 228 deletions

View File

@ -126,7 +126,7 @@ public:
}
uint32_t finalize() const
{
return (uint32_t) mPart1;
return (uint32_t)mPart1;
}
};

View File

@ -30,7 +30,6 @@
#include "lbidlist.h"
#include "spinlock.h"
#include "vlarray.h"
#include "mcs_string.h"
using namespace std;
@ -42,6 +41,7 @@ using namespace joblist;
namespace joiner
{
// Typed joiner ctor
TupleJoiner::TupleJoiner(
const rowgroup::RowGroup& smallInput,
const rowgroup::RowGroup& largeInput,
@ -145,6 +145,7 @@ TupleJoiner::TupleJoiner(
nullValueForJoinColumn = smallNullRow.getSignedNullValue(smallJoinColumn);
}
// Typeless joiner ctor
TupleJoiner::TupleJoiner(
const rowgroup::RowGroup& smallInput,
const rowgroup::RowGroup& largeInput,
@ -182,67 +183,31 @@ TupleJoiner::TupleJoiner(
smallNullRow.initToNull();
}
for (i = keyLength = 0; i < smallKeyColumns.size(); i++)
{
if (smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::CHAR ||
smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::VARCHAR
||
smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::TEXT)
{
keyLength += smallRG.getColumnWidth(smallKeyColumns[i]) + 2; // +2 for length
// MCOL-698: if we don't do this LONGTEXT allocates 32TB RAM
if (keyLength > 65536)
keyLength = 65536;
}
else if (smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::LONGDOUBLE)
{
keyLength += sizeof(long double);
}
else
{
keyLength += 8;
}
// Set bSignedUnsignedJoin if one or more join columns require a signed-to-unsigned comparison.
if (smallRG.isUnsigned(smallKeyColumns[i]) != largeRG.isUnsigned(largeKeyColumns[i]))
{
bSignedUnsignedJoin = true;
}
}
// note, 'numcores' is implied by tuplehashjoin on calls to insertRGData().
// TODO: make it explicit to avoid future confusion.
storedKeyAlloc.reset(new FixedAllocator[numCores]);
for (i = 0; i < (uint) numCores; i++)
storedKeyAlloc[i].setAllocSize(keyLength);
keyLength = calculateKeyLength(smallKeyColumns, smallRG, &largeKeyColumns, &largeRG);
discreteValues.reset(new bool[smallKeyColumns.size()]);
cpValues.reset(new vector<int128_t>[smallKeyColumns.size()]);
for (i = 0; i < smallKeyColumns.size(); i++)
for (i = 0; i < smallKeyColumns.size(); ++i)
{
discreteValues[i] = false;
if (isUnsigned(smallRG.getColTypes()[smallKeyColumns[i]]))
uint32_t smallKeyColumnsIdx = smallKeyColumns[i];
auto smallSideColType = smallRG.getColTypes()[smallKeyColumnsIdx];
// Set bSignedUnsignedJoin if one or more join columns require a signed-to-unsigned comparison.
if (smallRG.isUnsigned(smallKeyColumnsIdx) != largeRG.isUnsigned(largeKeyColumns[i]))
{
if (datatypes::isWideDecimalType(
smallRG.getColType(smallKeyColumns[i]),
smallRG.getColumnWidth(smallKeyColumns[i])))
{
cpValues[i].push_back((int128_t) -1);
cpValues[i].push_back(0);
}
else
{
cpValues[i].push_back((int128_t) numeric_limits<uint64_t>::max());
cpValues[i].push_back(0);
}
bSignedUnsignedJoin = true;
}
discreteValues[i] = false;
if (isUnsigned(smallSideColType))
{
cpValues[i].push_back((int128_t) numeric_limits<uint64_t>::max());
cpValues[i].push_back(0);
}
else
{
if (datatypes::isWideDecimalType(
smallRG.getColType(smallKeyColumns[i]),
smallRG.getColumnWidth(smallKeyColumns[i])))
if (datatypes::isWideDecimalType(smallSideColType,
smallRG.getColumnWidth(smallKeyColumnsIdx)))
{
cpValues[i].push_back(utils::maxInt128);
cpValues[i].push_back(utils::minInt128);
@ -254,6 +219,12 @@ TupleJoiner::TupleJoiner(
}
}
}
// note, 'numcores' is implied by tuplehashjoin on calls to insertRGData().
// TODO: make it explicit to avoid future confusion.
storedKeyAlloc.reset(new FixedAllocator[numCores]);
for (i = 0; i < (uint) numCores; i++)
storedKeyAlloc[i].setAllocSize(keyLength);
}
TupleJoiner::TupleJoiner() { }
@ -730,10 +701,12 @@ void TupleJoiner::doneInserting()
typelesshash_t::iterator thit;
uint32_t i, pmpos = 0, rowCount;
Row smallRow;
auto smallSideColIdx = smallKeyColumns[col];
auto smallSideColType = smallRG.getColType(smallSideColIdx);
smallRG.initRow(&smallRow);
if (smallRow.isCharType(smallKeyColumns[col]))
if (smallRow.isCharType(smallSideColIdx))
continue;
rowCount = size();
@ -743,7 +716,7 @@ void TupleJoiner::doneInserting()
pmpos = 0;
else if (typelessJoin)
thit = ht[bucket]->begin();
else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
else if (isLongDouble(smallRG.getColType(smallKeyColumns[0])))
ldit = ld[bucket]->begin();
else if (!smallRG.usesStringTable())
hit = h[bucket]->begin();
@ -761,7 +734,7 @@ void TupleJoiner::doneInserting()
smallRow.setPointer(thit->second);
++thit;
}
else if (smallRG.getColType(smallKeyColumns[col]) == CalpontSystemCatalog::LONGDOUBLE)
else if (isLongDouble(smallSideColType))
{
while (ldit == ld[bucket]->end())
ldit = ld[++bucket]->begin();
@ -783,9 +756,9 @@ void TupleJoiner::doneInserting()
++sthit;
}
if (smallRow.getColType(smallKeyColumns[col]) == CalpontSystemCatalog::LONGDOUBLE)
if (isLongDouble(smallSideColType))
{
double dval = (double)roundl(smallRow.getLongDoubleField(smallKeyColumns[col]));
double dval = (double)roundl(smallRow.getLongDoubleField(smallSideColIdx));
switch (largeRG.getColType(largeKeyColumns[col]))
{
case CalpontSystemCatalog::DOUBLE:
@ -802,19 +775,18 @@ void TupleJoiner::doneInserting()
}
}
}
else if (datatypes::isWideDecimalType(
smallRow.getColType(smallKeyColumns[col]),
smallRow.getColumnWidth(smallKeyColumns[col])))
else if (datatypes::isWideDecimalType(smallSideColType,
smallRow.getColumnWidth(smallSideColIdx)))
{
uniquer.insert(*((int128_t*)smallRow.getBinaryField<int128_t>(smallKeyColumns[col])));
uniquer.insert(smallRow.getTSInt128Field(smallSideColIdx).getValue());
}
else if (smallRow.isUnsigned(smallKeyColumns[col]))
else if (smallRow.isUnsigned(smallSideColIdx))
{
uniquer.insert((int64_t)smallRow.getUintField(smallKeyColumns[col]));
uniquer.insert((int64_t)smallRow.getUintField(smallSideColIdx));
}
else
{
uniquer.insert(smallRow.getIntField(smallKeyColumns[col]));
uniquer.insert(smallRow.getIntField(smallSideColIdx));
}
CHECKSIZE;
@ -1170,7 +1142,8 @@ void TupleJoiner::updateCPData(const Row& r)
r.getColType(colIdx),
r.getColumnWidth(colIdx)))
{
uval = *((int128_t*)r.getBinaryField<int128_t>(colIdx));
uval = r.getTSInt128Field(colIdx).getValue();
}
else
{
@ -1210,7 +1183,7 @@ void TupleJoiner::updateCPData(const Row& r)
r.getColType(colIdx),
r.getColumnWidth(colIdx)))
{
val = *((int128_t*)r.getBinaryField<int128_t>(colIdx));
val = r.getTSInt128Field(colIdx).getValue();
}
else
{
@ -1283,66 +1256,134 @@ public:
}
};
class TypelessDataDecoder
class WideDecimalKeyConverter
{
const uint8_t *mPtr;
const uint8_t *mEnd;
void checkAvailableData(uint32_t nbytes) const
{
if (mPtr + nbytes > mEnd)
throw runtime_error("TypelessData is too short");
}
public:
TypelessDataDecoder(const uint8_t* ptr, size_t length)
:mPtr(ptr), mEnd(ptr + length)
const Row* mR;
uint64_t convertedValue;
const uint32_t mKeyColId;
uint16_t width;
public:
WideDecimalKeyConverter(const Row& r,
const uint32_t keyColId): mR(&r),
mKeyColId(keyColId),
width(datatypes::MAXDECIMALWIDTH)
{ }
TypelessDataDecoder(const TypelessData &data)
:TypelessDataDecoder(data.data, data.len)
{ }
ConstString scanGeneric(uint32_t length)
bool isConvertedToSmallSideType() const { return width == datatypes::MAXLEGACYWIDTH; }
int64_t getConvertedTInt64() const { return (int64_t)convertedValue; }
// Returns true if the value doesn't fit into the allowed range for the target type.
template <typename T, typename AT>
bool numericRangeCheckAndConvert(const AT& value)
{
checkAvailableData(length);
ConstString res((const char *) mPtr, length);
mPtr += length;
return res;
if (value > AT(std::numeric_limits<T>::max()) ||
value < AT(std::numeric_limits<T>::min()))
return true;
convertedValue = (uint64_t) static_cast<T>(value);
return false;
}
uint32_t scanStringLength()
// As of MCS 6.x there is an assumption that MCS can't join
// INTEGER and non-INTEGER (potentially fractional) keys,
// e.g. BIGINT to DECIMAL(38,1). It can only join BIGINT to DECIMAL(38).
// convert() checks whether the wide-DECIMAL value overflows the INTEGER type range
// and sets the internal width to 0 if it does. If not, width is set to 8
// and convertedValue is cast to the INTEGER type.
// convert() is called in EM to cast the smallSide TypelessData
// if the key columns have a skew, e.g. INT to DECIMAL(38).
inline WideDecimalKeyConverter&
convert(const bool otherSideIsIntOrNarrow,
const execplan::CalpontSystemCatalog::ColDataType otherSideType)
{
checkAvailableData(2);
uint32_t res = ((uint32_t) mPtr[0]) * 255 + mPtr[1];
mPtr += 2;
return res;
if (otherSideIsIntOrNarrow)
{
datatypes::TSInt128 integralPart = mR->getTSInt128Field(mKeyColId);
bool isUnsigned = datatypes::isUnsigned(otherSideType);
if (isUnsigned)
{
width = (numericRangeCheckAndConvert<uint64_t>(integralPart)) ? 0 : datatypes::MAXLEGACYWIDTH;
return *this;
}
width = (numericRangeCheckAndConvert<int64_t>(integralPart)) ? 0 : datatypes::MAXLEGACYWIDTH;
}
return *this;
}
ConstString scanString()
// Stores the value that might have been converted.
inline bool store(TypelessData& typelessData,
uint32_t& off,
const uint32_t keylen) const
{
return scanGeneric(scanStringLength());
// convert() signals an otherSide column type range overflow via width == 0,
// in which case store() returns a TD with len = 0. This tells EM to skip this
// key b/c it won't match at PP. It is possible to skip a smallSide TD this way,
// but not a largeSide one, b/c of OUTER joins.
if (!width)
{
typelessData.len = 0;
return true;
}
if (off + width > keylen)
return true;
switch (width)
{
case datatypes::MAXDECIMALWIDTH:
{
mR->storeInt128FieldIntoPtr(mKeyColId, &typelessData.data[off]);
break;
}
default:
{
datatypes::TUInt64(convertedValue).store(&typelessData.data[off]);
}
}
off += width;
return false;
}
};
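A minimal usage sketch of the converter above (illustration only, not part of this commit), assuming hypothetical variables row, keyColId, otherSideRG, otherColId, td, off and keylen; it mirrors how makeTypelessKey() drives convert()/store() further below:
// Decide whether the other side of the join uses an 8-byte (or narrower) type.
bool otherSideIsIntOrNarrow = otherSideRG.getColumnWidth(otherColId) <= datatypes::MAXLEGACYWIDTH;
// The type is only consulted when the other side is narrow.
auto otherSideType = (otherSideIsIntOrNarrow) ? otherSideRG.getColType(otherColId)
                                              : datatypes::SystemCatalog::UNDEFINED;
bool skipKey = WideDecimalKeyConverter(row, keyColId)
                   .convert(otherSideIsIntOrNarrow, otherSideType)
                   .store(td, off, keylen);
// skipKey is true when the key would exceed keylen, or when the wide value does
// not fit the other side's integer range (store() left td.len == 0).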
// smallSideColWidths is a valid non-nullptr pointer only
// if there is a skew b/w small- and large-side column widths.
uint32 TypelessData::hash(const RowGroup& r,
const std::vector<uint32_t>& keyCols) const
const std::vector<uint32_t>& keyCols,
const std::vector<uint32_t>* smallSideKeyColumnsIds,
const rowgroup::RowGroup* smallSideRG) const
{
if (mRowPtr)
return mRowPtr->hashTypeless(keyCols);
// This part is for largeSide hashing using Row at PP.
if (!isSmallSide())
{
return mRowPtr->hashTypeless(keyCols,
smallSideKeyColumnsIds,
(smallSideRG) ? &smallSideRG->getColWidths() : nullptr);
}
// This part is for smallSide hashing at PP.
TypelessDataDecoder decoder(*this);
datatypes::MariaDBHasher hasher;
for (uint32_t i = 0; i < keyCols.size(); i++)
for (auto keyColId: keyCols)
{
switch (r.getColTypes()[keyCols[i]])
switch (r.getColTypes()[keyColId])
{
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::TEXT:
{
CHARSET_INFO *cs= const_cast<RowGroup&>(r).getCharset(keyCols[i]);
CHARSET_INFO *cs= const_cast<RowGroup&>(r).getCharset(keyColId);
hasher.add(cs, decoder.scanString());
break;
}
case CalpontSystemCatalog::DECIMAL:
{
const uint32_t width = std::max(r.getColWidths()[keyColId], datatypes::MAXLEGACYWIDTH);
if (isSmallSideWithSkewedData() || width == datatypes::MAXLEGACYWIDTH)
{
int64_t val = decoder.scanTInt64();
hasher.add(&my_charset_bin, reinterpret_cast<const char*>(&val), datatypes::MAXLEGACYWIDTH);
}
else
hasher.add(&my_charset_bin, decoder.scanGeneric(width));
break;
}
default:
{
hasher.add(&my_charset_bin, decoder.scanGeneric(8));
hasher.add(&my_charset_bin, decoder.scanGeneric(datatypes::MAXLEGACYWIDTH));
break;
}
}
@ -1350,41 +1391,84 @@ uint32 TypelessData::hash(const RowGroup& r,
return hasher.finalize();
}
// 'this' is the smallSide; 'row' represents a largeSide record.
int TypelessData::cmpToRow(const RowGroup& r,
const std::vector<uint32_t>& keyCols,
const rowgroup::Row &row) const
const rowgroup::Row &row,
const std::vector<uint32_t> *smallSideKeyColumnsIds,
const rowgroup::RowGroup *smallSideRG) const
{
TypelessDataDecoder a(*this);
for (uint32_t i = 0; i < keyCols.size(); i++)
{
switch (r.getColTypes()[keyCols[i]])
auto largeSideKeyColRowIdx = keyCols[i];
switch (r.getColType(largeSideKeyColRowIdx))
{
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::TEXT:
{
datatypes::Charset cs(*const_cast<RowGroup&>(r).getCharset(keyCols[i]));
datatypes::Charset cs(*const_cast<RowGroup&>(r).getCharset(largeSideKeyColRowIdx));
ConstString ta = a.scanString();
ConstString tb = row.getConstString(keyCols[i]);
ConstString tb = row.getConstString(largeSideKeyColRowIdx);
if (int rc= cs.strnncollsp(ta, tb))
return rc;
break;
}
case CalpontSystemCatalog::DECIMAL:
{
auto largeSideWidth = row.getColumnWidth(largeSideKeyColRowIdx);
// The first branch processes a skewed JOIN, e.g. INT to DECIMAL(38);
// the else branch processes a DECIMAL with a common width on both the small and large side.
if (isSmallSideWithSkewedData() &&
largeSideWidth != smallSideRG->getColumnWidth(smallSideKeyColumnsIds->operator[](i)))
{
if (largeSideWidth == datatypes::MAXLEGACYWIDTH)
{
if (int rc = a.scanTInt64() != row.getIntField(largeSideKeyColRowIdx))
return rc;
}
else
{
WideDecimalKeyConverter cv(row, largeSideKeyColRowIdx);
if (!cv.convert(true,
smallSideRG->getColType(smallSideKeyColumnsIds->operator[](i)))
.isConvertedToSmallSideType())
return 1;
if (int rc = a.scanTInt64() != cv.getConvertedTInt64())
return rc;
}
}
else
{
// There is an assumption that both sides here have equal width, either 8 or 16 bytes.
if (largeSideWidth == datatypes::MAXDECIMALWIDTH)
{
if (int rc = a.scanTInt128() != row.getTSInt128Field(largeSideKeyColRowIdx))
return rc;
}
else
{
if (int rc = a.scanTInt64() != row.getIntField(largeSideKeyColRowIdx))
return rc;
}
}
break;
}
default:
{
ConstString ta = a.scanGeneric(datatypes::MAXLEGACYWIDTH);
if (r.isUnsigned(keyCols[i]))
if (r.isUnsigned(largeSideKeyColRowIdx))
{
uint64_t tb = row.getUintField(keyCols[i]);
if (int rc= memcmp(ta.str(), &tb , datatypes::MAXLEGACYWIDTH))
uint64_t tb = row.getUintField(largeSideKeyColRowIdx);
if (int rc = memcmp(ta.str(), &tb , datatypes::MAXLEGACYWIDTH))
return rc;
}
else
{
int64_t tb = row.getIntField(keyCols[i]);
if (int rc= memcmp(ta.str(), &tb , datatypes::MAXLEGACYWIDTH))
int64_t tb = row.getIntField(largeSideKeyColRowIdx);
if (int rc = memcmp(ta.str(), &tb , datatypes::MAXLEGACYWIDTH))
return rc;
}
break;
@ -1394,39 +1478,60 @@ int TypelessData::cmpToRow(const RowGroup& r,
return 0; // Equal
}
int TypelessData::cmp(const RowGroup& r, const std::vector<uint32_t>& keyCols,
const TypelessData &da, const TypelessData &db)
const TypelessData &da, const TypelessData &db,
const std::vector<uint32_t> *smallSideKeyColumnsIds,
const rowgroup::RowGroup *smallSideRG)
{
idbassert((da.mRowPtr == nullptr) + (db.mRowPtr == nullptr) > 0);
if (da.mRowPtr)
return -db.cmpToRow(r, keyCols, da.mRowPtr[0]);
if (db.mRowPtr)
return da.cmpToRow(r, keyCols, db.mRowPtr[0]);
idbassert(da.isSmallSide() || db.isSmallSide());
if (!da.isSmallSide() && db.isSmallSide())
return -db.cmpToRow(r, keyCols, da.mRowPtr[0], smallSideKeyColumnsIds, smallSideRG);
if (da.isSmallSide() && !db.isSmallSide())
return da.cmpToRow(r, keyCols, db.mRowPtr[0], smallSideKeyColumnsIds, smallSideRG);
// This case happens in BPP::addToJoiner when it populates the final
// hashmap with multiple smallSide TDs from temp hashmaps.
idbassert(da.isSmallSide() && db.isSmallSide());
TypelessDataDecoder a(da);
TypelessDataDecoder b(db);
for (uint32_t i = 0; i < keyCols.size(); i++)
for (uint32_t i = 0; i < keyCols.size(); ++i)
{
switch (r.getColTypes()[keyCols[i]])
auto keyColIdx = keyCols[i];
switch (r.getColTypes()[keyColIdx])
{
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::CHAR:
case CalpontSystemCatalog::TEXT:
{
datatypes::Charset cs(*const_cast<RowGroup&>(r).getCharset(keyCols[i]));
datatypes::Charset cs(*const_cast<RowGroup&>(r).getCharset(keyColIdx));
ConstString ta = a.scanString();
ConstString tb = b.scanString();
if (int rc= cs.strnncollsp(ta, tb))
return rc;
break;
}
case CalpontSystemCatalog::DECIMAL:
{
auto largeSideWidth = r.getColumnWidth(keyColIdx);
// A skewed JOIN (e.g. INT to DECIMAL(38)) stores the key as 8 bytes;
// otherwise the DECIMAL has a common width on both the small and large side.
auto width = (da.isSmallSideWithSkewedData() &&
    largeSideWidth != smallSideRG->getColumnWidth(smallSideKeyColumnsIds->operator[](i)))
    ? datatypes::MAXLEGACYWIDTH
    : std::max(r.getColWidths()[keyColIdx], datatypes::MAXLEGACYWIDTH);
ConstString ta = a.scanGeneric(width);
ConstString tb = b.scanGeneric(width);
if (int rc= memcmp(ta.str(), tb.str(), width))
return rc;
break;
}
default:
{
ConstString ta = a.scanGeneric(8);
ConstString tb = b.scanGeneric(8);
ConstString ta = a.scanGeneric(datatypes::MAXLEGACYWIDTH);
ConstString tb = b.scanGeneric(datatypes::MAXLEGACYWIDTH);
idbassert(ta.length() == tb.length());
// It is impossible to join signed to unsigned types now
// but there is a potential error, e.g. uint64 vs negative int64.
if (int rc= memcmp(ta.str(), tb.str() , ta.length()))
return rc;
break;
@ -1438,23 +1543,24 @@ int TypelessData::cmp(const RowGroup& r, const std::vector<uint32_t>& keyCols,
// Called in joblist code to produce SmallSide TypelessData to be sent to PP.
TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols,
uint32_t keylen, FixedAllocator* fa,
const rowgroup::RowGroup& otherSideRG, const std::vector<uint32_t>& otherKeyCols)
const rowgroup::RowGroup& otherSideRG,
const std::vector<uint32_t>& otherKeyCols)
{
TypelessData ret;
uint32_t off = 0, i;
execplan::CalpontSystemCatalog::ColDataType type;
ret.data = (uint8_t*) fa->allocate();
idbassert(keyCols.size() == otherKeyCols.size());
for (i = 0; i < keyCols.size(); i++)
{
type = r.getColTypes()[keyCols[i]];
if (type == CalpontSystemCatalog::VARCHAR ||
type == CalpontSystemCatalog::CHAR ||
type == CalpontSystemCatalog::TEXT)
if (datatypes::isCharType(type))
{
// this is a string, copy a normalized version
const uint8_t* str = r.getStringPointer(keyCols[i]);
@ -1462,7 +1568,19 @@ TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols,
if (TypelessDataStringEncoder(str, width).store(ret.data, off, keylen))
goto toolong;
}
else if (r.getColType(keyCols[i]) == CalpontSystemCatalog::LONGDOUBLE)
else if (datatypes::isWideDecimalType(type, r.getColumnWidth(keyCols[i])))
{
bool otherSideIsIntOrNarrow = otherSideRG.getColumnWidth(otherKeyCols[i]) <= datatypes::MAXLEGACYWIDTH;
// unused if otherSideIsIntOrNarrow is false
auto otherSideType = (otherSideIsIntOrNarrow) ? otherSideRG.getColType(otherKeyCols[i])
: datatypes::SystemCatalog::UNDEFINED;
if (WideDecimalKeyConverter(r, keyCols[i]).convert(otherSideIsIntOrNarrow, otherSideType)
.store(ret, off, keylen))
{
goto toolong;
}
}
else if (datatypes::isLongDouble(type))
{
if (off + sizeof(long double) > keylen)
goto toolong;
@ -1546,7 +1664,7 @@ toolong:
return ret;
}
// This method is used by the disk-based JOIN; it is neither collation- nor wide-DECIMAL-aware.
uint64_t getHashOfTypelessKey(const Row& r, const vector<uint32_t>& keyCols, uint32_t seed)
{
Hasher_r hasher;
@ -1620,14 +1738,7 @@ void TypelessData::serialize(messageqcpp::ByteStream& b) const
{
b << len;
b.append(data, len);
}
void TypelessData::deserialize(messageqcpp::ByteStream& b, utils::FixedAllocator& fa)
{
b >> len;
data = (uint8_t*) fa.allocate(len);
memcpy(data, b.buf(), len);
b.advance(len);
// Flags are not sent b/c they are only locally significant now.
}
void TypelessData::deserialize(messageqcpp::ByteStream& b, utils::PoolAllocator& fa)
@ -1789,9 +1900,87 @@ boost::shared_ptr<TupleJoiner> TupleJoiner::copyForDiskJoin()
return ret;
}
// Used by the Typeless JOIN to detect a JOIN where the largeSide is wide-DECIMAL and
// the smallSide is a smaller data type, e.g. INT or narrow-DECIMAL.
bool TupleJoiner::joinHasSkewedKeyColumn()
{
std::vector<uint32_t>::const_iterator largeSideKeyColumnsIter = getLargeKeyColumns().begin();
std::vector<uint32_t>::const_iterator smallSideKeyColumnsIter = getSmallKeyColumns().begin();
idbassert(getLargeKeyColumns().size() == getSmallKeyColumns().size());
while (largeSideKeyColumnsIter != getLargeKeyColumns().end())
{
auto smallSideColumnWidth = smallRG.getColumnWidth(*smallSideKeyColumnsIter);
auto largeSideColumnWidth = largeRG.getColumnWidth(*largeSideKeyColumnsIter);
bool widthIsDifferent = smallSideColumnWidth != largeSideColumnWidth;
if (widthIsDifferent && (datatypes::isWideDecimalType(smallRG.getColTypes()[*smallSideKeyColumnsIter], smallSideColumnWidth) ||
datatypes::isWideDecimalType(largeRG.getColTypes()[*largeSideKeyColumnsIter], largeSideColumnWidth)))
{
return true;
}
++largeSideKeyColumnsIter;
++smallSideKeyColumnsIter;
}
return false;
}
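A hypothetical example of the check above:
//   small side DECIMAL(38,0) -> width 16 (wide-DECIMAL)
//   large side BIGINT        -> width 8
//   => widths differ and one side is wide-DECIMAL: joinHasSkewedKeyColumn() returns true.
//   DECIMAL(38,0) on both sides -> equal widths: returns false.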
void TupleJoiner::setConvertToDiskJoin()
{
_convertToDiskJoin = true;
}
// This function exists to reuse code from the Typeless TupleJoiner ctor.
// It is used in that ctor and in initBPP() to calculate the
// Typeless key length in case of a JOIN where the large side column is INT
// and the small side column is wide-DECIMAL.
// An important assumption is that if the type is DECIMAL then it must
// be wide-DECIMAL, b/c MCS only calls this function when running the Typeless
// TupleJoiner ctor.
uint32_t calculateKeyLength(const std::vector<uint32_t>& aKeyColumnsIds,
const rowgroup::RowGroup& aSmallRowGroup,
const std::vector<uint32_t>* aLargeKeyColumnsIds,
const rowgroup::RowGroup* aLargeRowGroup)
{
uint32_t keyLength = 0;
for (size_t keyColumnIdx = 0; keyColumnIdx < aKeyColumnsIds.size(); ++keyColumnIdx)
{
auto smallSideKeyColumnId = aKeyColumnsIds[keyColumnIdx];
auto largeSideKeyColumnId = (aLargeRowGroup)
? aLargeKeyColumnsIds->operator[](keyColumnIdx)
: std::numeric_limits<uint64_t>::max();
const auto& smallKeyColumnType = aSmallRowGroup.getColTypes()[smallSideKeyColumnId];
// Not used if aLargeRowGroup is nullptr, which happens in PrimProc.
const auto& largeKeyColumntype = (aLargeRowGroup) ? aLargeRowGroup->getColTypes()[largeSideKeyColumnId]
: datatypes::SystemCatalog::UNDEFINED;
if (datatypes::isCharType(smallKeyColumnType))
{
keyLength += aSmallRowGroup.getColumnWidth(smallSideKeyColumnId) + 2; // +2 for encoded length
// MCOL-698: if we don't do this LONGTEXT allocates 32TB RAM
if (keyLength > 65536)
return 65536;
}
else if (datatypes::isLongDouble(smallKeyColumnType))
{
keyLength += sizeof(long double);
}
else if (datatypes::isWideDecimalType(smallKeyColumnType,
aSmallRowGroup.getColumnWidth(smallSideKeyColumnId)))
{
keyLength += (aLargeRowGroup &&
!datatypes::isWideDecimalType(largeKeyColumntype,
aLargeRowGroup->getColumnWidth(largeSideKeyColumnId)))
? datatypes::MAXLEGACYWIDTH // Small=Wide, Large=Narrow/xINT
: datatypes::MAXDECIMALWIDTH; // Small=Wide, Large=Wide
}
else
// This branch covers all remaining datatypes, including the skewed DECIMAL JOIN case
// Small=Narrow, Large=Wide
{
keyLength += datatypes::MAXLEGACYWIDTH;
}
}
return keyLength;
}
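A hedged worked example of the calculation above, for a hypothetical three-column key of CHAR(10), BIGINT, and a small-side DECIMAL(38) joined against a large-side BIGINT:
// CHAR(10) key column:               10 + 2 (encoded length)   = 12
// BIGINT key column:                 datatypes::MAXLEGACYWIDTH =  8
// DECIMAL(38) vs. large-side BIGINT: datatypes::MAXLEGACYWIDTH =  8 (skewed)
//                                                    keyLength = 28
// With DECIMAL(38) on both sides the last column would contribute
// datatypes::MAXDECIMALWIDTH (16) instead, giving keyLength = 36.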
};

View File

@ -40,32 +40,77 @@
#include "hasher.h"
#include "threadpool.h"
#include "columnwidth.h"
#include "mcs_string.h"
namespace joiner
{
uint32_t calculateKeyLength(const std::vector<uint32_t>& aKeyColumnsIds,
const rowgroup::RowGroup& aRowGroup,
const std::vector<uint32_t>* aLargeKeyColumnsIds = nullptr,
const rowgroup::RowGroup* aLargeRowGroup = nullptr);
constexpr uint8_t IS_SMALLSIDE = 0x01; // SmallSide of a JOIN w/o a skew in key column widths
constexpr uint8_t IS_SMALLSIDE_SKEWED = 0x02; // SmallSide of a JOIN with a skew in key column widths
class TypelessDataDecoder;
class TypelessData
{
public:
uint8_t* data;
union {
uint8_t* data;
const rowgroup::Row *mRowPtr;
};
uint32_t len;
const rowgroup::Row *mRowPtr;
// The flags are only locally significant in PP now, so serialize() doesn't send them over the wire.
uint32_t mFlags;
TypelessData() : data(NULL), len(0), mRowPtr(nullptr) { }
TypelessData(const rowgroup::Row *rowPtr) : data(NULL), len(0), mRowPtr(rowPtr) { }
TypelessData() : data(nullptr), len(0), mFlags(0) { }
TypelessData(const rowgroup::Row *rowPtr) : mRowPtr(rowPtr), len(0), mFlags(0) { }
TypelessData(messageqcpp::ByteStream& bs, utils::PoolAllocator& memAllocator) : data(nullptr), len(0), mFlags(0)
{
deserialize(bs, memAllocator);
}
inline bool operator==(const TypelessData&) const;
void serialize(messageqcpp::ByteStream&) const;
void deserialize(messageqcpp::ByteStream&, utils::FixedAllocator&);
void deserialize(messageqcpp::ByteStream&, utils::PoolAllocator&);
std::string toString() const;
uint32_t hash(const rowgroup::RowGroup&, const std::vector<uint32_t>& keyCols) const;
static int cmp(const rowgroup::RowGroup&, const std::vector<uint32_t>& keyCols,
uint32_t hash(const rowgroup::RowGroup&,
const std::vector<uint32_t>& keyCols,
const std::vector<uint32_t> *smallSideKeyColumnsIds,
const rowgroup::RowGroup *smallSideRG) const;
static int cmp(const rowgroup::RowGroup&,
const std::vector<uint32_t>& keyCols,
const TypelessData &a,
const TypelessData &b);
int cmpToRow(const rowgroup::RowGroup& r, const std::vector<uint32_t>& keyCols,
const rowgroup::Row &db) const;
const TypelessData &b,
const std::vector<uint32_t> *smallSideKeyColumnsIds,
const rowgroup::RowGroup *smallSideRG);
int cmpToRow(const rowgroup::RowGroup& r,
const std::vector<uint32_t>& keyCols,
const rowgroup::Row &row,
const std::vector<uint32_t> *smallSideKeyColumnsIds,
const rowgroup::RowGroup *smallSideRG) const;
inline void setSmallSide()
{
mFlags |= IS_SMALLSIDE;
}
inline void setSmallSideWithSkewedData()
{
mFlags |= IS_SMALLSIDE_SKEWED;
}
inline bool isSmallSide() const
{
return mFlags & (IS_SMALLSIDE_SKEWED | IS_SMALLSIDE);
}
inline bool isSmallSideWithSkewedData() const
{
return mFlags & IS_SMALLSIDE_SKEWED;
}
};
// This operator is used in EM only so it doesn't support TD cmp operation
// using Row pointers.
inline bool TypelessData::operator==(const TypelessData& t) const
{
if (len != t.len)
@ -77,6 +122,57 @@ inline bool TypelessData::operator==(const TypelessData& t) const
return (memcmp(data, t.data, len) == 0);
}
class TypelessDataDecoder
{
const uint8_t *mPtr;
const uint8_t *mEnd;
void checkAvailableData(uint32_t nbytes) const
{
if (mPtr + nbytes > mEnd)
throw runtime_error("TypelessData is too short");
}
public:
TypelessDataDecoder(const uint8_t* ptr, size_t length)
:mPtr(ptr), mEnd(ptr + length)
{ }
TypelessDataDecoder(const TypelessData &data)
:TypelessDataDecoder(data.data, data.len)
{ }
utils::ConstString scanGeneric(uint32_t length)
{
checkAvailableData(length);
utils::ConstString res((const char *) mPtr, length);
mPtr += length;
return res;
}
uint32_t scanStringLength()
{
checkAvailableData(2);
uint32_t res = ((uint32_t) mPtr[0]) * 255 + mPtr[1];
mPtr += 2;
return res;
}
utils::ConstString scanString()
{
return scanGeneric(scanStringLength());
}
int64_t scanTInt64()
{
checkAvailableData(sizeof(int64_t));
int64_t res = *reinterpret_cast<const int64_t*>(mPtr);
mPtr += sizeof(int64_t);
return res;
}
datatypes::TSInt128 scanTInt128()
{
checkAvailableData(datatypes::MAXDECIMALWIDTH);
datatypes::TSInt128 res(mPtr);
mPtr += datatypes::MAXDECIMALWIDTH;
return res;
}
};
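A minimal decoding sketch for the class above (illustration only), assuming a TypelessData key td that was encoded from a (VARCHAR, BIGINT, wide-DECIMAL) key without a width skew; each scan* call consumes exactly the widths described above:
TypelessDataDecoder dec(td);
utils::ConstString s = dec.scanString();       // 2-byte length prefix, then the bytes
int64_t i = dec.scanTInt64();                  // datatypes::MAXLEGACYWIDTH (8) bytes
datatypes::TSInt128 d = dec.scanTInt128();     // datatypes::MAXDECIMALWIDTH (16) bytes
// In a skewed JOIN the wide-DECIMAL column is stored as 8 bytes instead,
// so it would be read with scanTInt64() rather than scanTInt128().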
// Comparator for long double in the hash
class LongDoubleEq
{
@ -104,10 +200,16 @@ class TypelessDataStructure
public:
const rowgroup::RowGroup *mRowGroup;
const std::vector<uint32_t> *mMap;
const std::vector<uint32_t> *mSmallSideKeyColumnsIds;
const rowgroup::RowGroup *mSmallSideRG;
TypelessDataStructure(const rowgroup::RowGroup *rg,
const std::vector<uint32_t> *map)
const std::vector<uint32_t> *map,
const std::vector<uint32_t> *smallSideKeyColumnsIds,
const rowgroup::RowGroup *smallSideRG)
:mRowGroup(rg),
mMap(map)
mMap(map),
mSmallSideKeyColumnsIds(smallSideKeyColumnsIds),
mSmallSideRG(smallSideRG)
{ }
};
@ -150,12 +252,14 @@ public:
struct TypelessDataHasher: public TypelessDataStructure
{
TypelessDataHasher(const rowgroup::RowGroup *rg,
const std::vector<uint32_t> *map)
:TypelessDataStructure(rg, map)
const std::vector<uint32_t> *map,
const std::vector<uint32_t> *smallSideKeyColumnsIds,
const rowgroup::RowGroup *smallSideRG)
:TypelessDataStructure(rg, map, smallSideKeyColumnsIds, smallSideRG)
{ }
inline size_t operator()(const TypelessData& e) const
{
return e.hash(*mRowGroup, *mMap);
return e.hash(*mRowGroup, *mMap, mSmallSideKeyColumnsIds, mSmallSideRG);
}
};
@ -163,12 +267,14 @@ public:
{
public:
TypelessDataComparator(const rowgroup::RowGroup *rg,
const std::vector<uint32_t> *map)
:TypelessDataStructure(rg, map)
const std::vector<uint32_t> *map,
const std::vector<uint32_t> *smallSideKeyColumnsIds,
const rowgroup::RowGroup *smallSideRG)
:TypelessDataStructure(rg, map, smallSideKeyColumnsIds, smallSideRG)
{ }
bool operator()(const TypelessData& a, const TypelessData& b) const
{
return !TypelessData::cmp(*mRowGroup, *mMap, a, b);
return !TypelessData::cmp(*mRowGroup, *mMap, a, b, mSmallSideKeyColumnsIds, mSmallSideRG);
}
};
@ -365,6 +471,12 @@ public:
return nullValueForJoinColumn;
}
// Wide-DECIMAL JOIN
bool joinHasSkewedKeyColumn();
inline const vector<uint32_t>& getSmallSideColumnsWidths() const
{
return smallRG.getColWidths();
}
// Disk-based join support
void clearData();
boost::shared_ptr<TupleJoiner> copyForDiskJoin();

View File

@ -170,6 +170,14 @@ ByteStream& ByteStream::operator<<(const uint8_t b)
return *this;
}
ByteStream& ByteStream::operator<<(const bool b)
{
add(b);
return *this;
}
ByteStream& ByteStream::operator<<(const int16_t d)
{
if (fBuf == 0 || (fCurInPtr - fBuf + 2U > fMaxLen + ISSOverhead))
@ -296,6 +304,14 @@ ByteStream& ByteStream::operator>>(uint8_t& b)
return *this;
}
ByteStream& ByteStream::operator>>(bool& b)
{
peek(b);
fCurOutPtr++;
return *this;
}
ByteStream& ByteStream::operator>>(int16_t& d)
{
peek(d);
@ -382,6 +398,15 @@ void ByteStream::peek(uint8_t& b) const
b = *((int8_t*)fCurOutPtr);
}
void ByteStream::peek(bool& b) const
{
if (length() < 1)
throw underflow_error("ByteStream::peek(bool): not enough data in stream to fill datatype");
b = *((bool*)fCurOutPtr);
}
void ByteStream::peek(int16_t& d) const
{
if (length() < 2)

View File

@ -113,6 +113,7 @@ public:
* push a uint8_t onto the end of the stream
*/
EXPORT ByteStream& operator<<(const uint8_t b);
EXPORT ByteStream& operator<<(const bool b);
/**
* push a int16_t onto the end of the stream. The byte order is whatever the native byte order is.
*/
@ -195,6 +196,7 @@ public:
* extract a uint8_t from the front of the stream.
*/
EXPORT ByteStream& operator>>(uint8_t& b);
EXPORT ByteStream& operator>>(bool& b);
/**
* extract a int16_t from the front of the stream. The byte order is whatever the native byte order is.
*/
@ -273,6 +275,7 @@ public:
* Peek at a uint8_t from the front of the stream.
*/
EXPORT void peek(uint8_t& b) const;
EXPORT void peek(bool& b) const;
/**
* Peek at a int16_t from the front of the stream. The byte order is whatever the native byte order is.
*/

View File

@ -1068,7 +1068,7 @@ void RowAggregation::makeAggFieldsNull(Row& row)
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
int colWidth = fRowGroupOut->getColumnWidth(colOut);
uint32_t colWidth = fRowGroupOut->getColumnWidth(colOut);
if (LIKELY(colWidth == datatypes::MAXDECIMALWIDTH))
{
uint32_t offset = row.getOffset(colOut);
@ -1095,7 +1095,7 @@ void RowAggregation::makeAggFieldsNull(Row& row)
case execplan::CalpontSystemCatalog::VARBINARY:
case execplan::CalpontSystemCatalog::BLOB:
{
int colWidth = fRowGroupOut->getColumnWidth(colOut);
uint32_t colWidth = fRowGroupOut->getColumnWidth(colOut);
if (colWidth <= datatypes::MAXLEGACYWIDTH)
{

View File

@ -1320,6 +1320,12 @@ RowGroup& RowGroup::operator=(const RowGroup& r)
return *this;
}
RowGroup::RowGroup(ByteStream& bs): columnCount(0), data(nullptr), rgData(nullptr), strings(nullptr),
useStringTable(true), hasCollation(false), hasLongStringField(false), sTableThreshold(20)
{
this->deserialize(bs);
}
RowGroup::~RowGroup()
{
}

View File

@ -131,6 +131,16 @@ const int16_t rgCommonSize = 8192;
#pragma warning (disable : 4200)
#endif
// Helper to get a value from nested vector pointers.
template<typename T>
inline T derefFromTwoVectorPtrs(const std::vector<T>* outer,
const std::vector<T>* inner,
const T innerIdx)
{
auto outerIdx = inner->operator[](innerIdx);
return outer->operator[](outerIdx);
}
class StringStore
{
public:
@ -434,6 +444,7 @@ public:
getPrecision(colIndex));
}
inline long double getLongDoubleField(uint32_t colIndex) const;
inline void storeInt128FieldIntoPtr(uint32_t colIndex, uint8_t* x) const;
inline void getInt128Field(uint32_t colIndex, int128_t& x) const;
inline datatypes::TSInt128 getTSInt128Field(uint32_t colIndex) const;
@ -559,12 +570,17 @@ public:
inline uint64_t hash(uint32_t lastCol) const; // generates a hash for cols [0-lastCol]
inline uint64_t hash() const; // generates a hash for all cols
inline void colUpdateMariaDBHasher(datatypes::MariaDBHasher &hasher, uint32_t col) const;
inline void colUpdateMariaDBHasherTypeless(datatypes::MariaDBHasher &hasher, uint32_t col) const;
inline uint64_t hashTypeless(const std::vector<uint32_t>& keyCols) const
inline void colUpdateMariaDBHasherTypeless(datatypes::MariaDBHasher &hasher, uint32_t keyColsIdx,
const std::vector<uint32_t>& keyCols,
const std::vector<uint32_t>* smallSideKeyColumnsIds,
const std::vector<uint32_t>* smallSideColumnsWidths) const;
inline uint64_t hashTypeless(const std::vector<uint32_t>& keyCols,
const std::vector<uint32_t>* smallSideKeyColumnsIds,
const std::vector<uint32_t>* smallSideColumnsWidths) const
{
datatypes::MariaDBHasher h;
for (uint32_t i = 0; i < keyCols.size(); i++)
colUpdateMariaDBHasherTypeless(h, keyCols[i]);
colUpdateMariaDBHasherTypeless(h, i, keyCols, smallSideKeyColumnsIds, smallSideColumnsWidths);
return h.finalize();
}
@ -950,30 +966,65 @@ inline void Row::colUpdateMariaDBHasher(datatypes::MariaDBHasher &h, uint32_t co
}
inline void Row::colUpdateMariaDBHasherTypeless(datatypes::MariaDBHasher &h, uint32_t col) const
inline void Row::colUpdateMariaDBHasherTypeless(datatypes::MariaDBHasher &h, uint32_t keyColsIdx,
const std::vector<uint32_t>& keyCols,
const std::vector<uint32_t>* smallSideKeyColumnsIds,
const std::vector<uint32_t>* smallSideColumnsWidths) const
{
switch (getColType(col))
auto rowKeyColIdx = keyCols[keyColsIdx];
auto largeSideColType = getColType(rowKeyColIdx);
switch (largeSideColType)
{
case datatypes::SystemCatalog::CHAR:
case datatypes::SystemCatalog::VARCHAR:
case datatypes::SystemCatalog::BLOB:
case datatypes::SystemCatalog::TEXT:
{
CHARSET_INFO *cs = getCharset(col);
h.add(cs, getConstString(col));
CHARSET_INFO *cs = getCharset(rowKeyColIdx);
h.add(cs, getConstString(rowKeyColIdx));
break;
}
case datatypes::SystemCatalog::DECIMAL:
{
auto width = getColumnWidth(rowKeyColIdx);
if (datatypes::isWideDecimalType(largeSideColType,
width))
{
bool joinHasSkewedKeyColumn = (smallSideColumnsWidths);
datatypes::TSInt128 val = getTSInt128Field(rowKeyColIdx);
if (joinHasSkewedKeyColumn &&
width != derefFromTwoVectorPtrs(smallSideColumnsWidths, smallSideKeyColumnsIds, keyColsIdx))
{
if (val.getValue() >= std::numeric_limits<int64_t>::min() &&
val.getValue() <= std::numeric_limits<uint64_t>::max())
{
h.add(&my_charset_bin, (const char*)&val.getValue(), datatypes::MAXLEGACYWIDTH);
}
else
h.add(&my_charset_bin, (const char*)&val.getValue(), datatypes::MAXDECIMALWIDTH);
}
else
h.add(&my_charset_bin, (const char*)&val.getValue(), datatypes::MAXDECIMALWIDTH);
}
else
{
int64_t val = getIntField(rowKeyColIdx);
h.add(&my_charset_bin, (const char*) &val, datatypes::MAXLEGACYWIDTH);
}
break;
}
default:
{
if (isUnsigned(col))
if (isUnsigned(rowKeyColIdx))
{
uint64_t tb = getUintField(col);
h.add(&my_charset_bin, (const char*) &tb, 8);
uint64_t val = getUintField(rowKeyColIdx);
h.add(&my_charset_bin, (const char*) &val, datatypes::MAXLEGACYWIDTH);
}
else
{
int64_t val = getIntField(col);
h.add(&my_charset_bin, (const char*) &val, 8);
int64_t val = getIntField(rowKeyColIdx);
h.add(&my_charset_bin, (const char*) &val, datatypes::MAXLEGACYWIDTH);
}
break;
@ -981,7 +1032,6 @@ inline void Row::colUpdateMariaDBHasherTypeless(datatypes::MariaDBHasher &h, uin
}
}
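A small sketch of the intent behind the skewed DECIMAL branch above (illustration only, assuming the usual little-endian layout): when the 16-byte value fits into 64 bits, hashing only its low MAXLEGACYWIDTH bytes makes the large side produce the same hash input as an 8-byte small-side key holding the same value.
datatypes::MariaDBHasher hNarrow, hWide;
int64_t narrowVal = 42;   // e.g. a BIGINT or narrow-DECIMAL small-side key
int128_t wideVal = 42;    // the same value in a wide-DECIMAL large-side key
hNarrow.add(&my_charset_bin, (const char*)&narrowVal, datatypes::MAXLEGACYWIDTH);
hWide.add(&my_charset_bin, (const char*)&wideVal, datatypes::MAXLEGACYWIDTH);
// hNarrow.finalize() == hWide.finalize(): both sides hash the same 8 bytes.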
inline void Row::setStringField(const uint8_t* strdata, uint32_t length, uint32_t colIndex)
{
uint64_t offset;
@ -1096,6 +1146,11 @@ inline long double Row::getLongDoubleField(uint32_t colIndex) const
return *((long double*) &data[offsets[colIndex]]);
}
inline void Row::storeInt128FieldIntoPtr(uint32_t colIndex, uint8_t* x) const
{
datatypes::TSInt128::assignPtrPtr(x, &data[offsets[colIndex]]);
}
inline void Row::getInt128Field(uint32_t colIndex, int128_t& x) const
{
datatypes::TSInt128::assignPtrPtr(&x, &data[offsets[colIndex]]);
@ -1489,6 +1544,8 @@ public:
/** @brief Assignment operator. It copies metadata, not the row data */
RowGroup& operator=(const RowGroup&);
explicit RowGroup(messageqcpp::ByteStream& bs);
~RowGroup();
inline void initRow(Row*, bool forceInlineData = false) const;