You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-4753 Performance problem in Typeless join
This commit is contained in:
committed by
Roman Nozdrin
parent
c6d0b46bc6
commit
b3d6f62964
@ -1195,9 +1195,8 @@ void BatchPrimitiveProcessor::executeTupleJoin()
|
||||
{
|
||||
//cout << " typeless join\n";
|
||||
// the null values are not sent by UM in typeless case. null -> !found
|
||||
tlLargeKey = makeTypelessKey(oldRow, tlLargeSideKeyColumns[j], tlKeyLengths[j],
|
||||
&tmpKeyAllocators[j]);
|
||||
uint bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[j]) & ptMask;
|
||||
tlLargeKey = TypelessData(&oldRow);
|
||||
uint bucket = oldRow.hashTypeless(tlLargeSideKeyColumns[j]) & ptMask;
|
||||
found = tlJoiners[j][bucket]->find(tlLargeKey) != tlJoiners[j][bucket]->end();
|
||||
|
||||
if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) ||
|
||||
@ -2714,11 +2713,10 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
|
||||
}
|
||||
}
|
||||
|
||||
TypelessData largeKey = makeTypelessKey(r, tlLargeSideKeyColumns[jIndex],
|
||||
tlKeyLengths[jIndex], &tmpKeyAllocators[jIndex]);
|
||||
pair<TLJoiner::iterator, TLJoiner::iterator> range;
|
||||
bucket = largeKey.hash(outputRG, tlLargeSideKeyColumns[jIndex]) & ptMask;
|
||||
range = tlJoiners[jIndex][bucket]->equal_range(largeKey);
|
||||
TypelessData largeKey(&r);
|
||||
bucket = r.hashTypeless(tlLargeSideKeyColumns[jIndex]) & ptMask;
|
||||
pair<TLJoiner::iterator, TLJoiner::iterator> range =
|
||||
tlJoiners[jIndex][bucket]->equal_range(largeKey);
|
||||
for (; range.first != range.second; ++range.first)
|
||||
v.push_back(range.first->second);
|
||||
}
|
||||
|
@ -1321,58 +1321,11 @@ public:
|
||||
};
|
||||
|
||||
|
||||
/** Builds a normalized (typeless) join key from the key columns of a row.
 *
 * The key buffer is obtained from `fa->allocate()`. String columns
 * (VARCHAR/CHAR/TEXT) are written through TypelessDataStringEncoder; every
 * other column is written as 8 bytes holding its unsigned or signed integer
 * field value.
 *
 * @param r        row supplying the key column values
 * @param keyCols  indexes of the key columns within `r`
 * @param keylen   maximum number of bytes the encoded key may occupy
 * @param fa       allocator the key buffer is taken from
 * @return the encoded key with `len` set to the number of bytes written.
 *         If the encoding would exceed `keylen`, `fa->truncateBy(keylen)` is
 *         called and the returned key has `len == 0`.
 */
TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols,
                             uint32_t keylen, FixedAllocator* fa)
{
    TypelessData ret;
    uint32_t off = 0;
    bool fits = true;

    ret.data = (uint8_t*) fa->allocate();

    for (uint32_t i = 0; fits && i < keyCols.size(); i++)
    {
        const uint32_t col = keyCols[i];
        const execplan::CalpontSystemCatalog::ColDataType type = r.getColTypes()[col];

        if (type == CalpontSystemCatalog::VARCHAR ||
                type == CalpontSystemCatalog::CHAR ||
                type == CalpontSystemCatalog::TEXT)
        {
            // this is a string, copy a normalized version;
            // store() reports a non-zero result when the key would not fit
            const uint8_t* str = r.getStringPointer(col);
            uint32_t width = r.getStringLength(col);

            fits = !TypelessDataStringEncoder(str, width).store(ret.data, off, keylen);
        }
        else if (off + 8 > keylen)
        {
            fits = false;
        }
        else if (r.isUnsigned(col))
        {
            // memcpy instead of a casted store: `off` need not be 8-byte
            // aligned because string fields have arbitrary encoded lengths.
            const uint64_t val = r.getUintField(col);

            memcpy(&ret.data[off], &val, 8);
            off += 8;
        }
        else
        {
            const int64_t val = r.getIntField(col);

            memcpy(&ret.data[off], &val, 8);
            off += 8;
        }
    }

    if (!fits)
    {
        // Key is too long: release the whole slot and mark the key as
        // "matches nothing" with a zero length.
        fa->truncateBy(keylen);
        ret.len = 0;
        return ret;
    }

    ret.len = off;
    // Hand the unused tail of the slot back to the allocator.
    fa->truncateBy(keylen - off);
    return ret;
}
|
||||
|
||||
|
||||
uint32 TypelessData::hash(const RowGroup& r,
|
||||
const std::vector<uint32_t>& keyCols) const
|
||||
{
|
||||
if (mRowPtr)
|
||||
return mRowPtr->hashTypeless(keyCols);
|
||||
TypelessDataDecoder decoder(*this);
|
||||
datatypes::MariaDBHasher hasher;
|
||||
for (uint32_t i = 0; i < keyCols.size(); i++)
|
||||
@ -1398,9 +1351,59 @@ uint32 TypelessData::hash(const RowGroup& r,
|
||||
}
|
||||
|
||||
|
||||
int TypelessData::cmpToRow(const RowGroup& r,
|
||||
const std::vector<uint32_t>& keyCols,
|
||||
const rowgroup::Row &row) const
|
||||
{
|
||||
TypelessDataDecoder a(*this);
|
||||
|
||||
for (uint32_t i = 0; i < keyCols.size(); i++)
|
||||
{
|
||||
switch (r.getColTypes()[keyCols[i]])
|
||||
{
|
||||
case CalpontSystemCatalog::VARCHAR:
|
||||
case CalpontSystemCatalog::CHAR:
|
||||
case CalpontSystemCatalog::TEXT:
|
||||
{
|
||||
datatypes::Charset cs(*const_cast<RowGroup&>(r).getCharset(keyCols[i]));
|
||||
ConstString ta = a.scanString();
|
||||
ConstString tb = row.getConstString(keyCols[i]);
|
||||
if (int rc= cs.strnncollsp(ta, tb))
|
||||
return rc;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
ConstString ta = a.scanGeneric(datatypes::MAXLEGACYWIDTH);
|
||||
if (r.isUnsigned(keyCols[i]))
|
||||
{
|
||||
uint64_t tb = row.getUintField(keyCols[i]);
|
||||
if (int rc= memcmp(ta.str(), &tb , datatypes::MAXLEGACYWIDTH))
|
||||
return rc;
|
||||
}
|
||||
else
|
||||
{
|
||||
int64_t tb = row.getIntField(keyCols[i]);
|
||||
if (int rc= memcmp(ta.str(), &tb , datatypes::MAXLEGACYWIDTH))
|
||||
return rc;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0; // Equal
|
||||
}
|
||||
|
||||
|
||||
int TypelessData::cmp(const RowGroup& r, const std::vector<uint32_t>& keyCols,
|
||||
const TypelessData &da, const TypelessData &db)
|
||||
{
|
||||
idbassert((da.mRowPtr == nullptr) + (db.mRowPtr == nullptr) > 0);
|
||||
if (da.mRowPtr)
|
||||
return -db.cmpToRow(r, keyCols, da.mRowPtr[0]);
|
||||
if (db.mRowPtr)
|
||||
return da.cmpToRow(r, keyCols, db.mRowPtr[0]);
|
||||
|
||||
TypelessDataDecoder a(da);
|
||||
TypelessDataDecoder b(db);
|
||||
|
||||
@ -1543,114 +1546,6 @@ toolong:
|
||||
return ret;
|
||||
}
|
||||
|
||||
/** Builds a normalized (typeless) join key sized exactly for the row's key columns.
 *
 * A first pass computes the key length, the buffer is then taken from
 * `fa->allocate(keylen)`, and a second pass encodes each key column:
 *  - VARCHAR/CHAR/TEXT: written through TypelessDataStringEncoder;
 *  - LONGDOUBLE: converted according to the type of the matching column on
 *    the other side of the join (double, long double, or 64-bit integer);
 *  - anything else: 8 bytes holding the unsigned or signed integer field.
 *
 * @param r             row supplying the key column values
 * @param keyCols       indexes of the key columns within `r`
 * @param fa            allocator the key buffer is taken from
 * @param otherSideRG   row group of the other join side, used to pick the
 *                      conversion for LONGDOUBLE columns
 * @param otherKeyCols  key column indexes within `otherSideRG`, parallel to
 *                      `keyCols`
 * @return the encoded key; `len == 0` when a LONGDOUBLE value does not fit
 *         the range of the type it has to be converted to
 */
TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols, PoolAllocator* fa,
                             const rowgroup::RowGroup& otherSideRG, const std::vector<uint32_t>& otherKeyCols)
{
    TypelessData ret;
    uint32_t off = 0, i;
    execplan::CalpontSystemCatalog::ColDataType type;

    uint32_t keylen = 0;

    /* get the length of the normalized key... */
    for (i = 0; i < keyCols.size(); i++)
    {
        if (r.getColType(keyCols[i]) == CalpontSystemCatalog::LONGDOUBLE
                && otherSideRG.getColType(otherKeyCols[i]) == CalpontSystemCatalog::LONGDOUBLE)
        {
            keylen += sizeof(long double);
        }
        else if (r.isCharType(keyCols[i]))
            keylen += r.getStringLength(keyCols[i]) + 2;
        else
            keylen += 8;
    }

    ret.data = (uint8_t*) fa->allocate(keylen);

    for (i = 0; i < keyCols.size(); i++)
    {
        type = r.getColTypes()[keyCols[i]];

        if (type == CalpontSystemCatalog::VARCHAR ||
                type == CalpontSystemCatalog::CHAR ||
                type == CalpontSystemCatalog::TEXT)
        {
            // this is a string, copy a normalized version
            const uint8_t* str = r.getStringPointer(keyCols[i]);
            uint32_t width = r.getStringLength(keyCols[i]);
            TypelessDataStringEncoder(str, width).store(ret.data, off, keylen);
        }
        else if (type == CalpontSystemCatalog::LONGDOUBLE)
        {
            // Small side is a long double. Since CS can't store larger than DOUBLE,
            // we need to convert to whatever type large side is -- double or int64
            long double keyld = r.getLongDoubleField(keyCols[i]);

            switch (otherSideRG.getColType(otherKeyCols[i]))
            {
                case CalpontSystemCatalog::DOUBLE:
                case CalpontSystemCatalog::UDOUBLE:
                case CalpontSystemCatalog::FLOAT:
                case CalpontSystemCatalog::UFLOAT:
                {
                    if (keyld > MAX_DOUBLE || keyld < MIN_DOUBLE)
                    {
                        ret.len = 0;
                        return ret;
                    }
                    else
                    {
                        // Copy the double's 8-byte representation; memcpy
                        // avoids a misaligned, type-punned store.
                        double d = (double)keyld;
                        memcpy(&ret.data[off], &d, 8);
                        off += 8;
                    }

                    break;
                }

                case CalpontSystemCatalog::LONGDOUBLE:
                {
                    // NOTE(review): left as a direct store. A byte copy would
                    // also transfer the padding bytes of long double; confirm
                    // how these key bytes are consumed before changing it.
                    *((long double*) &ret.data[off]) = keyld;
                    off += sizeof(long double);
                    break;
                }

                default:
                {
                    // NOTE(review): a value that passes the unsigned test but
                    // exceeds MAX_BIGINT is still rejected by the signed range
                    // test that follows -- confirm this is intended.
                    if (r.isUnsigned(keyCols[i]) && keyld > MAX_UBIGINT)
                    {
                        ret.len = 0;
                        return ret;
                    }
                    else if (keyld > MAX_BIGINT || keyld < MIN_BIGINT)
                    {
                        ret.len = 0;
                        return ret;
                    }
                    else
                    {
                        int64_t val = (int64_t)keyld;
                        memcpy(&ret.data[off], &val, 8);
                        off += 8;
                    }

                    break;
                }
            }
        }
        else if (r.isUnsigned(keyCols[i]))
        {
            // memcpy instead of a casted store: `off` need not be 8-byte
            // aligned because string fields have arbitrary encoded lengths.
            uint64_t val = r.getUintField(keyCols[i]);
            memcpy(&ret.data[off], &val, 8);
            off += 8;
        }
        else
        {
            int64_t val = r.getIntField(keyCols[i]);
            memcpy(&ret.data[off], &val, 8);
            off += 8;
        }
    }

    // Both passes must agree on the encoded size.
    assert(off == keylen);
    ret.len = off;
    return ret;
}
|
||||
|
||||
uint64_t getHashOfTypelessKey(const Row& r, const vector<uint32_t>& keyCols, uint32_t seed)
|
||||
{
|
||||
|
@ -49,8 +49,10 @@ class TypelessData
|
||||
public:
|
||||
uint8_t* data;
|
||||
uint32_t len;
|
||||
const rowgroup::Row *mRowPtr;
|
||||
|
||||
TypelessData() : data(NULL), len(0) { }
|
||||
TypelessData() : data(NULL), len(0), mRowPtr(nullptr) { }
|
||||
TypelessData(const rowgroup::Row *rowPtr) : data(NULL), len(0), mRowPtr(rowPtr) { }
|
||||
inline bool operator==(const TypelessData&) const;
|
||||
void serialize(messageqcpp::ByteStream&) const;
|
||||
void deserialize(messageqcpp::ByteStream&, utils::FixedAllocator&);
|
||||
@ -60,6 +62,8 @@ public:
|
||||
static int cmp(const rowgroup::RowGroup&, const std::vector<uint32_t>& keyCols,
|
||||
const TypelessData &a,
|
||||
const TypelessData &b);
|
||||
int cmpToRow(const rowgroup::RowGroup& r, const std::vector<uint32_t>& keyCols,
|
||||
const rowgroup::Row &db) const;
|
||||
};
|
||||
|
||||
inline bool TypelessData::operator==(const TypelessData& t) const
|
||||
@ -88,15 +92,10 @@ public:
|
||||
* key is limited by keylen. Keys that are longer are assigned a length of 0 on return,
|
||||
* signifying that it shouldn't match anything.
|
||||
*/
|
||||
extern TypelessData makeTypelessKey(const rowgroup::Row&,
|
||||
const std::vector<uint32_t>&, uint32_t keylen, utils::FixedAllocator* fa);
|
||||
// MCOL-1822 SUM/AVG as long double: pass in RG and col so we can determine type conversion
|
||||
extern TypelessData makeTypelessKey(const rowgroup::Row&,
|
||||
const std::vector<uint32_t>&, uint32_t keylen, utils::FixedAllocator* fa,
|
||||
const rowgroup::RowGroup&, const std::vector<uint32_t>&);
|
||||
extern TypelessData makeTypelessKey(const rowgroup::Row&,
|
||||
const std::vector<uint32_t>&, utils::PoolAllocator* fa,
|
||||
const rowgroup::RowGroup&, const std::vector<uint32_t>&);
|
||||
extern uint64_t getHashOfTypelessKey(const rowgroup::Row&, const std::vector<uint32_t>&,
|
||||
uint32_t seed = 0);
|
||||
|
||||
|
@ -559,6 +559,14 @@ public:
|
||||
inline uint64_t hash(uint32_t lastCol) const; // generates a hash for cols [0-lastCol]
|
||||
inline uint64_t hash() const; // generates a hash for all cols
|
||||
inline void colUpdateMariaDBHasher(datatypes::MariaDBHasher &hasher, uint32_t col) const;
|
||||
inline void colUpdateMariaDBHasherTypeless(datatypes::MariaDBHasher &hasher, uint32_t col) const;
|
||||
inline uint64_t hashTypeless(const std::vector<uint32_t>& keyCols) const
|
||||
{
|
||||
datatypes::MariaDBHasher h;
|
||||
for (uint32_t i = 0; i < keyCols.size(); i++)
|
||||
colUpdateMariaDBHasherTypeless(h, keyCols[i]);
|
||||
return h.finalize();
|
||||
}
|
||||
|
||||
bool equals(const Row&, uint32_t lastCol) const;
|
||||
inline bool equals(const Row&) const;
|
||||
@ -942,6 +950,38 @@ inline void Row::colUpdateMariaDBHasher(datatypes::MariaDBHasher &h, uint32_t co
|
||||
}
|
||||
|
||||
|
||||
// Adds one column of this row to the hasher.
// CHAR/VARCHAR/BLOB/TEXT columns are added as a string under the column's
// charset; every other column is added as 8 bytes under my_charset_bin,
// taken from the unsigned or signed integer field.
inline void Row::colUpdateMariaDBHasherTypeless(datatypes::MariaDBHasher &h, uint32_t col) const
{
    const auto colType = getColType(col);
    const bool stringLike = colType == datatypes::SystemCatalog::CHAR ||
                            colType == datatypes::SystemCatalog::VARCHAR ||
                            colType == datatypes::SystemCatalog::BLOB ||
                            colType == datatypes::SystemCatalog::TEXT;

    if (stringLike)
    {
        CHARSET_INFO *charset = getCharset(col);
        h.add(charset, getConstString(col));
        return;
    }

    if (isUnsigned(col))
    {
        uint64_t raw = getUintField(col);
        h.add(&my_charset_bin, (const char*) &raw, 8);
        return;
    }

    int64_t raw = getIntField(col);
    h.add(&my_charset_bin, (const char*) &raw, 8);
}
|
||||
|
||||
|
||||
inline void Row::setStringField(const uint8_t* strdata, uint32_t length, uint32_t colIndex)
|
||||
{
|
||||
uint64_t offset;
|
||||
|
@ -593,7 +593,7 @@ int ColumnBufferCompressed::saveCompressionHeaders( )
|
||||
// If lbid written in the header is not 0 and not equal to `lastupdatedlbid` - we are running
|
||||
// for the next extent for column segment file.
|
||||
const auto lastUpdatedLbid = fColInfo->getLastUpdatedLBID();
|
||||
if (lbid && (uint64_t)lastUpdatedLbid != lbid)
|
||||
if (lbid && lastUpdatedLbid != lbid)
|
||||
{
|
||||
// Write back lbid, after header initialization.
|
||||
fCompressor->setLBIDByIndex(hdrBuf, lbid, 0);
|
||||
|
Reference in New Issue
Block a user