diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index 869c9758f..5cb5f876f 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -354,7 +354,11 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) bs >> tlKeyLengths[i]; //storedKeyAllocators[i] = PoolAllocator(); for (uint j = 0; j < processorThreads; ++j) - tlJoiners[i][j].reset(new TLJoiner(10, TupleJoiner::hasher())); + tlJoiners[i][j].reset(new TLJoiner(10, + TupleJoiner::TypelessDataHasher(&outputRG, + &tlLargeSideKeyColumns[i]), + TupleJoiner::TypelessDataComparator(&outputRG, + &tlLargeSideKeyColumns[i]))); } } @@ -613,7 +617,7 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs) { tlLargeKey.deserialize(bs, storedKeyAllocator); bs >> tlIndex; - bucket = bucketPicker((char *) tlLargeKey.data, tlLargeKey.len, bpSeed) & ptMask; + bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[joinerNum]) & ptMask; tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex)); } else @@ -1188,7 +1192,7 @@ void BatchPrimitiveProcessor::executeTupleJoin() // the null values are not sent by UM in typeless case. null -> !found tlLargeKey = makeTypelessKey(oldRow, tlLargeSideKeyColumns[j], tlKeyLengths[j], &tmpKeyAllocators[j]); - uint bucket = bucketPicker((char *) tlLargeKey.data, tlLargeKey.len, bpSeed) & ptMask; + uint bucket = tlLargeKey.hash(outputRG, tlLargeSideKeyColumns[j]) & ptMask; found = tlJoiners[j][bucket]->find(tlLargeKey) != tlJoiners[j][bucket]->end(); if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) || @@ -2736,7 +2740,7 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde TypelessData largeKey = makeTypelessKey(r, tlLargeSideKeyColumns[jIndex], tlKeyLengths[jIndex], &tmpKeyAllocators[jIndex]); pair range; - bucket = bucketPicker((char *) largeKey.data, largeKey.len, bpSeed) & ptMask; + bucket = largeKey.hash(outputRG, tlLargeSideKeyColumns[jIndex]) & ptMask; range = tlJoiners[jIndex][bucket]->equal_range(largeKey); for (; range.first != range.second; ++range.first) v.push_back(range.first->second); diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h index fd1b3992e..b72001249 100644 --- a/primitives/primproc/batchprimitiveprocessor.h +++ b/primitives/primproc/batchprimitiveprocessor.h @@ -296,7 +296,9 @@ private: utils::STLPoolAllocator > > TJoiner; typedef std::tr1::unordered_multimap, + uint32_t, + joiner::TupleJoiner::TypelessDataHasher, + joiner::TupleJoiner::TypelessDataComparator, utils::STLPoolAllocator > > TLJoiner; bool generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth = 0); diff --git a/utils/common/collation.h b/utils/common/collation.h index d26129933..88cd2821a 100644 --- a/utils/common/collation.h +++ b/utils/common/collation.h @@ -98,6 +98,10 @@ public: cs->hash_sort((const uchar *) str, length, &mPart1, &mPart2); return *this; } + MariaDBHasher & add(CHARSET_INFO *cs, const utils::ConstString &str) + { + return add(cs, str.str(), str.length()); + } uint32_t finalize() const { return (uint32_t) mPart1; diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp index 7c6793bf9..aed391d0c 100644 --- a/utils/joiner/tuplejoiner.cpp +++ b/utils/joiner/tuplejoiner.cpp @@ -188,7 +188,7 @@ TupleJoiner::TupleJoiner( || smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::TEXT) { - keyLength += smallRG.getColumnWidth(smallKeyColumns[i]) + 1; // +1 null char + keyLength += smallRG.getColumnWidth(smallKeyColumns[i]) + 2; // +2 for length // MCOL-698: if we don't do this LONGTEXT allocates 32TB RAM if (keyLength > 65536) @@ -1244,11 +1244,86 @@ size_t TupleJoiner::size() const return rows.size(); } + +class TypelessDataStringEncoder +{ + const uint8_t* mStr; + uint32_t mLength; +public: + TypelessDataStringEncoder(const uint8_t *str, uint32_t length) + :mStr(str), mLength(length) + { } + bool store(uint8_t* to, uint32_t& off, uint32_t keylen) const + { + if (mLength > 0xFFFF) // We encode length into two bytes below + { + throw runtime_error("Cannot join strings greater than 64KB"); + } + + if (off + mLength + 2 > keylen) + return true; + + to[off++]= mLength / 0xFF; + to[off++]= mLength % 0xFF; + /* + QQ: perhaps now when we put length, + we don't need to stop at '\0' bytes any more. + If so, the loop below can be replace to memcpy(). + */ + for (uint32_t j = 0; j < mLength && mStr[j] != 0; j++) + { + if (off >= keylen) + return true; + to[off++] = mStr[j]; + } + + return false; + } +}; + + +class TypelessDataDecoder +{ + const uint8_t *mPtr; + const uint8_t *mEnd; + void checkAvailableData(uint32_t nbytes) const + { + if (mPtr + nbytes > mEnd) + throw runtime_error("TypelessData is too short"); + } +public: + TypelessDataDecoder(const uint8_t* ptr, size_t length) + :mPtr(ptr), mEnd(ptr + length) + { } + TypelessDataDecoder(const TypelessData &data) + :TypelessDataDecoder(data.data, data.len) + { } + ConstString scanGeneric(uint32_t length) + { + checkAvailableData(length); + ConstString res((const char *) mPtr, length); + mPtr += length; + return res; + } + uint32_t scanStringLength() + { + checkAvailableData(2); + uint32_t res = ((uint32_t) mPtr[0]) * 255 + mPtr[1]; + mPtr += 2; + return res; + } + ConstString scanString() + { + return scanGeneric(scanStringLength()); + } +}; + + TypelessData makeTypelessKey(const Row& r, const vector& keyCols, uint32_t keylen, FixedAllocator* fa) { TypelessData ret; - uint32_t off = 0, i, j; + uint32_t off = 0, i; execplan::CalpontSystemCatalog::ColDataType type; ret.data = (uint8_t*) fa->allocate(); @@ -1264,24 +1339,8 @@ TypelessData makeTypelessKey(const Row& r, const vector& keyCols, // this is a string, copy a normalized version const uint8_t* str = r.getStringPointer(keyCols[i]); uint32_t width = r.getStringLength(keyCols[i]); - - if (width > 65536) - { - throw runtime_error("Cannot join strings greater than 64KB"); - } - - for (j = 0; j < width && str[j] != 0; j++) - { - if (off >= keylen) - goto toolong; - - ret.data[off++] = str[j]; - } - - if (off >= keylen) + if (TypelessDataStringEncoder(str, width).store(ret.data, off, keylen)) goto toolong; - - ret.data[off++] = 0; } else if (r.isUnsigned(keyCols[i])) { @@ -1308,12 +1367,78 @@ toolong: return ret; } + +uint32 TypelessData::hash(const RowGroup& r, + const std::vector& keyCols) const +{ + TypelessDataDecoder decoder(*this); + datatypes::MariaDBHasher hasher; + for (uint32_t i = 0; i < keyCols.size(); i++) + { + switch (r.getColTypes()[keyCols[i]]) + { + case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + { + CHARSET_INFO *cs= const_cast(r).getCharset(keyCols[i]); + hasher.add(cs, decoder.scanString()); + break; + } + default: + { + hasher.add(&my_charset_bin, decoder.scanGeneric(8)); + break; + } + } + } + return hasher.finalize(); +} + + +int TypelessData::cmp(const RowGroup& r, const std::vector& keyCols, + const TypelessData &da, const TypelessData &db) +{ + TypelessDataDecoder a(da); + TypelessDataDecoder b(db); + + for (uint32_t i = 0; i < keyCols.size(); i++) + { + switch (r.getColTypes()[keyCols[i]]) + { + case CalpontSystemCatalog::VARCHAR: + case CalpontSystemCatalog::CHAR: + case CalpontSystemCatalog::TEXT: + { + datatypes::Charset cs(*const_cast(r).getCharset(keyCols[i])); + ConstString ta = a.scanString(); + ConstString tb = b.scanString(); + if (int rc= cs.strnncollsp(ta, tb)) + return rc; + break; + } + default: + { + ConstString ta = a.scanGeneric(8); + ConstString tb = b.scanGeneric(8); + idbassert(ta.length() == tb.length()); + if (int rc= memcmp(ta.str(), tb.str() , ta.length())) + return rc; + break; + } + } + } + return 0; // Equal +} + + + TypelessData makeTypelessKey(const Row& r, const vector& keyCols, uint32_t keylen, FixedAllocator* fa, const rowgroup::RowGroup& otherSideRG, const std::vector& otherKeyCols) { TypelessData ret; - uint32_t off = 0, i, j; + uint32_t off = 0, i; execplan::CalpontSystemCatalog::ColDataType type; ret.data = (uint8_t*) fa->allocate(); @@ -1329,24 +1454,8 @@ TypelessData makeTypelessKey(const Row& r, const vector& keyCols, // this is a string, copy a normalized version const uint8_t* str = r.getStringPointer(keyCols[i]); uint32_t width = r.getStringLength(keyCols[i]); - - if (width > 65536) - { - throw runtime_error("Cannot join strings greater than 64KB"); - } - - for (j = 0; j < width && str[j] != 0; j++) - { - if (off >= keylen) - goto toolong; - - ret.data[off++] = str[j]; - } - - if (off >= keylen) + if (TypelessDataStringEncoder(str, width).store(ret.data, off, keylen)) goto toolong; - - ret.data[off++] = 0; } else if (r.getColType(keyCols[i]) == CalpontSystemCatalog::LONGDOUBLE) { @@ -1436,7 +1545,7 @@ TypelessData makeTypelessKey(const Row& r, const vector& keyCols, Pool const rowgroup::RowGroup& otherSideRG, const std::vector& otherKeyCols) { TypelessData ret; - uint32_t off = 0, i, j; + uint32_t off = 0, i; execplan::CalpontSystemCatalog::ColDataType type; uint32_t keylen = 0; @@ -1452,7 +1561,7 @@ TypelessData makeTypelessKey(const Row& r, const vector& keyCols, Pool keylen += sizeof(long double); } else if (r.isCharType(keyCols[i])) - keylen += r.getStringLength(keyCols[i]) + 1; + keylen += r.getStringLength(keyCols[i]) + 2; else keylen += 8; } @@ -1470,16 +1579,7 @@ TypelessData makeTypelessKey(const Row& r, const vector& keyCols, Pool // this is a string, copy a normalized version const uint8_t* str = r.getStringPointer(keyCols[i]); uint32_t width = r.getStringLength(keyCols[i]); - - if (width > 65536) - { - throw runtime_error("Cannot join strings greater than 64KB"); - } - - for (j = 0; j < width && str[j] != 0; j++) - ret.data[off++] = str[j]; - - ret.data[off++] = 0; + TypelessDataStringEncoder(str, width).store(ret.data, off, keylen); } else if (type == CalpontSystemCatalog::LONGDOUBLE) { diff --git a/utils/joiner/tuplejoiner.h b/utils/joiner/tuplejoiner.h index c1be13eac..974adae19 100644 --- a/utils/joiner/tuplejoiner.h +++ b/utils/joiner/tuplejoiner.h @@ -68,6 +68,10 @@ public: void deserialize(messageqcpp::ByteStream&, utils::FixedAllocator&); void deserialize(messageqcpp::ByteStream&, utils::PoolAllocator&); std::string toString() const; + uint32_t hash(const rowgroup::RowGroup&, const std::vector& keyCols) const; + static int cmp(const rowgroup::RowGroup&, const std::vector& keyCols, + const TypelessData &a, + const TypelessData &b); }; inline bool TypelessData::operator==(const TypelessData& t) const @@ -108,6 +112,18 @@ extern TypelessData makeTypelessKey(const rowgroup::Row&, extern uint64_t getHashOfTypelessKey(const rowgroup::Row&, const std::vector&, uint32_t seed = 0); +class TypelessDataStructure +{ +public: + const rowgroup::RowGroup *mRowGroup; + const std::vector *mMap; + TypelessDataStructure(const rowgroup::RowGroup *rg, + const std::vector *map) + :mRowGroup(rg), + mMap(map) + { } +}; + class TupleJoiner { @@ -143,6 +159,33 @@ public: utils::Hasher fHasher; }; + + struct TypelessDataHasher: public TypelessDataStructure + { + TypelessDataHasher(const rowgroup::RowGroup *rg, + const std::vector *map) + :TypelessDataStructure(rg, map) + { } + inline size_t operator()(const TypelessData& e) const + { + return e.hash(*mRowGroup, *mMap); + } + }; + + struct TypelessDataComparator: public TypelessDataStructure + { + public: + TypelessDataComparator(const rowgroup::RowGroup *rg, + const std::vector *map) + :TypelessDataStructure(rg, map) + { } + bool operator()(const TypelessData& a, const TypelessData& b) const + { + return !TypelessData::cmp(*mRowGroup, *mMap, a, b); + } + }; + + /* ctor to use for numeric join */ TupleJoiner( const rowgroup::RowGroup& smallInput,