You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
Part#1 MCOL-4064 Make JOIN collation aware
Making field1=field2 collation aware for long CHAR/VARCHAR.
This commit is contained in:
@ -354,7 +354,11 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
|
||||
bs >> tlKeyLengths[i];
|
||||
//storedKeyAllocators[i] = PoolAllocator();
|
||||
for (uint j = 0; j < processorThreads; ++j)
|
||||
tlJoiners[i][j].reset(new TLJoiner(10, TupleJoiner::hasher()));
|
||||
tlJoiners[i][j].reset(new TLJoiner(10,
|
||||
TupleJoiner::TypelessDataHasher(&outputRG,
|
||||
&tlLargeSideKeyColumns[i]),
|
||||
TupleJoiner::TypelessDataComparator(&outputRG,
|
||||
&tlLargeSideKeyColumns[i])));
|
||||
}
|
||||
}
|
||||
|
||||
@ -613,7 +617,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
|
||||
{
|
||||
tlLargeKey.deserialize(bs, storedKeyAllocator);
|
||||
bs >> tlIndex;
|
||||
bucket = bucketPicker((char *) tlLargeKey.data, tlLargeKey.len, bpSeed) & ptMask;
|
||||
bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[joinerNum]) & ptMask;
|
||||
tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex));
|
||||
}
|
||||
else
|
||||
@ -1188,7 +1192,7 @@ void BatchPrimitiveProcessor::executeTupleJoin()
|
||||
// the null values are not sent by UM in typeless case. null -> !found
|
||||
tlLargeKey = makeTypelessKey(oldRow, tlLargeSideKeyColumns[j], tlKeyLengths[j],
|
||||
&tmpKeyAllocators[j]);
|
||||
uint bucket = bucketPicker((char *) tlLargeKey.data, tlLargeKey.len, bpSeed) & ptMask;
|
||||
uint bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[j]) & ptMask;
|
||||
found = tlJoiners[j][bucket]->find(tlLargeKey) != tlJoiners[j][bucket]->end();
|
||||
|
||||
if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) ||
|
||||
@ -2736,7 +2740,7 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
|
||||
TypelessData largeKey = makeTypelessKey(r, tlLargeSideKeyColumns[jIndex],
|
||||
tlKeyLengths[jIndex], &tmpKeyAllocators[jIndex]);
|
||||
pair<TLJoiner::iterator, TLJoiner::iterator> range;
|
||||
bucket = bucketPicker((char *) largeKey.data, largeKey.len, bpSeed) & ptMask;
|
||||
bucket = largeKey.hash(outputRG, tlLargeSideKeyColumns[jIndex]) & ptMask;
|
||||
range = tlJoiners[jIndex][bucket]->equal_range(largeKey);
|
||||
for (; range.first != range.second; ++range.first)
|
||||
v.push_back(range.first->second);
|
||||
|
@ -296,7 +296,9 @@ private:
|
||||
utils::STLPoolAllocator<std::pair<const uint64_t, uint32_t> > > TJoiner;
|
||||
|
||||
typedef std::tr1::unordered_multimap<joiner::TypelessData,
|
||||
uint32_t, joiner::TupleJoiner::hasher, std::equal_to<joiner::TypelessData>,
|
||||
uint32_t,
|
||||
joiner::TupleJoiner::TypelessDataHasher,
|
||||
joiner::TupleJoiner::TypelessDataComparator,
|
||||
utils::STLPoolAllocator<std::pair<const joiner::TypelessData, uint32_t> > > TLJoiner;
|
||||
|
||||
bool generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth = 0);
|
||||
|
@ -98,6 +98,10 @@ public:
|
||||
cs->hash_sort((const uchar *) str, length, &mPart1, &mPart2);
|
||||
return *this;
|
||||
}
|
||||
MariaDBHasher & add(CHARSET_INFO *cs, const utils::ConstString &str)
|
||||
{
|
||||
return add(cs, str.str(), str.length());
|
||||
}
|
||||
uint32_t finalize() const
|
||||
{
|
||||
return (uint32_t) mPart1;
|
||||
|
@ -188,7 +188,7 @@ TupleJoiner::TupleJoiner(
|
||||
||
|
||||
smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::TEXT)
|
||||
{
|
||||
keyLength += smallRG.getColumnWidth(smallKeyColumns[i]) + 1; // +1 null char
|
||||
keyLength += smallRG.getColumnWidth(smallKeyColumns[i]) + 2; // +2 for length
|
||||
|
||||
// MCOL-698: if we don't do this LONGTEXT allocates 32TB RAM
|
||||
if (keyLength > 65536)
|
||||
@ -1244,11 +1244,86 @@ size_t TupleJoiner::size() const
|
||||
return rows.size();
|
||||
}
|
||||
|
||||
|
||||
class TypelessDataStringEncoder
|
||||
{
|
||||
const uint8_t* mStr;
|
||||
uint32_t mLength;
|
||||
public:
|
||||
TypelessDataStringEncoder(const uint8_t *str, uint32_t length)
|
||||
:mStr(str), mLength(length)
|
||||
{ }
|
||||
bool store(uint8_t* to, uint32_t& off, uint32_t keylen) const
|
||||
{
|
||||
if (mLength > 0xFFFF) // We encode length into two bytes below
|
||||
{
|
||||
throw runtime_error("Cannot join strings greater than 64KB");
|
||||
}
|
||||
|
||||
if (off + mLength + 2 > keylen)
|
||||
return true;
|
||||
|
||||
to[off++]= mLength / 0xFF;
|
||||
to[off++]= mLength % 0xFF;
|
||||
/*
|
||||
QQ: perhaps now when we put length,
|
||||
we don't need to stop at '\0' bytes any more.
|
||||
If so, the loop below can be replace to memcpy().
|
||||
*/
|
||||
for (uint32_t j = 0; j < mLength && mStr[j] != 0; j++)
|
||||
{
|
||||
if (off >= keylen)
|
||||
return true;
|
||||
to[off++] = mStr[j];
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class TypelessDataDecoder
|
||||
{
|
||||
const uint8_t *mPtr;
|
||||
const uint8_t *mEnd;
|
||||
void checkAvailableData(uint32_t nbytes) const
|
||||
{
|
||||
if (mPtr + nbytes > mEnd)
|
||||
throw runtime_error("TypelessData is too short");
|
||||
}
|
||||
public:
|
||||
TypelessDataDecoder(const uint8_t* ptr, size_t length)
|
||||
:mPtr(ptr), mEnd(ptr + length)
|
||||
{ }
|
||||
TypelessDataDecoder(const TypelessData &data)
|
||||
:TypelessDataDecoder(data.data, data.len)
|
||||
{ }
|
||||
ConstString scanGeneric(uint32_t length)
|
||||
{
|
||||
checkAvailableData(length);
|
||||
ConstString res((const char *) mPtr, length);
|
||||
mPtr += length;
|
||||
return res;
|
||||
}
|
||||
uint32_t scanStringLength()
|
||||
{
|
||||
checkAvailableData(2);
|
||||
uint32_t res = ((uint32_t) mPtr[0]) * 255 + mPtr[1];
|
||||
mPtr += 2;
|
||||
return res;
|
||||
}
|
||||
ConstString scanString()
|
||||
{
|
||||
return scanGeneric(scanStringLength());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols,
|
||||
uint32_t keylen, FixedAllocator* fa)
|
||||
{
|
||||
TypelessData ret;
|
||||
uint32_t off = 0, i, j;
|
||||
uint32_t off = 0, i;
|
||||
execplan::CalpontSystemCatalog::ColDataType type;
|
||||
|
||||
ret.data = (uint8_t*) fa->allocate();
|
||||
@ -1264,24 +1339,8 @@ TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols,
|
||||
// this is a string, copy a normalized version
|
||||
const uint8_t* str = r.getStringPointer(keyCols[i]);
|
||||
uint32_t width = r.getStringLength(keyCols[i]);
|
||||
|
||||
if (width > 65536)
|
||||
{
|
||||
throw runtime_error("Cannot join strings greater than 64KB");
|
||||
}
|
||||
|
||||
for (j = 0; j < width && str[j] != 0; j++)
|
||||
{
|
||||
if (off >= keylen)
|
||||
goto toolong;
|
||||
|
||||
ret.data[off++] = str[j];
|
||||
}
|
||||
|
||||
if (off >= keylen)
|
||||
if (TypelessDataStringEncoder(str, width).store(ret.data, off, keylen))
|
||||
goto toolong;
|
||||
|
||||
ret.data[off++] = 0;
|
||||
}
|
||||
else if (r.isUnsigned(keyCols[i]))
|
||||
{
|
||||
@ -1308,12 +1367,78 @@ toolong:
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
uint32 TypelessData::hash(const RowGroup& r,
|
||||
const std::vector<uint32_t>& keyCols) const
|
||||
{
|
||||
TypelessDataDecoder decoder(*this);
|
||||
datatypes::MariaDBHasher hasher;
|
||||
for (uint32_t i = 0; i < keyCols.size(); i++)
|
||||
{
|
||||
switch (r.getColTypes()[keyCols[i]])
|
||||
{
|
||||
case CalpontSystemCatalog::VARCHAR:
|
||||
case CalpontSystemCatalog::CHAR:
|
||||
case CalpontSystemCatalog::TEXT:
|
||||
{
|
||||
CHARSET_INFO *cs= const_cast<RowGroup&>(r).getCharset(keyCols[i]);
|
||||
hasher.add(cs, decoder.scanString());
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
hasher.add(&my_charset_bin, decoder.scanGeneric(8));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return hasher.finalize();
|
||||
}
|
||||
|
||||
|
||||
int TypelessData::cmp(const RowGroup& r, const std::vector<uint32_t>& keyCols,
|
||||
const TypelessData &da, const TypelessData &db)
|
||||
{
|
||||
TypelessDataDecoder a(da);
|
||||
TypelessDataDecoder b(db);
|
||||
|
||||
for (uint32_t i = 0; i < keyCols.size(); i++)
|
||||
{
|
||||
switch (r.getColTypes()[keyCols[i]])
|
||||
{
|
||||
case CalpontSystemCatalog::VARCHAR:
|
||||
case CalpontSystemCatalog::CHAR:
|
||||
case CalpontSystemCatalog::TEXT:
|
||||
{
|
||||
datatypes::Charset cs(*const_cast<RowGroup&>(r).getCharset(keyCols[i]));
|
||||
ConstString ta = a.scanString();
|
||||
ConstString tb = b.scanString();
|
||||
if (int rc= cs.strnncollsp(ta, tb))
|
||||
return rc;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
ConstString ta = a.scanGeneric(8);
|
||||
ConstString tb = b.scanGeneric(8);
|
||||
idbassert(ta.length() == tb.length());
|
||||
if (int rc= memcmp(ta.str(), tb.str() , ta.length()))
|
||||
return rc;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0; // Equal
|
||||
}
|
||||
|
||||
|
||||
|
||||
TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols,
|
||||
uint32_t keylen, FixedAllocator* fa,
|
||||
const rowgroup::RowGroup& otherSideRG, const std::vector<uint32_t>& otherKeyCols)
|
||||
{
|
||||
TypelessData ret;
|
||||
uint32_t off = 0, i, j;
|
||||
uint32_t off = 0, i;
|
||||
execplan::CalpontSystemCatalog::ColDataType type;
|
||||
|
||||
ret.data = (uint8_t*) fa->allocate();
|
||||
@ -1329,24 +1454,8 @@ TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols,
|
||||
// this is a string, copy a normalized version
|
||||
const uint8_t* str = r.getStringPointer(keyCols[i]);
|
||||
uint32_t width = r.getStringLength(keyCols[i]);
|
||||
|
||||
if (width > 65536)
|
||||
{
|
||||
throw runtime_error("Cannot join strings greater than 64KB");
|
||||
}
|
||||
|
||||
for (j = 0; j < width && str[j] != 0; j++)
|
||||
{
|
||||
if (off >= keylen)
|
||||
goto toolong;
|
||||
|
||||
ret.data[off++] = str[j];
|
||||
}
|
||||
|
||||
if (off >= keylen)
|
||||
if (TypelessDataStringEncoder(str, width).store(ret.data, off, keylen))
|
||||
goto toolong;
|
||||
|
||||
ret.data[off++] = 0;
|
||||
}
|
||||
else if (r.getColType(keyCols[i]) == CalpontSystemCatalog::LONGDOUBLE)
|
||||
{
|
||||
@ -1436,7 +1545,7 @@ TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols, Pool
|
||||
const rowgroup::RowGroup& otherSideRG, const std::vector<uint32_t>& otherKeyCols)
|
||||
{
|
||||
TypelessData ret;
|
||||
uint32_t off = 0, i, j;
|
||||
uint32_t off = 0, i;
|
||||
execplan::CalpontSystemCatalog::ColDataType type;
|
||||
|
||||
uint32_t keylen = 0;
|
||||
@ -1452,7 +1561,7 @@ TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols, Pool
|
||||
keylen += sizeof(long double);
|
||||
}
|
||||
else if (r.isCharType(keyCols[i]))
|
||||
keylen += r.getStringLength(keyCols[i]) + 1;
|
||||
keylen += r.getStringLength(keyCols[i]) + 2;
|
||||
else
|
||||
keylen += 8;
|
||||
}
|
||||
@ -1470,16 +1579,7 @@ TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols, Pool
|
||||
// this is a string, copy a normalized version
|
||||
const uint8_t* str = r.getStringPointer(keyCols[i]);
|
||||
uint32_t width = r.getStringLength(keyCols[i]);
|
||||
|
||||
if (width > 65536)
|
||||
{
|
||||
throw runtime_error("Cannot join strings greater than 64KB");
|
||||
}
|
||||
|
||||
for (j = 0; j < width && str[j] != 0; j++)
|
||||
ret.data[off++] = str[j];
|
||||
|
||||
ret.data[off++] = 0;
|
||||
TypelessDataStringEncoder(str, width).store(ret.data, off, keylen);
|
||||
}
|
||||
else if (type == CalpontSystemCatalog::LONGDOUBLE)
|
||||
{
|
||||
|
@ -68,6 +68,10 @@ public:
|
||||
void deserialize(messageqcpp::ByteStream&, utils::FixedAllocator&);
|
||||
void deserialize(messageqcpp::ByteStream&, utils::PoolAllocator&);
|
||||
std::string toString() const;
|
||||
uint32_t hash(const rowgroup::RowGroup&, const std::vector<uint32_t>& keyCols) const;
|
||||
static int cmp(const rowgroup::RowGroup&, const std::vector<uint32_t>& keyCols,
|
||||
const TypelessData &a,
|
||||
const TypelessData &b);
|
||||
};
|
||||
|
||||
inline bool TypelessData::operator==(const TypelessData& t) const
|
||||
@ -108,6 +112,18 @@ extern TypelessData makeTypelessKey(const rowgroup::Row&,
|
||||
extern uint64_t getHashOfTypelessKey(const rowgroup::Row&, const std::vector<uint32_t>&,
|
||||
uint32_t seed = 0);
|
||||
|
||||
class TypelessDataStructure
|
||||
{
|
||||
public:
|
||||
const rowgroup::RowGroup *mRowGroup;
|
||||
const std::vector<uint32_t> *mMap;
|
||||
TypelessDataStructure(const rowgroup::RowGroup *rg,
|
||||
const std::vector<uint32_t> *map)
|
||||
:mRowGroup(rg),
|
||||
mMap(map)
|
||||
{ }
|
||||
};
|
||||
|
||||
|
||||
class TupleJoiner
|
||||
{
|
||||
@ -143,6 +159,33 @@ public:
|
||||
utils::Hasher fHasher;
|
||||
};
|
||||
|
||||
|
||||
struct TypelessDataHasher: public TypelessDataStructure
|
||||
{
|
||||
TypelessDataHasher(const rowgroup::RowGroup *rg,
|
||||
const std::vector<uint32_t> *map)
|
||||
:TypelessDataStructure(rg, map)
|
||||
{ }
|
||||
inline size_t operator()(const TypelessData& e) const
|
||||
{
|
||||
return e.hash(*mRowGroup, *mMap);
|
||||
}
|
||||
};
|
||||
|
||||
struct TypelessDataComparator: public TypelessDataStructure
|
||||
{
|
||||
public:
|
||||
TypelessDataComparator(const rowgroup::RowGroup *rg,
|
||||
const std::vector<uint32_t> *map)
|
||||
:TypelessDataStructure(rg, map)
|
||||
{ }
|
||||
bool operator()(const TypelessData& a, const TypelessData& b) const
|
||||
{
|
||||
return !TypelessData::cmp(*mRowGroup, *mMap, a, b);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/* ctor to use for numeric join */
|
||||
TupleJoiner(
|
||||
const rowgroup::RowGroup& smallInput,
|
||||
|
Reference in New Issue
Block a user