1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

MCOL-1822 Intermediate checkin. DISTINCT not working.

This commit is contained in:
David Hall
2019-02-25 14:54:46 -06:00
parent ab931e7c51
commit a2aa4b8479
67 changed files with 2391 additions and 1018 deletions

View File

@ -45,7 +45,14 @@ TupleJoiner::TupleJoiner(
smallRG(smallInput), largeRG(largeInput), joinAlg(INSERTING), joinType(jt),
threadCount(1), typelessJoin(false), bSignedUnsignedJoin(false), uniqueLimit(100), finished(false)
{
if (smallRG.usesStringTable())
if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
{
STLPoolAllocator<pair<const long double, Row::Pointer> > alloc(64 * 1024 * 1024 + 1);
_pool = alloc.getPoolAllocator();
ld.reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
}
else if (smallRG.usesStringTable())
{
STLPoolAllocator<pair<const int64_t, Row::Pointer> > alloc(64 * 1024 * 1024 + 1);
_pool = alloc.getPoolAllocator();
@ -131,8 +138,14 @@ TupleJoiner::TupleJoiner(
if (keyLength > 65536)
keyLength = 65536;
}
else if (smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::LONGDOUBLE)
{
keyLength += sizeof(long double);
}
else
{
keyLength += 8;
}
// Set bSignedUnsignedJoin if one or more join columns are signed to unsigned compares.
if (smallRG.isUnsigned(smallKeyColumns[i]) != largeRG.isUnsigned(largeKeyColumns[i]))
@ -198,9 +211,20 @@ void TupleJoiner::insert(Row& r, bool zeroTheRid)
{
if (typelessJoin)
{
ht->insert(pair<TypelessData, Row::Pointer>
(makeTypelessKey(r, smallKeyColumns, keyLength, &storedKeyAlloc),
r.getPointer()));
TypelessData td = makeTypelessKey(r, smallKeyColumns, keyLength, &storedKeyAlloc,
largeRG, largeKeyColumns);
if (td.len > 0)
{
ht->insert(pair<TypelessData, Row::Pointer>(td, r.getPointer()));
}
}
else if (r.getColType(smallKeyColumns[0]) == execplan::CalpontSystemCatalog::LONGDOUBLE)
{
long double smallKey = r.getLongDoubleField(smallKeyColumns[0]);
if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL))
ld->insert(pair<long double, Row::Pointer>(joblist::LONGDOUBLENULL, r.getPointer()));
else
ld->insert(pair<long double, Row::Pointer>(smallKey, r.getPointer()));
}
else if (!smallRG.usesStringTable())
{
@ -260,9 +284,16 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
thIterator it;
pair<thIterator, thIterator> range;
largeKey = makeTypelessKey(largeSideRow, largeKeyColumns, keyLength, &tmpKeyAlloc[threadID]);
it = ht->find(largeKey);
largeKey = makeTypelessKey(largeSideRow, largeKeyColumns, keyLength, &tmpKeyAlloc[threadID], smallRG, smallKeyColumns);
if (largeKey.len > 0)
{
it = ht->find(largeKey);
}
else
{
return;
}
if (it == ht->end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
@ -271,6 +302,28 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
for (; range.first != range.second; ++range.first)
matches->push_back(range.first->second);
}
else if (largeSideRow.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE
&& ld)
{
// This is a compare of two long double
long double largeKey;
ldIterator it;
pair<ldIterator, ldIterator> range;
Row r;
largeKey = largeSideRow.getLongDoubleField(largeKeyColumns[0]);
it = ld->find(largeKey);
if (it == ld->end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
range = ld->equal_range(largeKey);
for (; range.first != range.second; ++range.first)
{
matches->push_back(range.first->second);
}
}
else if (!smallRG.usesStringTable())
{
int64_t largeKey;
@ -278,7 +331,11 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
pair<iterator, iterator> range;
Row r;
if (largeSideRow.isUnsigned(largeKeyColumns[0]))
if (largeSideRow.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
{
largeKey = (int64_t)largeSideRow.getLongDoubleField(largeKeyColumns[0]);
}
else if (largeSideRow.isUnsigned(largeKeyColumns[0]))
{
largeKey = (int64_t)largeSideRow.getUintField(largeKeyColumns[0]);
}
@ -287,19 +344,41 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
largeKey = largeSideRow.getIntField(largeKeyColumns[0]);
}
it = h->find(largeKey);
if (it == end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
range = h->equal_range(largeKey);
//smallRG.initRow(&r);
for (; range.first != range.second; ++range.first)
if (ld)
{
//r.setData(range.first->second);
//cerr << "matched small side row: " << r.toString() << endl;
matches->push_back(range.first->second);
// Compare against long double
ldIterator it;
pair<ldIterator, ldIterator> range;
long double ldKey = largeKey;
// ldKey = 6920;
it = ld->find(ldKey);
if (it == ld->end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
range = ld->equal_range(ldKey);
for (; range.first != range.second; ++range.first)
{
matches->push_back(range.first->second);
}
}
else
{
it = h->find(largeKey);
if (it == h->end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
range = h->equal_range(largeKey);
//smallRG.initRow(&r);
for (; range.first != range.second; ++range.first)
{
//r.setData(range.first->second);
//cerr << "matched small side row: " << r.toString() << endl;
matches->push_back(range.first->second);
}
}
}
else
@ -335,7 +414,14 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
if (UNLIKELY(inUM() && (joinType & MATCHNULLS) && !isNull && !typelessJoin))
{
if (!smallRG.usesStringTable())
if (smallRG.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
{
pair<ldIterator, ldIterator> range = ld->equal_range(joblist::LONGDOUBLENULL);
for (; range.first != range.second; ++range.first)
matches->push_back(range.first->second);
}
else if (!smallRG.usesStringTable())
{
pair<iterator, iterator> range = h->equal_range(getJoinNullValue());
@ -357,7 +443,14 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
{
if (!typelessJoin)
{
if (!smallRG.usesStringTable())
if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
{
ldIterator it;
for (it = ld->begin(); it != ld->end(); ++it)
matches->push_back(it->second);
}
else if (!smallRG.usesStringTable())
{
iterator it;
@ -410,6 +503,7 @@ void TupleJoiner::doneInserting()
tr1::unordered_set<int64_t>::iterator uit;
sthash_t::iterator sthit;
hash_t::iterator hit;
ldhash_t::iterator ldit;
typelesshash_t::iterator thit;
uint32_t i, pmpos = 0, rowCount;
Row smallRow;
@ -425,6 +519,8 @@ void TupleJoiner::doneInserting()
pmpos = 0;
else if (typelessJoin)
thit = ht->begin();
else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
ldit = ld->begin();
else if (!smallRG.usesStringTable())
hit = h->begin();
else
@ -439,6 +535,11 @@ void TupleJoiner::doneInserting()
smallRow.setPointer(thit->second);
++thit;
}
else if (smallRG.getColType(smallKeyColumns[col]) == CalpontSystemCatalog::LONGDOUBLE)
{
smallRow.setPointer(ldit->second);
++ldit;
}
else if (!smallRG.usesStringTable())
{
smallRow.setPointer(hit->second);
@ -450,6 +551,25 @@ void TupleJoiner::doneInserting()
++sthit;
}
if (smallRow.getColType(smallKeyColumns[col]) == CalpontSystemCatalog::LONGDOUBLE)
{
double dval = (double)roundl(smallRow.getLongDoubleField(smallKeyColumns[col]));
switch (largeRG.getColType(largeKeyColumns[col]))
{
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
uniquer.insert(*(int64_t*)&dval);
}
default:
{
uniquer.insert((int64_t)dval);
}
}
}
else
if (smallRow.isUnsigned(smallKeyColumns[col]))
{
uniquer.insert((int64_t)smallRow.getUintField(smallKeyColumns[col]));
@ -614,11 +734,23 @@ void TupleJoiner::getUnmarkedRows(vector<Row::Pointer>* out)
out->push_back(it->second);
}
}
else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
{
ldIterator it;
for (it = ld->begin(); it != ld->end(); ++it)
{
smallR.setPointer(it->second);
if (!smallR.isMarked())
out->push_back(it->second);
}
}
else if (!smallRG.usesStringTable())
{
iterator it;
for (it = begin(); it != end(); ++it)
for (it = h->begin(); it != h->end(); ++it)
{
smallR.setPointer(it->second);
@ -670,8 +802,9 @@ void TupleJoiner::updateCPData(const Row& r)
for (col = 0; col < smallKeyColumns.size(); col++)
{
// if (r.getColumnWidth(smallKeyColumns[col]) > 8)
if (r.isLongString(smallKeyColumns[col]))
continue;
continue;
int64_t& min = cpValues[col][0], &max = cpValues[col][1];
@ -693,7 +826,29 @@ void TupleJoiner::updateCPData(const Row& r)
}
else if (r.isUnsigned(smallKeyColumns[col]))
{
uint64_t uval = r.getUintField(smallKeyColumns[col]);
uint64_t uval;
if (r.getColType(smallKeyColumns[col]) == CalpontSystemCatalog::LONGDOUBLE)
{
double dval = (double)roundl(r.getLongDoubleField(smallKeyColumns[col]));
switch (largeRG.getColType(largeKeyColumns[col]))
{
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
uval = *(uint64_t*)&dval;
}
default:
{
uval = (uint64_t)dval;
}
}
}
else
{
uval = r.getUintField(smallKeyColumns[col]);
}
if (uval > static_cast<uint64_t>(max))
max = static_cast<int64_t>(uval);
@ -703,7 +858,29 @@ void TupleJoiner::updateCPData(const Row& r)
}
else
{
int64_t val = r.getIntField(smallKeyColumns[col]);
int64_t val;
if (r.getColType(smallKeyColumns[col]) == CalpontSystemCatalog::LONGDOUBLE)
{
double dval = (double)roundl(r.getLongDoubleField(smallKeyColumns[col]));
switch (largeRG.getColType(largeKeyColumns[col]))
{
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
val = *(int64_t*)&dval;
}
default:
{
val = (int64_t)dval;
}
}
}
else
{
val = r.getIntField(smallKeyColumns[col]);
}
if (val > max)
max = val;
@ -720,6 +897,8 @@ size_t TupleJoiner::size() const
{
if (UNLIKELY(typelessJoin))
return ht->size();
else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
return ld->size();
else if (!smallRG.usesStringTable())
return h->size();
else
@ -768,20 +947,18 @@ TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols,
ret.data[off++] = 0;
}
else if (r.isUnsigned(keyCols[i]))
{
if (off + 8 > keylen)
goto toolong;
*((uint64_t*) &ret.data[off]) = r.getUintField(keyCols[i]);
off += 8;
}
else
{
if (off + 8 > keylen)
goto toolong;
if (r.isUnsigned(keyCols[i]))
{
*((uint64_t*) &ret.data[off]) = r.getUintField(keyCols[i]);
}
else
{
*((int64_t*) &ret.data[off]) = r.getIntField(keyCols[i]);
}
*((int64_t*) &ret.data[off]) = r.getIntField(keyCols[i]);
off += 8;
}
}
@ -795,7 +972,132 @@ toolong:
return ret;
}
TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols, PoolAllocator* fa)
TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols,
uint32_t keylen, FixedAllocator* fa,
const rowgroup::RowGroup& otherSideRG, const std::vector<uint32_t>& otherKeyCols)
{
TypelessData ret;
uint32_t off = 0, i, j;
execplan::CalpontSystemCatalog::ColDataType type;
ret.data = (uint8_t*) fa->allocate();
for (i = 0; i < keyCols.size(); i++)
{
type = r.getColTypes()[keyCols[i]];
if (type == CalpontSystemCatalog::VARCHAR ||
type == CalpontSystemCatalog::CHAR ||
type == CalpontSystemCatalog::TEXT)
{
// this is a string, copy a normalized version
const uint8_t* str = r.getStringPointer(keyCols[i]);
uint32_t width = r.getStringLength(keyCols[i]);
if (width > 65536)
{
throw runtime_error("Cannot join strings greater than 64KB");
}
for (j = 0; j < width && str[j] != 0; j++)
{
if (off >= keylen)
goto toolong;
ret.data[off++] = str[j];
}
if (off >= keylen)
goto toolong;
ret.data[off++] = 0;
}
else if (r.getColType(keyCols[i]) == CalpontSystemCatalog::LONGDOUBLE)
{
if (off + sizeof(long double) > keylen)
goto toolong;
// Small side is a long double. Since CS can't store larger than DOUBLE,
// we need to convert to whatever type large side is -- double or int64
long double keyld = r.getLongDoubleField(keyCols[i]);
switch (otherSideRG.getColType(otherKeyCols[i]))
{
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
if (off + 8 > keylen)
goto toolong;
if (keyld > MAX_DOUBLE || keyld < MIN_DOUBLE)
{
ret.len = 0;
return ret;
}
else
{
double d = (double)keyld;
*((int64_t*) &ret.data[off]) = *(int64_t*)&d;
}
break;
}
case CalpontSystemCatalog::LONGDOUBLE:
{
if (off + sizeof(long double) > keylen)
goto toolong;
*((long double*) &ret.data[off]) = keyld;
off += sizeof(long double);
break;
}
default:
{
if (off + 8 > keylen)
goto toolong;
if (r.isUnsigned(keyCols[i]) && keyld > MAX_UBIGINT)
{
ret.len = 0;
return ret;
}
else if (keyld > MAX_BIGINT || keyld < MIN_BIGINT)
{
ret.len = 0;
return ret;
}
else
{
*((int64_t*) &ret.data[off]) = (int64_t)keyld;
off += 8;
}
break;
}
}
}
else if (r.isUnsigned(keyCols[i]))
{
if (off + 8 > keylen)
goto toolong;
*((uint64_t*) &ret.data[off]) = r.getUintField(keyCols[i]);
off += 8;
}
else
{
if (off + 8 > keylen)
goto toolong;
*((int64_t*) &ret.data[off]) = r.getIntField(keyCols[i]);
off += 8;
}
}
ret.len = off;
fa->truncateBy(keylen - off);
return ret;
toolong:
fa->truncateBy(keylen);
ret.len = 0;
return ret;
}
TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols, PoolAllocator* fa,
const rowgroup::RowGroup& otherSideRG, const std::vector<uint32_t>& otherKeyCols)
{
TypelessData ret;
uint32_t off = 0, i, j;
@ -808,7 +1110,12 @@ TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols, Pool
{
type = r.getColTypes()[keyCols[i]];
if (r.isCharType(keyCols[i]))
if (r.getColType(keyCols[i]) == CalpontSystemCatalog::LONGDOUBLE
&& otherSideRG.getColType(otherKeyCols[i]) == CalpontSystemCatalog::LONGDOUBLE)
{
keylen += sizeof(long double);
}
else if (r.isCharType(keyCols[i]))
keylen += r.getStringLength(keyCols[i]) + 1;
else
keylen += 8;
@ -838,17 +1145,66 @@ TypelessData makeTypelessKey(const Row& r, const vector<uint32_t>& keyCols, Pool
ret.data[off++] = 0;
}
else if (type == CalpontSystemCatalog::LONGDOUBLE)
{
// Small side is a long double. Since CS can't store larger than DOUBLE,
// we need to convert to whatever type large side is -- double or int64
long double keyld = r.getLongDoubleField(keyCols[i]);
switch (otherSideRG.getColType(otherKeyCols[i]))
{
case CalpontSystemCatalog::DOUBLE:
case CalpontSystemCatalog::UDOUBLE:
case CalpontSystemCatalog::FLOAT:
case CalpontSystemCatalog::UFLOAT:
{
if (keyld > MAX_DOUBLE || keyld < MIN_DOUBLE)
{
ret.len = 0;
return ret;
}
else
{
double d = (double)keyld;
*((int64_t*) &ret.data[off]) = *(int64_t*)&d;
off += 8;
}
break;
}
case CalpontSystemCatalog::LONGDOUBLE:
{
*((long double*) &ret.data[off]) = keyld;
off += sizeof(long double);
break;
}
default:
{
if (r.isUnsigned(keyCols[i]) && keyld > MAX_UBIGINT)
{
ret.len = 0;
return ret;
}
else if (keyld > MAX_BIGINT || keyld < MIN_BIGINT)
{
ret.len = 0;
return ret;
}
else
{
*((int64_t*) &ret.data[off]) = (int64_t)keyld;
off += 8;
}
break;
}
}
}
else if (r.isUnsigned(keyCols[i]))
{
*((uint64_t*)&ret.data[off]) = r.getUintField(keyCols[i]);
off += 8;
}
else
{
if (r.isUnsigned(keyCols[i]))
{
*((uint64_t*)&ret.data[off]) = r.getUintField(keyCols[i]);
}
else
{
*((int64_t*)&ret.data[off]) = r.getIntField(keyCols[i]);
}
*((int64_t*)&ret.data[off]) = r.getIntField(keyCols[i]);
off += 8;
}
}
@ -881,25 +1237,29 @@ uint64_t getHashOfTypelessKey(const Row& r, const vector<uint32_t>& keyCols, uin
ret = hasher((const char*) str, len, ret);
/*
for (uint32_t j = 0; j < width && str[j] != 0; j++)
ret.data[off++] = str[j];
ret.data[off++] = str[j];
*/
ret = hasher(&nullChar, 1, ret);
width += len + 1;
}
else if (r.getColType(keyCols[i]) == CalpontSystemCatalog::LONGDOUBLE)
{
long double tmp = r.getLongDoubleField(keyCols[i]);
ret = hasher((char*) &tmp, sizeof(long double), ret);
width += sizeof(long double);
}
else
if (r.isUnsigned(keyCols[i]))
{
tmp = r.getUintField(keyCols[i]);
ret = hasher((char*) &tmp, 8, ret);
width += 8;
}
else
{
tmp = r.getIntField(keyCols[i]);
ret = hasher((char*) &tmp, 8, ret);
width += 8;
if (r.isUnsigned(keyCols[i]))
{
tmp = r.getUintField(keyCols[i]);
ret = hasher((char*) &tmp, 8, ret);
}
else
{
tmp = r.getIntField(keyCols[i]);
ret = hasher((char*) &tmp, 8, ret);
}
}
}