mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-06-12 05:01:56 +03:00

Squash merge of the multithreaded PM join code.

Squashed commit of the following:

commit fe4cc375faf1588e30471062f78403e81229cd02
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Fri Nov 1 13:38:11 2019 -0400

    Added some code comments to the new join code.

commit a7a82d093be4db3dfb44d33e4f514fd104b25f71
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Fri Nov 1 13:17:47 2019 -0400

    Fixed an error down a path I think is unused.

commit 4e6c7c266a9aefd54c384ae2b466645770c81a5d
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Fri Nov 1 13:12:12 2019 -0400

    std::atomic doesn't exist in the CentOS 7 (C7) toolchain; switched to boost::atomic.

commit ed0996c3f4548fff0e19d43852d429ada1a72510
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Oct 16 12:47:32 2019 -0500

    Addition to the previous fix (join dependency projection).

commit 97bb806be9211e4688893460437f539c46f3796f
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Oct 15 15:22:09 2019 -0500

    Found and fixed a bad mem access, which may have been there for 8 years.

commit d8b0432d2abd70f28de5276daad758c494e4b04b
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Oct 15 14:04:48 2019 -0500

    Minor optimization in some code I happened to look at.

commit b6ec8204bf71670c7a8882464289e700aa5f7e33
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Oct 15 14:04:11 2019 -0500

    Fixed a compiler warning.

commit 0bf3e5218f71d92460ddc88090e3af77ecf28c35
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Oct 15 10:11:09 2019 -0500

    Undid part of the previous commit.

commit 5dfa1d23980e245c77c1644015b553aa4bcdf908
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Mon Oct 14 18:00:21 2019 -0500

    Proofread the diff vs base, added some comments, removed some debugging stuff.

commit 411fd955ebbae97ddab210a7b17fe5708538001d
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Fri Oct 11 13:55:39 2019 -0500

    If a dev build (SKIP_OAM_INIT), made postConfigure exit before trying
    to start the system, because that won't work.

commit 634b1b8a7340b55fcaee045fd6d00b3e3a9269fa
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Mon Sep 30 14:55:45 2019 -0500

    Reduced crit section of BPP::addToJoiner a little.

commit 31f30c64dd95942f2c7a247cc81feaa5933c1a07
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Sep 18 11:09:27 2019 -0500

    Checkpointing.  Made the add-joiner code free temporary memory quickly.

commit 9b7e788690546af7ddc4c921a0ab441ee9a8df02
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Sep 18 10:38:57 2019 -0500

    Checkpoint.  Removed the temporary hardcoding of the bucket count.

commit fda4d8b7fb30d0431dc15e473042abb3d8121b19
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Sep 18 10:20:09 2019 -0500

    Checkpoint.  Adjusted unproductive loop wait time.

commit 7b9a67df7d192f240e9e558e6e66c7aa9f1e8687
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Sep 18 10:10:43 2019 -0500

    Checkpointing add'l optimizations.

    If we promote the bucket count (bpp::processorThreads) to a power of 2, we can
    use a bitmask instead of a mod operation to choose a bucket.

    Also, boosted utilization by not waiting for a bucket lock to become free.
    There are likely more gains to be had there with a smarter strategy.
    Maybe have each thread generate a random bucket access pattern to reduce
    chance of collision.  TBD.
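    The power-of-2 promotion and bitmask trick described above can be sketched as follows. `nextPowOf2` mirrors the classic round-up bit hack added in this commit; `pickBucket` is a hypothetical stand-in for the `bucketPicker(...) & ptMask` expression in the diff, not the actual function.

    ```cpp
    #include <cstdint>

    // Round x up to the next power of 2 (the classic bit-twiddling hack).
    static uint32_t nextPowOf2(uint32_t x)
    {
        x--;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        x |= x >> 8;
        x |= x >> 16;
        x++;
        return x;
    }

    // With a power-of-2 bucket count, 'hash % bucketCount' can be replaced
    // by the cheaper 'hash & (bucketCount - 1)'; both give the same bucket.
    static uint32_t pickBucket(uint64_t hash, uint32_t bucketCount)
    {
        return (uint32_t) (hash & (uint64_t) (bucketCount - 1));
    }
    ```

    For example, nextPowOf2(20) promotes a 20-thread configuration to 32 buckets, and pickBucket(h, 32) equals h % 32 for any hash h.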

commit abe7dab8661b5120f6ee268abc005dd66cd643e2
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Sep 17 16:15:51 2019 -0500

    Multithreaded PM hash table construction likely works here.

    A couple more fixes.
     - missed a mod after a hash in one place.
     - Made the PoolAllocator thread safe (small degree of performance hit
       there in threaded env).  May need to circle back to the table
       construction code to eliminate contention for the allocators instead.
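    The tradeoff described above (a small per-allocation performance hit in exchange for thread safety) can be sketched with a mutex-guarded pool. `ThreadSafePool` is hypothetical; the real `PoolAllocator` has a different interface (e.g. `setUseLock(true)` in the diff below).

    ```cpp
    #include <cstddef>
    #include <cstdlib>
    #include <mutex>
    #include <vector>

    // Minimal sketch: serialize allocations with a mutex so multiple
    // threads can share one pool. The lock on the hot path is the cost
    // the commit message refers to.
    class ThreadSafePool
    {
    public:
        void* allocate(size_t n)
        {
            std::lock_guard<std::mutex> lk(mtx_);  // per-allocation lock
            void* p = ::malloc(n);
            chunks_.push_back(p);
            return p;
        }
        ~ThreadSafePool()
        {
            for (void* p : chunks_)
                ::free(p);
        }
    private:
        std::mutex mtx_;
        std::vector<void*> chunks_;
    };
    ```

    The alternative the commit mentions is to restructure table construction so each thread owns its allocator, removing the need for the lock entirely.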

commit ab308762fbd873dbf246a6d1574223087cd0d5f6
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Sep 17 12:14:14 2019 -0500

    Checkpointing.  Did some initial testing, fixed a couple things.

    Not done testing yet.

commit 3b161d74fa859edb8b5ba84bb905e586ac0586e6
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Tue Sep 17 11:24:55 2019 -0500

    Checkpointing.  First cut of multithreaded PM join table building.

    Builds but is untested.

commit cb7e6e1c2761fc6c33b3b1c6b6cda488d7792bca
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Mon Sep 16 13:03:50 2019 -0500

    Increase the STLPoolAllocator window size to reduce destruction time.

commit b0ddaaae71a0a4959ad15c87579d85ed88e17e1f
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Fri Sep 13 11:52:51 2019 -0500

    Fixed a bug preventing parallel table loading.  Works now.

commit b87039604e312c1ddb88cdb226228b1c3addf018
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Thu Sep 12 22:04:15 2019 -0500

    Checkpointing some experimental changes.

     - Made the allocator type used by PM joins the STLPoolAllocator
     - Changed the default chunk size used by STLPoolAlloc based on a few test
        runs
     - Made BPP-JL interleave the PM join data by join # to take advantage
        of new locking env on PM.
     - While I was at it, fixed MCOL-1758.

commit fd4b09cc383d2b96959a8e5ca490c940bacb3d37
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Thu Sep 12 16:03:30 2019 -0500

    Speculative change.  Row estimator was stopping at 20 extents.

    Removed that limitation.

commit 7dcdd5b5455f9ac06121dd3cf1ba722150f3ee56
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Thu Sep 5 09:10:28 2019 -0500

    Inlined some hot SimpleAllocator functions.

commit 6d84daceecc5499f6286cf3468c118b8b1d28d8f
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Sep 4 17:02:29 2019 -0500

    Some optimizations to PM hash table creation.

    - made locks more granular.
    - reduced logic per iteration when adding elements.

commit b20bf54ed97c5a0a88d414a4dd844a0afc2e27f3
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Sep 4 15:32:32 2019 -0500

    Reduced granularity of djLock in PrimProc.

commit 6273a8f3c4c62b87ef91c77a829033426e38e4d4
Author: Patrick LeBlanc <patrick.leblanc@mariadb.com>
Date:   Wed Sep 4 14:45:58 2019 -0500

    Added a timer to PM hash table construction

    Signal USR1 prints the cumulative wall time to stdout and resets the timer.
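The USR1-triggered timer can be sketched like this. The real code uses boost::posix_time and a logger (visible, commented out, in the diff below); this is a minimal std::chrono sketch of the mechanism, and the names here are illustrative. The handler only sets an atomic flag (async-signal-safe); the worker checks the flag, prints, and resets.

```cpp
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdio>

// Set by the signal handler; consumed by the worker thread.
static std::atomic<bool> dumpRequested(false);
static std::chrono::steady_clock::time_point startTime;

// Async-signal-safe: only touches an atomic flag.
static void onUsr1(int) { dumpRequested.store(true); }

// Called periodically from normal code: print elapsed wall time and reset.
static void maybeReportTimer()
{
    if (dumpRequested.exchange(false))
    {
        auto now = std::chrono::steady_clock::now();
        auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                      now - startTime).count();
        std::printf("hash table construction: %lld ms\n", (long long) ms);
        startTime = now;    // reset, as the commit message describes
    }
}
```

Setup is `startTime = std::chrono::steady_clock::now(); std::signal(SIGUSR1, onUsr1);`, after which sending USR1 to the process makes the next `maybeReportTimer()` call print and reset the timer.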
This commit is contained in:
Patrick LeBlanc
2019-11-01 17:34:33 -04:00
parent 3e7a964e2a
commit 1eaa83d852
14 changed files with 498 additions and 185 deletions


@@ -76,6 +76,19 @@ extern int fCacheCount;
extern uint32_t connectionsPerUM;
extern int noVB;
// copied from https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
uint nextPowOf2(uint x)
{
x--;
x |= x >> 1;
x |= x >> 2;
x |= x >> 4;
x |= x >> 8;
x |= x >> 16;
x++;
return x;
}
BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
ot(BPS_ELEMENT_TYPE),
txnID(0),
@@ -109,7 +122,10 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
prefetchThreshold(0),
hasDictStep(false),
sockIndex(0),
endOfJoinerRan(false)
endOfJoinerRan(false),
processorThreads(0),
ptMask(0),
firstInstance(false)
{
pp.setLogicalBlockMode(true);
pp.setBlockPtr((int*) blockData);
@@ -117,7 +133,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
}
BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
boost::shared_ptr<BPPSendThread> bppst) :
boost::shared_ptr<BPPSendThread> bppst, uint _processorThreads) :
ot(BPS_ELEMENT_TYPE),
txnID(0),
sessionID(0),
@@ -150,8 +166,16 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
prefetchThreshold(prefetch),
hasDictStep(false),
sockIndex(0),
endOfJoinerRan(false)
endOfJoinerRan(false),
processorThreads(_processorThreads),
//processorThreads(32),
//ptMask(processorThreads - 1),
firstInstance(true)
{
// promote processorThreads to next power of 2. also need to change the name to bucketCount or similar
processorThreads = nextPowOf2(processorThreads);
ptMask = processorThreads - 1;
pp.setLogicalBlockMode(true);
pp.setBlockPtr((int*) blockData);
sendThread = bppst;
@@ -252,15 +276,31 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
bs >> joinerCount;
// cout << "joinerCount = " << joinerCount << endl;
joinTypes.reset(new JoinType[joinerCount]);
tJoiners.reset(new boost::shared_ptr<TJoiner>[joinerCount]);
_pools.reset(new boost::shared_ptr<utils::SimplePool>[joinerCount]);
tlJoiners.reset(new boost::shared_ptr<TLJoiner>[joinerCount]);
tJoiners.reset(new boost::shared_array<boost::shared_ptr<TJoiner> >[joinerCount]);
for (uint j = 0; j < joinerCount; ++j)
tJoiners[j].reset(new boost::shared_ptr<TJoiner>[processorThreads]);
//_pools.reset(new boost::shared_ptr<utils::SimplePool>[joinerCount]);
tlJoiners.reset(new boost::shared_array<boost::shared_ptr<TLJoiner> >[joinerCount]);
for (uint j = 0; j < joinerCount; ++j)
tlJoiners[j].reset(new boost::shared_ptr<TLJoiner>[processorThreads]);
addToJoinerLocks.reset(new boost::scoped_array<boost::mutex>[joinerCount]);
for (uint j = 0; j < joinerCount; ++j)
addToJoinerLocks[j].reset(new boost::mutex[processorThreads]);
smallSideDataLocks.reset(new boost::mutex[joinerCount]);
tJoinerSizes.reset(new uint32_t[joinerCount]);
largeSideKeyColumns.reset(new uint32_t[joinerCount]);
tlLargeSideKeyColumns.reset(new vector<uint32_t>[joinerCount]);
typelessJoin.reset(new bool[joinerCount]);
tlKeyLengths.reset(new uint32_t[joinerCount]);
storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
for (uint j = 0; j < joinerCount; ++j)
storedKeyAllocators[j].setUseLock(true);
joinNullValues.reset(new uint64_t[joinerCount]);
doMatchNulls.reset(new bool[joinerCount]);
joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);
@@ -291,16 +331,16 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
bs >> joinNullValues[i];
bs >> largeSideKeyColumns[i];
//cout << "large side key is " << largeSideKeyColumns[i] << endl;
_pools[i].reset(new utils::SimplePool());
utils::SimpleAllocator<pair<uint64_t const, uint32_t> > alloc(_pools[i]);
tJoiners[i].reset(new TJoiner(10, TupleJoiner::hasher(), equal_to<uint64_t>(), alloc));
for (uint j = 0; j < processorThreads; ++j)
tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher()));
}
else
{
deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
bs >> tlKeyLengths[i];
//storedKeyAllocators[i] = PoolAllocator();
tlJoiners[i].reset(new TLJoiner(10, TupleJoiner::hasher()));
for (uint j = 0; j < processorThreads; ++j)
tlJoiners[i][j].reset(new TLJoiner(10, TupleJoiner::hasher()));
}
}
@@ -355,6 +395,9 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
bs >> tmp8;
bs >> joinerSize;
joiner.reset(new Joiner((bool) tmp8));
// use just one lock for this old-style join (probably unused)
addToJoinerLocks.reset(new boost::scoped_array<boost::mutex>[1]);
addToJoinerLocks[0].reset(new boost::mutex[1]);
}
#ifdef __FreeBSD__
@@ -513,13 +556,19 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w,
#endif
}
// This version of addToJoiner() is multithreaded. Values are first
// hashed into thread-local vectors corresponding to the shared hash
// tables. Once the incoming values are organized locally, it grabs
// the lock for each shared table and inserts them there.
void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
{
uint32_t count, i, joinerNum, tlIndex, startPos;
joblist::ElementType* et;
TypelessData tlLargeKey;
uint8_t nullFlag;
/* to get wall-time of hash table construction
idbassert(processorThreads != 0);
if (firstCallTime.is_not_a_date_time())
firstCallTime = boost::posix_time::microsec_clock::universal_time();
*/
uint32_t count, i, joinerNum, tlIndex, startPos, bucket;
#pragma pack(push,1)
struct JoinerElements
{
@@ -528,7 +577,6 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
} *arr;
#pragma pack(pop)
addToJoinerLock.lock();
/* skip the header */
bs.advance(sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t));
@@ -541,31 +589,171 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
idbassert(joinerNum < joinerCount);
arr = (JoinerElements*) bs.buf();
// cout << "reading " << count << " elements from the bs, joinerNum is " << joinerNum << "\n";
for (i = 0; i < count; i++)
uint32_t &tJoinerSize = tJoinerSizes[joinerNum];
// XXXPAT: enormous if stmts are evil. TODO: move each block into
// properly-named functions for clarity.
if (typelessJoin[joinerNum])
{
if (typelessJoin[joinerNum])
vector<pair<TypelessData, uint32_t> > tmpBuckets[processorThreads];
TypelessData tlLargeKey;
uint8_t nullFlag;
PoolAllocator &storedKeyAllocator = storedKeyAllocators[joinerNum];
// this first loop hashes incoming values into vectors that parallel the hash tables.
for (i = 0; i < count; ++i)
{
bs >> nullFlag;
if (nullFlag == 0)
{
tlLargeKey.deserialize(bs, storedKeyAllocators[joinerNum]);
tlLargeKey.deserialize(bs, storedKeyAllocator);
bs >> tlIndex;
tlJoiners[joinerNum]->insert(pair<TypelessData, uint32_t>(tlLargeKey,
tlIndex));
bucket = bucketPicker((char *) tlLargeKey.data, tlLargeKey.len, bpSeed) & ptMask;
tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex));
}
else
tJoinerSizes[joinerNum]--;
--tJoinerSize;
}
bool done = false, didSomeWork;
//uint loopCounter = 0, noWorkCounter = 0;
// this loop moves the elements from each vector into its corresponding hash table.
while (!done)
{
//++loopCounter;
done = true;
didSomeWork = false;
for (i = 0; i < processorThreads; ++i)
{
if (!tmpBuckets[i].empty())
{
bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
if (!gotIt)
{
done = false; // didn't get it, don't block, try the next bucket
continue;
}
for (auto &element : tmpBuckets[i])
tlJoiners[joinerNum][i]->insert(element);
addToJoinerLocks[joinerNum][i].unlock();
tmpBuckets[i].clear();
didSomeWork = true;
}
}
// if this iteration did no useful work, everything we need is locked; wait briefly
// and try again.
if (!done && !didSomeWork)
{
::usleep(500 * processorThreads);
//++noWorkCounter;
}
}
//cout << "TL join insert. Took " << loopCounter << " loops" << endl;
}
else
{
boost::shared_array<boost::shared_ptr<TJoiner> > tJoiner = tJoiners[joinerNum];
uint64_t nullValue = joinNullValues[joinerNum];
bool &l_doMatchNulls = doMatchNulls[joinerNum];
joblist::JoinType joinType = joinTypes[joinerNum];
vector<pair<uint64_t, uint32_t> > tmpBuckets[processorThreads];
if (joinType & MATCHNULLS)
{
// this first loop hashes incoming values into vectors that parallel the hash tables.
for (i = 0; i < count; ++i)
{
/* A minor optimization: the matchnull logic should only be used when
* the jointype specifies it and there's a null value in the small side */
if (!l_doMatchNulls && arr[i].key == nullValue)
l_doMatchNulls = true;
bucket = bucketPicker((char *) &arr[i].key, 8, bpSeed) & ptMask;
tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
}
bool done = false, didSomeWork;
//uint loopCounter = 0, noWorkCounter = 0;
// this loop moves the elements from each vector into its corresponding hash table.
while (!done)
{
//++loopCounter;
done = true;
didSomeWork = false;
for (i = 0; i < processorThreads; ++i)
{
if (!tmpBuckets[i].empty())
{
bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
if (!gotIt)
{
done = false; // didn't get it, don't block, try the next bucket
continue;
}
for (auto &element : tmpBuckets[i])
tJoiners[joinerNum][i]->insert(element);
addToJoinerLocks[joinerNum][i].unlock();
tmpBuckets[i].clear();
didSomeWork = true;
}
}
// if this iteration did no useful work, everything we need is locked; wait briefly
// and try again.
if (!done && !didSomeWork)
{
::usleep(500 * processorThreads);
//++noWorkCounter;
}
}
//cout << "T numeric join insert. Took " << loopCounter << " loops" << endl;
}
else
{
/* A minor optimization: the matchnull logic should only be used when
* the jointype specifies it and there's a null value in the small side */
if (arr[i].key == joinNullValues[joinerNum])
doMatchNulls[joinerNum] = joinTypes[joinerNum] & MATCHNULLS;
// this first loop hashes incoming values into vectors that parallel the hash tables.
for (i = 0; i < count; ++i)
{
bucket = bucketPicker((char *) &arr[i].key, 8, bpSeed) & ptMask;
tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
}
tJoiners[joinerNum]->insert(pair<const uint64_t, uint32_t>(arr[i].key, arr[i].value));
bool done = false;
bool didSomeWork;
//uint loopCounter = 0, noWorkCounter = 0;
// this loop moves the elements from each vector into its corresponding hash table.
while (!done)
{
//++loopCounter;
done = true;
didSomeWork = false;
for (i = 0; i < processorThreads; ++i)
{
if (!tmpBuckets[i].empty())
{
bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
if (!gotIt)
{
done = false; // didn't get it, don't block, try the next bucket
continue;
}
for (auto &element : tmpBuckets[i])
tJoiners[joinerNum][i]->insert(element);
addToJoinerLocks[joinerNum][i].unlock();
tmpBuckets[i].clear();
didSomeWork = true;
}
}
// if this iteration did no useful work, everything we need is locked; wait briefly
// and try again.
if (!done && !didSomeWork)
{
::usleep(500 * processorThreads);
//++noWorkCounter;
}
}
//cout << "T numeric join insert 2. Took " << loopCounter << " loops," <<
// " unproductive iterations = " << noWorkCounter << endl;
}
}
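The insert loop repeated in each branch above follows one pattern: partition the incoming pairs into per-bucket staging vectors, then drain each vector into its shared hash table under `try_lock`, skipping contended buckets rather than blocking. A minimal self-contained sketch of that pattern, with `std::mutex` and `std::unordered_multimap` standing in for the boost types used in the diff:

```cpp
#include <cstdint>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Drain per-bucket staging vectors into shared hash tables without blocking:
// if a bucket's lock is contended, skip it and retry it on the next pass.
void drainBuckets(std::vector<std::pair<uint64_t, uint32_t> >* tmpBuckets,
                  std::unordered_multimap<uint64_t, uint32_t>* tables,
                  std::mutex* locks,
                  uint32_t bucketCount)
{
    bool done = false;
    while (!done)
    {
        done = true;
        bool didSomeWork = false;
        for (uint32_t i = 0; i < bucketCount; ++i)
        {
            if (tmpBuckets[i].empty())
                continue;
            if (!locks[i].try_lock())
            {
                done = false;       // contended; come back to this bucket
                continue;
            }
            for (auto& e : tmpBuckets[i])
                tables[i].insert(e);
            locks[i].unlock();
            tmpBuckets[i].clear();
            didSomeWork = true;
        }
        // Every remaining bucket was held by another thread; in the real
        // code this is where the brief usleep() back-off happens.
        if (!done && !didSomeWork)
            std::this_thread::yield();
    }
}
```

The key design choice, per the commit log, is that a thread never blocks on a bucket lock: it keeps cycling through whatever buckets are free, which keeps all processor threads productive while several BPPs build tables concurrently.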
@@ -574,24 +762,16 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
if (getTupleJoinRowGroupData)
{
// cout << "copying full row data for joiner " << joinerNum << endl;
/* Need to update this assertion if there's a typeless join. search
for nullFlag. */
// idbassert(ssrdPos[joinerNum] + (count * smallSideRowLengths[joinerNum]) <=
// smallSideRGs[joinerNum].getEmptySize() +
// (smallSideRowLengths[joinerNum] * tJoinerSizes[joinerNum]));
RowGroup& smallSide = smallSideRGs[joinerNum];
RGData offTheWire;
// TODO: write an RGData fcn to let it interpret data within a ByteStream to avoid
// the extra copying.
offTheWire.deserialize(bs);
mutex::scoped_lock lk(smallSideDataLocks[joinerNum]);
smallSide.setData(&smallSideRowData[joinerNum]);
smallSide.append(offTheWire, startPos);
//ssrdPos[joinerNum] += count;
/* This prints the row data
smallSideRGs[joinerNum].initRow(&r);
for (i = 0; i < (tJoinerSizes[joinerNum] * smallSideRowLengths[joinerNum]); i+=r.getSize()) {
@@ -603,8 +783,9 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
}
else
{
et = (joblist::ElementType*) bs.buf();
joblist::ElementType *et = (joblist::ElementType*) bs.buf();
mutex::scoped_lock lk(addToJoinerLocks[0][0]);
for (i = 0; i < count; i++)
{
// cout << "BPP: adding <" << et[i].first << ", " << et[i].second << "> to Joiner\n";
@@ -615,71 +796,78 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
}
idbassert(bs.length() == 0);
addToJoinerLock.unlock();
}
void BatchPrimitiveProcessor::doneSendingJoinerData()
{
/* to get wall-time of hash table construction
if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000))
{
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
Logger logger;
ostringstream os;
os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime;
logger.logMessage(os.str());
cout << os.str() << endl;
}
*/
}
int BatchPrimitiveProcessor::endOfJoiner()
{
/* Wait for all joiner elements to be added */
uint32_t i;
boost::mutex::scoped_lock scoped(addToJoinerLock);
size_t currentSize;
// it should be safe to run this without grabbing this lock
//boost::mutex::scoped_lock scoped(addToJoinerLock);
if (endOfJoinerRan)
return 0;
// minor hack / optimization. The instances not inserting the table data don't
// need to check that the table is complete.
if (!firstInstance)
{
endOfJoinerRan = true;
pthread_mutex_unlock(&objLock);
return 0;
}
if (ot == ROW_GROUP)
for (i = 0; i < joinerCount; i++)
{
if (!typelessJoin[i])
{
if ((tJoiners[i].get() == NULL || tJoiners[i]->size() !=
tJoinerSizes[i]))
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
if (!tJoiners[i] || !tJoiners[i][j])
return -1;
else
currentSize += tJoiners[i][j]->size();
if (currentSize != tJoinerSizes[i])
return -1;
//if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
else
{
currentSize = 0;
for (uint j = 0; j < processorThreads; ++j)
if (!tlJoiners[i] || !tlJoiners[i][j])
return -1;
else
currentSize += tlJoiners[i][j]->size();
if (currentSize != tJoinerSizes[i])
return -1;
//if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
// return -1;
}
else if ((tlJoiners[i].get() == NULL || tlJoiners[i]->size() !=
tJoinerSizes[i]))
return -1;
}
else if (joiner.get() == NULL || joiner->size() != joinerSize)
return -1;
endOfJoinerRan = true;
#ifdef old_version
addToJoinerLock.lock();
if (ot == ROW_GROUP)
for (i = 0; i < joinerCount; i++)
{
if (!typelessJoin[i])
while ((tJoiners[i].get() == NULL || tJoiners[i]->size() !=
tJoinerSizes[i]))
{
addToJoinerLock.unlock();
usleep(2000);
addToJoinerLock.lock();
}
else
while ((tlJoiners[i].get() == NULL || tlJoiners[i]->size() !=
tJoinerSizes[i]))
{
addToJoinerLock.unlock();
usleep(2000);
addToJoinerLock.lock();
}
}
else
while (joiner.get() == NULL || joiner->size() != joinerSize)
{
addToJoinerLock.unlock();
usleep(2000);
addToJoinerLock.lock();
}
addToJoinerLock.unlock();
#endif
#ifndef __FreeBSD__
pthread_mutex_unlock(&objLock);
#endif
@@ -991,8 +1179,9 @@ void BatchPrimitiveProcessor::executeTupleJoin()
largeKey = oldRow.getUintField(colIndex);
else
largeKey = oldRow.getIntField(colIndex);
uint bucket = bucketPicker((char *) &largeKey, 8, bpSeed) & ptMask;
found = (tJoiners[j]->find(largeKey) != tJoiners[j]->end());
found = (tJoiners[j][bucket]->find(largeKey) != tJoiners[j][bucket]->end());
isNull = oldRow.isNullValue(colIndex);
/* These conditions define when the row is NOT in the result set:
* - if the key is not in the small side, and the join isn't a large-outer or anti join
@@ -1016,7 +1205,8 @@ void BatchPrimitiveProcessor::executeTupleJoin()
// the null values are not sent by UM in typeless case. null -> !found
tlLargeKey = makeTypelessKey(oldRow, tlLargeSideKeyColumns[j], tlKeyLengths[j],
&tmpKeyAllocators[j]);
found = tlJoiners[j]->find(tlLargeKey) != tlJoiners[j]->end();
uint bucket = bucketPicker((char *) tlLargeKey.data, tlLargeKey.len, bpSeed) & ptMask;
found = tlJoiners[j][bucket]->find(tlLargeKey) != tlJoiners[j][bucket]->end();
if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI))) ||
(joinTypes[j] & ANTI))
@@ -1438,9 +1628,14 @@ void BatchPrimitiveProcessor::execute()
}
else
{
/* project the key columns. If there's the filter IN the join, project everything. */
/* project the key columns. If there's the filter IN the join, project everything.
Also need to project 'long' strings b/c executeTupleJoin may copy entire rows
using copyRow(), which will try to interpret the uninit'd string ptr.
Valgrind will legitimately complain about copying uninit'd values for the
other types but that is technically safe. */
for (j = 0; j < projectCount; j++)
if (keyColumnProj[j] || (hasJoinFEFilters && projectionMap[j] != -1))
if (keyColumnProj[j] || (projectionMap[j] != -1 && (hasJoinFEFilters ||
oldRow.isLongString(projectionMap[j]))))
{
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("-- projectIntoRowGroup");
@@ -1451,6 +1646,7 @@
#endif
}
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("-- executeTupleJoin()");
executeTupleJoin();
@@ -1462,7 +1658,8 @@
/* project the non-key columns */
for (j = 0; j < projectCount; ++j)
{
if ((!keyColumnProj[j] && projectionMap[j] != -1) && !hasJoinFEFilters)
if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters &&
!oldRow.isLongString(projectionMap[j]))
{
#ifdef PRIMPROC_STOPWATCH
stopwatch->start("-- projectIntoRowGroup");
@@ -2173,6 +2370,8 @@ SBPP BatchPrimitiveProcessor::duplicate()
bpp->bop = bop;
bpp->hasPassThru = hasPassThru;
bpp->forHJ = forHJ;
bpp->processorThreads = processorThreads; // is a power-of-2 at this point
bpp->ptMask = processorThreads - 1;
if (ot == ROW_GROUP)
{
@@ -2207,7 +2406,7 @@ SBPP BatchPrimitiveProcessor::duplicate()
bpp->joinTypes = joinTypes;
bpp->largeSideKeyColumns = largeSideKeyColumns;
bpp->tJoiners = tJoiners;
bpp->_pools = _pools;
//bpp->_pools = _pools;
bpp->typelessJoin = typelessJoin;
bpp->tlLargeSideKeyColumns = tlLargeSideKeyColumns;
bpp->tlJoiners = tlJoiners;
@@ -2477,6 +2676,8 @@ void BatchPrimitiveProcessor::initGJRG()
inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jIndex, vector<uint32_t>& v)
{
uint bucket;
if (!typelessJoin[jIndex])
{
if (r.isNullValue(largeSideKeyColumns[jIndex]))
@@ -2486,8 +2687,9 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
{
TJoiner::iterator it;
for (it = tJoiners[jIndex]->begin(); it != tJoiners[jIndex]->end(); ++it)
v.push_back(it->second);
for (uint i = 0; i < processorThreads; ++i)
for (it = tJoiners[jIndex][i]->begin(); it != tJoiners[jIndex][i]->end(); ++it)
v.push_back(it->second);
return;
}
@@ -2507,15 +2709,15 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
largeKey = r.getIntField(colIndex);
}
pair<TJoiner::iterator, TJoiner::iterator> range = tJoiners[jIndex]->equal_range(largeKey);
bucket = bucketPicker((char *) &largeKey, 8, bpSeed) & ptMask;
pair<TJoiner::iterator, TJoiner::iterator> range = tJoiners[jIndex][bucket]->equal_range(largeKey);
for (; range.first != range.second; ++range.first)
v.push_back(range.first->second);
if (doMatchNulls[jIndex]) // add the nulls to the match list
{
range = tJoiners[jIndex]->equal_range(joinNullValues[jIndex]);
bucket = bucketPicker((char *) &joinNullValues[jIndex], 8, bpSeed) & ptMask;
range = tJoiners[jIndex][bucket]->equal_range(joinNullValues[jIndex]);
for (; range.first != range.second; ++range.first)
v.push_back(range.first->second);
}
@@ -2537,9 +2739,9 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
if (hasNullValue)
{
TLJoiner::iterator it;
for (it = tlJoiners[jIndex]->begin(); it != tlJoiners[jIndex]->end(); ++it)
v.push_back(it->second);
for (uint i = 0; i < processorThreads; ++i)
for (it = tlJoiners[jIndex][i]->begin(); it != tlJoiners[jIndex][i]->end(); ++it)
v.push_back(it->second);
return;
}
@@ -2547,9 +2749,9 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
TypelessData largeKey = makeTypelessKey(r, tlLargeSideKeyColumns[jIndex],
tlKeyLengths[jIndex], &tmpKeyAllocators[jIndex]);
pair<TLJoiner::iterator, TLJoiner::iterator> range =
tlJoiners[jIndex]->equal_range(largeKey);
pair<TLJoiner::iterator, TLJoiner::iterator> range;
bucket = bucketPicker((char *) largeKey.data, largeKey.len, bpSeed) & ptMask;
range = tlJoiners[jIndex][bucket]->equal_range(largeKey);
for (; range.first != range.second; ++range.first)
v.push_back(range.first->second);
}
@@ -2579,4 +2781,3 @@ void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount)
}
// vim:ts=4 sw=4: