diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp
index 5481a6de9..ec8306646 100644
--- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp
+++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp
@@ -1317,9 +1317,29 @@ void BatchPrimitiveProcessorJL::useJoiners(const vector<boost::shared_ptr<joiner::TupleJoiner> >& j)
         }
     }
 
+    posByJoinerNum.reset(new uint32_t[PMJoinerCount]);
+    memset(posByJoinerNum.get(), 0, PMJoinerCount * sizeof(uint32_t));
 }
 
+bool BatchPrimitiveProcessorJL::pickNextJoinerNum()
+{
+    uint32_t i;
+
+    for (i = 0; i < PMJoinerCount; i++)
+    {
+        joinerNum = (joinerNum + 1) % PMJoinerCount;
+        if (posByJoinerNum[joinerNum] != tJoiners[joinerNum]->getSmallSide()->size())
+            break;
+    }
+
+    if (i == PMJoinerCount)
+        return false;
+
+    pos = posByJoinerNum[joinerNum];
+    return true;
+}
+
 /* This algorithm relies on the joiners being sorted by size atm */
+/* XXXPAT: Going to interleave across joiners to take advantage of the new locking env in PrimProc */
 bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
 {
     uint32_t size = 0, toSend, i, j;
@@ -1332,11 +1352,26 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
     uint64_t smallkey;
     bool isNull;
     bool bSignedUnsigned;
-
+
+    bool moreMsgs = pickNextJoinerNum();
+
+    if (!moreMsgs)
+    {
+        /* last message */
+//      cout << "sending last joiner msg\n";
+        ism.Command = BATCH_PRIMITIVE_END_JOINER;
+        bs.load((uint8_t*) &ism, sizeof(ism));
+        bs << (messageqcpp::ByteStream::quadbyte)sessionID;
+        bs << (messageqcpp::ByteStream::quadbyte)stepID;
+        bs << uniqueID;
+        return false;
+    }
+
     memset((void*)&ism, 0, sizeof(ism));
     tSmallSide = tJoiners[joinerNum]->getSmallSide();
     size = tSmallSide->size();
+#if 0
     if (joinerNum == PMJoinerCount - 1 && pos == size)
     {
         /* last message */
@@ -1356,7 +1391,8 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
         size = tSmallSide->size();
         pos = 0;
     }
-
+#endif
+
     ism.Command = BATCH_PRIMITIVE_ADD_JOINER;
     bs.load((uint8_t*) &ism, sizeof(ism));
     bs << (messageqcpp::ByteStream::quadbyte)sessionID;
@@ -1541,6 +1577,7 @@ bool BatchPrimitiveProcessorJL::nextTupleJoinerMsg(ByteStream& bs)
     }
 
     pos += toSend;
+    posByJoinerNum[joinerNum] = pos;
     return true;
 }
 
diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h
index 1061694e6..8deba48d3 100644
--- a/dbcon/joblist/batchprimitiveprocessor-jl.h
+++ b/dbcon/joblist/batchprimitiveprocessor-jl.h
@@ -312,9 +312,11 @@ private:
     uint16_t status;
 
     /* for Joiner serialization */
+    bool pickNextJoinerNum();
     uint32_t pos, joinerNum;
     boost::shared_ptr<joiner::Joiner> joiner;
     boost::shared_ptr<std::vector<joblist::ElementType> > smallSide;
+    boost::scoped_array<uint32_t> posByJoinerNum;
 
     /* for RowGroup return type */
     rowgroup::RowGroup inputRG, projectionRG;
diff --git a/dbcon/joblist/rowestimator.cpp b/dbcon/joblist/rowestimator.cpp
index 4e5e97274..21369332f 100644
--- a/dbcon/joblist/rowestimator.cpp
+++ b/dbcon/joblist/rowestimator.cpp
@@ -499,14 +499,13 @@ uint64_t RowEstimator::estimateRows(const vector<ColumnCommandJL*>& cpColVec,
     rowsInLastExtent = ((hwm + 1) * fBlockSize / colCmd->getColType().colWidth) % fRowsPerExtent;
 
     // Sum up the total number of scanned rows.
-    uint32_t idx = scanFlags.size() - 1;
-    bool done = false;
+    int32_t idx = scanFlags.size() - 1;
 
-    while (!done)
+    while (idx >= 0)
     {
         if (scanFlags[idx])
         {
-            extentRows = (idx == scanFlags.size() - 1 ? rowsInLastExtent : fRowsPerExtent);
+            extentRows = (idx == (int) scanFlags.size() - 1 ? rowsInLastExtent : fRowsPerExtent);
 
             // Get the predicate factor.
 #if ROW_EST_DEBUG
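The idx change above is the subtle part of this hunk: a descending scan with an unsigned index never terminates, because idx-- wraps at zero. A minimal standalone illustration (the scanFlags data here is hypothetical, not from the patch):

    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main()
    {
        std::vector<bool> scanFlags = {true, false, true};

        // With uint32_t, "idx >= 0" is always true and idx-- wraps from 0 to
        // 4294967295, so the old code needed a separate 'done' flag. A signed
        // index lets the loop condition do the work, which is what the patch
        // switches to.
        for (int32_t idx = (int32_t)scanFlags.size() - 1; idx >= 0; idx--)
            std::cout << "extent " << idx
                      << (scanFlags[idx] ? ": scanned" : ": skipped") << "\n";

        return 0;
    }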
@@ -549,26 +548,28 @@ uint64_t RowEstimator::estimateRows(const vector<ColumnCommandJL*>& cpColVec,
 #endif
         }
 
-        if (extentsSampled == fExtentsToSample || idx == 0)
-        {
-            done = true;
-        }
-        else
-        {
+        //if (extentsSampled == fExtentsToSample || idx == 0)
+        //{
+        //done = true;
+        //}
+        //else
+        //{
             idx--;
-        }
+        //}
     }
 
     // If there are more extents than we sampled, add the row counts for the qualifying extents
     // that we didn't sample to the count of rows that will be scanned.
-    if ((extentsSampled >= fExtentsToSample) && (idx > 0))
+    // XXXPAT: Modified this fcn to sample all extents.  Leaving this here due to level of arcana
+    // involved. :)
+    if (false && (extentsSampled >= fExtentsToSample) && (idx > 0))
     {
         factor = (1.0 * estimatedRowCount) / (1.0 * totalRowsToBeScanned);
 
 #if ROW_EST_DEBUG
         cout << "overall factor-" << factor << endl;
 #endif
 
-        for (uint32_t i = 0; i < idx; i++)
+        for (int32_t i = 0; i < idx; i++)
         {
             if (scanFlags[i])
             {
@@ -611,4 +612,3 @@ uint64_t RowEstimator::estimateRowsForNonCPColumn(ColumnCommandJL& colCmd)
 }
 
 } //namespace joblist
-
diff --git a/oamapps/postConfigure/postConfigure.cpp b/oamapps/postConfigure/postConfigure.cpp
index f66ef43af..0e28154b3 100644
--- a/oamapps/postConfigure/postConfigure.cpp
+++ b/oamapps/postConfigure/postConfigure.cpp
@@ -29,8 +29,6 @@
 /**
  * @file
  */
-#include "config.h"
-
 #include <unistd.h>
 #include <iostream>
 
@@ -214,11 +212,6 @@ typedef struct _thread_data_t
 
 int main(int argc, char* argv[])
 {
-// print a warning if this is a developer build
-#ifdef SKIP_OAM_INIT
-    cout << "SKIP_OAM_INIT is set" << endl;
-    sleep(2);
-#endif
     Oam oam;
     string parentOAMModuleHostName;
     ChildModuleList childmodulelist;
diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp
index 8a664bd16..7ed4339ae 100644
--- a/primitives/primproc/batchprimitiveprocessor.cpp
+++ b/primitives/primproc/batchprimitiveprocessor.cpp
@@ -76,6 +76,19 @@ extern int fCacheCount;
 extern uint32_t connectionsPerUM;
 extern int noVB;
 
+// copied from https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
+uint nextPowOf2(uint x)
+{
+    x--;
+    x |= x >> 1;
+    x |= x >> 2;
+    x |= x >> 4;
+    x |= x >> 8;
+    x |= x >> 16;
+    x++;
+    return x;
+}
+
 BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
     ot(BPS_ELEMENT_TYPE),
     txnID(0),
@@ -109,7 +122,10 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
     prefetchThreshold(0),
     hasDictStep(false),
     sockIndex(0),
-    endOfJoinerRan(false)
+    endOfJoinerRan(false),
+    processorThreads(0),
+    ptMask(0),
+    firstInstance(false)
 {
     pp.setLogicalBlockMode(true);
     pp.setBlockPtr((int*) blockData);
@@ -117,7 +133,7 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor() :
 }
 
 BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
-        boost::shared_ptr<BPPSendThread> bppst) :
+        boost::shared_ptr<BPPSendThread> bppst, uint _processorThreads) :
     ot(BPS_ELEMENT_TYPE),
     txnID(0),
     sessionID(0),
@@ -150,8 +166,16 @@ BatchPrimitiveProcessor::BatchPrimitiveProcessor(ByteStream& b, double prefetch,
     prefetchThreshold(prefetch),
     hasDictStep(false),
     sockIndex(0),
-    endOfJoinerRan(false)
+    endOfJoinerRan(false),
+    processorThreads(_processorThreads),
+    //processorThreads(32),
+    //ptMask(processorThreads - 1),
+    firstInstance(true)
 {
+    // promote processorThreads to next power of 2.  also need to change the name to bucketCount or similar
+    processorThreads = nextPowOf2(processorThreads);
+    ptMask = processorThreads - 1;
+
     pp.setLogicalBlockMode(true);
     pp.setBlockPtr((int*) blockData);
     sendThread = bppst;
@@ -252,15 +276,31 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
         bs >> joinerCount;
 //      cout << "joinerCount = " << joinerCount << endl;
         joinTypes.reset(new JoinType[joinerCount]);
-        tJoiners.reset(new boost::shared_ptr<TJoiner>[joinerCount]);
-        _pools.reset(new boost::shared_ptr<utils::SimplePool>[joinerCount]);
-        tlJoiners.reset(new boost::shared_ptr<TLJoiner>[joinerCount]);
+
+        tJoiners.reset(new boost::shared_array<boost::shared_ptr<TJoiner> >[joinerCount]);
+        for (uint j = 0; j < joinerCount; ++j)
+            tJoiners[j].reset(new boost::shared_ptr<TJoiner>[processorThreads]);
+
+        //_pools.reset(new boost::shared_ptr<utils::SimplePool>[joinerCount]);
+        tlJoiners.reset(new boost::shared_array<boost::shared_ptr<TLJoiner> >[joinerCount]);
+        for (uint j = 0; j < joinerCount; ++j)
+            tlJoiners[j].reset(new boost::shared_ptr<TLJoiner>[processorThreads]);
+
+        addToJoinerLocks.reset(new boost::scoped_array<boost::mutex>[joinerCount]);
+        for (uint j = 0; j < joinerCount; ++j)
+            addToJoinerLocks[j].reset(new boost::mutex[processorThreads]);
+
+        smallSideDataLocks.reset(new boost::mutex[joinerCount]);
         tJoinerSizes.reset(new uint32_t[joinerCount]);
         largeSideKeyColumns.reset(new uint32_t[joinerCount]);
         tlLargeSideKeyColumns.reset(new vector<uint32_t>[joinerCount]);
         typelessJoin.reset(new bool[joinerCount]);
         tlKeyLengths.reset(new uint32_t[joinerCount]);
+        storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
+        for (uint j = 0; j < joinerCount; ++j)
+            storedKeyAllocators[j].setUseLock(true);
+
         joinNullValues.reset(new uint64_t[joinerCount]);
         doMatchNulls.reset(new bool[joinerCount]);
         joinFEFilters.reset(new scoped_ptr<FuncExpWrapper>[joinerCount]);
@@ -291,16 +331,16 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
                 bs >> joinNullValues[i];
                 bs >> largeSideKeyColumns[i];
                 //cout << "large side key is " << largeSideKeyColumns[i] << endl;
-                _pools[i].reset(new utils::SimplePool());
-                utils::SimpleAllocator<pair<const uint64_t, uint32_t> > alloc(_pools[i]);
-                tJoiners[i].reset(new TJoiner(10, TupleJoiner::hasher(), equal_to<uint64_t>(), alloc));
+                for (uint j = 0; j < processorThreads; ++j)
+                    tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher()));
             }
             else
             {
                 deserializeVector(bs, tlLargeSideKeyColumns[i]);
                 bs >> tlKeyLengths[i];
                 //storedKeyAllocators[i] = PoolAllocator();
-                tlJoiners[i].reset(new TLJoiner(10, TupleJoiner::hasher()));
+                for (uint j = 0; j < processorThreads; ++j)
+                    tlJoiners[i][j].reset(new TLJoiner(10, TupleJoiner::hasher()));
             }
         }
 
@@ -355,6 +395,9 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
             bs >> tmp8;
             bs >> joinerSize;
             joiner.reset(new Joiner((bool) tmp8));
+            // going to use just one lock for this old style, probably not used, join
+            addToJoinerLocks.reset(new boost::scoped_array<boost::mutex>[1]);
+            addToJoinerLocks[0].reset(new boost::mutex[1]);
         }
 
 #ifdef __FreeBSD__
@@ -513,13 +556,19 @@ void BatchPrimitiveProcessor::resetBPP(ByteStream& bs, const SP_UM_MUTEX& w,
 #endif
 }
 
+// This version of addToJoiner() is multithreaded.  Values are first
+// hashed into thread-local vectors corresponding to the shared hash
+// tables.  Once the incoming values are organized locally, it grabs
+// the lock for each shared table and inserts them there.
 void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
 {
-    uint32_t count, i, joinerNum, tlIndex, startPos;
-    joblist::ElementType* et;
-    TypelessData tlLargeKey;
-    uint8_t nullFlag;
+/* to get wall-time of hash table construction
+    idbassert(processorThreads != 0);
+    if (firstCallTime.is_not_a_date_time())
+        firstCallTime = boost::posix_time::microsec_clock::universal_time();
+*/
+
+    uint32_t count, i, joinerNum, tlIndex, startPos, bucket;
 #pragma pack(push,1)
     struct JoinerElements
     {
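The bucket math the rewritten addToJoiner() relies on is set up in the constructor hunk above: processorThreads is promoted to a power of two so a bit-mask can stand in for a modulo when picking a bucket. A self-contained sketch of the idea; std::hash stands in for utils::Hasher_r and its bpSeed constant, and the values are illustrative:

    #include <cstdint>
    #include <functional>
    #include <iostream>

    // Same bit trick the patch copies from the Stanford bithacks page.
    unsigned nextPowOf2(unsigned x)
    {
        x--;
        x |= x >> 1;  x |= x >> 2;  x |= x >> 4;
        x |= x >> 8;  x |= x >> 16;
        x++;
        return x;
    }

    int main()
    {
        unsigned processorThreads = nextPowOf2(6);  // 6 -> 8 buckets
        unsigned ptMask = processorThreads - 1;     // 0b111, so '& ptMask' == '% 8'

        uint64_t key = 12345;
        unsigned bucket = std::hash<uint64_t>()(key) & ptMask;

        std::cout << "key " << key << " -> bucket " << bucket
                  << " of " << processorThreads << std::endl;
        return 0;
    }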
@@ -528,7 +577,6 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
     } *arr;
 #pragma pack(pop)
 
-    addToJoinerLock.lock();
 
     /* skip the header */
     bs.advance(sizeof(ISMPacketHeader) + 3 * sizeof(uint32_t));
@@ -541,31 +589,171 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
     idbassert(joinerNum < joinerCount);
     arr = (JoinerElements*) bs.buf();
 
-//  cout << "reading " << count << " elements from the bs, joinerNum is " << joinerNum << "\n";
-    for (i = 0; i < count; i++)
+    uint32_t &tJoinerSize = tJoinerSizes[joinerNum];
+
+    // XXXPAT: enormous if stmts are evil.  TODO: move each block into
+    // properly-named functions for clarity.
+    if (typelessJoin[joinerNum])
     {
-        if (typelessJoin[joinerNum])
+        vector<pair<TypelessData, uint32_t> > tmpBuckets[processorThreads];
+        TypelessData tlLargeKey;
+        uint8_t nullFlag;
+        PoolAllocator &storedKeyAllocator = storedKeyAllocators[joinerNum];
+        // this first loop hashes incoming values into vectors that parallel the hash tables.
+        for (i = 0; i < count; ++i)
         {
             bs >> nullFlag;
-
             if (nullFlag == 0)
             {
-                tlLargeKey.deserialize(bs, storedKeyAllocators[joinerNum]);
+                tlLargeKey.deserialize(bs, storedKeyAllocator);
                 bs >> tlIndex;
-                tlJoiners[joinerNum]->insert(pair<TypelessData, uint32_t>(tlLargeKey,
-                                             tlIndex));
+                bucket = bucketPicker((char *) tlLargeKey.data, tlLargeKey.len, bpSeed) & ptMask;
+                tmpBuckets[bucket].push_back(make_pair(tlLargeKey, tlIndex));
             }
             else
-                tJoinerSizes[joinerNum]--;
+                --tJoinerSize;
+        }
+
+        bool done = false, didSomeWork;
+        //uint loopCounter = 0, noWorkCounter = 0;
+        // this loop moves the elements from each vector into its corresponding hash table.
+        while (!done)
+        {
+            //++loopCounter;
+            done = true;
+            didSomeWork = false;
+            for (i = 0; i < processorThreads; ++i)
+            {
+                if (!tmpBuckets[i].empty())
+                {
+                    bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
+                    if (!gotIt)
+                    {
+                        done = false;   // didn't get it, don't block, try the next bucket
+                        continue;
+                    }
+                    for (auto &element : tmpBuckets[i])
+                        tlJoiners[joinerNum][i]->insert(element);
+                    addToJoinerLocks[joinerNum][i].unlock();
+                    tmpBuckets[i].clear();
+                    didSomeWork = true;
+                }
+            }
+            // if this iteration did no useful work, everything we need is locked; wait briefly
+            // and try again.
+            if (!done && !didSomeWork)
+            {
+                ::usleep(500 * processorThreads);
+                //++noWorkCounter;
+            }
+        }
+        //cout << "TL join insert.  Took " << loopCounter << " loops" << endl;
+
+    }
+    else
+    {
+        boost::shared_array<boost::shared_ptr<TJoiner> > tJoiner = tJoiners[joinerNum];
+        uint64_t nullValue = joinNullValues[joinerNum];
+        bool &l_doMatchNulls = doMatchNulls[joinerNum];
+        joblist::JoinType joinType = joinTypes[joinerNum];
+        vector<pair<uint64_t, uint32_t> > tmpBuckets[processorThreads];
+
+        if (joinType & MATCHNULLS)
+        {
+            // this first loop hashes incoming values into vectors that parallel the hash tables.
+            for (i = 0; i < count; ++i)
+            {
+                /* A minor optimization: the matchnull logic should only be used when
+                 * the jointype specifies it and there's a null value in the small side */
+                if (!l_doMatchNulls && arr[i].key == nullValue)
+                    l_doMatchNulls = true;
+                bucket = bucketPicker((char *) &arr[i].key, 8, bpSeed) & ptMask;
+                tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
+            }
+
+
+            bool done = false, didSomeWork;
+            //uint loopCounter = 0, noWorkCounter = 0;
+            // this loop moves the elements from each vector into its corresponding hash table.
+            while (!done)
+            {
+                //++loopCounter;
+                done = true;
+                didSomeWork = false;
+                for (i = 0; i < processorThreads; ++i)
+                {
+                    if (!tmpBuckets[i].empty())
+                    {
+                        bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
+                        if (!gotIt)
+                        {
+                            done = false;   // didn't get it, don't block, try the next bucket
+                            continue;
+                        }
+                        for (auto &element : tmpBuckets[i])
+                            tJoiners[joinerNum][i]->insert(element);
+                        addToJoinerLocks[joinerNum][i].unlock();
+                        tmpBuckets[i].clear();
+                        didSomeWork = true;
+                    }
+                }
+                // if this iteration did no useful work, everything we need is locked; wait briefly
+                // and try again.
+                if (!done && !didSomeWork)
+                {
+                    ::usleep(500 * processorThreads);
+                    //++noWorkCounter;
+                }
+            }
+
+            //cout << "T numeric join insert.  Took " << loopCounter << " loops" << endl;
         }
         else
         {
-            /* A minor optimization: the matchnull logic should only be used with
-             * the jointype specifies it and there's a null value in the small side */
-            if (arr[i].key == joinNullValues[joinerNum])
-                doMatchNulls[joinerNum] = joinTypes[joinerNum] & MATCHNULLS;
+            // this first loop hashes incoming values into vectors that parallel the hash tables.
+            for (i = 0; i < count; ++i)
+            {
+                bucket = bucketPicker((char *) &arr[i].key, 8, bpSeed) & ptMask;
+                tmpBuckets[bucket].push_back(make_pair(arr[i].key, arr[i].value));
+            }
 
-            tJoiners[joinerNum]->insert(pair<uint64_t, uint32_t>(arr[i].key, arr[i].value));
+            bool done = false;
+            bool didSomeWork;
+            //uint loopCounter = 0, noWorkCounter = 0;
+            // this loop moves the elements from each vector into its corresponding hash table.
+            while (!done)
+            {
+                //++loopCounter;
+                done = true;
+                didSomeWork = false;
+                for (i = 0; i < processorThreads; ++i)
+                {
+                    if (!tmpBuckets[i].empty())
+                    {
+                        bool gotIt = addToJoinerLocks[joinerNum][i].try_lock();
+                        if (!gotIt)
+                        {
+                            done = false;   // didn't get it, don't block, try the next bucket
+                            continue;
+                        }
+                        for (auto &element : tmpBuckets[i])
+                            tJoiners[joinerNum][i]->insert(element);
+                        addToJoinerLocks[joinerNum][i].unlock();
+                        tmpBuckets[i].clear();
+                        didSomeWork = true;
+                    }
+                }
+                // if this iteration did no useful work, everything we need is locked; wait briefly
+                // and try again.
+                if (!done && !didSomeWork)
+                {
+                    ::usleep(500 * processorThreads);
+                    //++noWorkCounter;
+                }
+
+            }
+            //cout << "T numeric join insert 2.  Took " << loopCounter << " loops," <<
+            //  " unproductive iterations = " << noWorkCounter << endl;
         }
     }
 
@@ -574,24 +762,16 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
 
     if (getTupleJoinRowGroupData)
     {
-//      cout << "copying full row data for joiner " << joinerNum << endl;
-        /* Need to update this assertion if there's a typeless join.  search
-           for nullFlag. */
-//      idbassert(ssrdPos[joinerNum] + (count * smallSideRowLengths[joinerNum]) <=
-//                smallSideRGs[joinerNum].getEmptySize() +
-//                (smallSideRowLengths[joinerNum] * tJoinerSizes[joinerNum]));
-
         RowGroup& smallSide = smallSideRGs[joinerNum];
         RGData offTheWire;
 
         // TODO: write an RGData fcn to let it interpret data within a ByteStream to avoid
         // the extra copying.
         offTheWire.deserialize(bs);
+        mutex::scoped_lock lk(smallSideDataLocks[joinerNum]);
         smallSide.setData(&smallSideRowData[joinerNum]);
         smallSide.append(offTheWire, startPos);
 
-        //ssrdPos[joinerNum] += count;
-
         /* This prints the row data
         smallSideRGs[joinerNum].initRow(&r);
 
         for (i = 0; i < (tJoinerSizes[joinerNum] * smallSideRowLengths[joinerNum]); i+=r.getSize())
@@ -603,8 +783,9 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
     }
     else
     {
-        et = (joblist::ElementType*) bs.buf();
+        joblist::ElementType *et = (joblist::ElementType*) bs.buf();
 
+        mutex::scoped_lock lk(addToJoinerLocks[0][0]);
         for (i = 0; i < count; i++)
         {
 //          cout << "BPP: adding <" << et[i].first << ", " << et[i].second << "> to Joiner\n";
@@ -615,71 +796,78 @@ void BatchPrimitiveProcessor::addToJoiner(ByteStream& bs)
     }
 
     idbassert(bs.length() == 0);
-    addToJoinerLock.unlock();
+}
+
+void BatchPrimitiveProcessor::doneSendingJoinerData()
+{
+    /* to get wall-time of hash table construction
+    if (!firstCallTime.is_not_a_date_time() && !(sessionID & 0x80000000))
+    {
+        boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
+        Logger logger;
+        ostringstream os;
+        os << "id " << uniqueID << ": joiner construction time = " << now-firstCallTime;
+        logger.logMessage(os.str());
+        cout << os.str() << endl;
+    }
+    */
 }
 
 int BatchPrimitiveProcessor::endOfJoiner()
 {
     /* Wait for all joiner elements to be added */
     uint32_t i;
-
-    boost::mutex::scoped_lock scoped(addToJoinerLock);
+    size_t currentSize;
+    // it should be safe to run this without grabbing this lock
+    //boost::mutex::scoped_lock scoped(addToJoinerLock);
 
     if (endOfJoinerRan)
         return 0;
 
+    // minor hack / optimization.  The instances not inserting the table data don't
+    // need to check that the table is complete.
+    if (!firstInstance)
+    {
+        endOfJoinerRan = true;
+        pthread_mutex_unlock(&objLock);
+        return 0;
+    }
+
     if (ot == ROW_GROUP)
         for (i = 0; i < joinerCount; i++)
         {
             if (!typelessJoin[i])
             {
-                if ((tJoiners[i].get() == NULL || tJoiners[i]->size() !=
-                     tJoinerSizes[i]))
+                currentSize = 0;
+                for (uint j = 0; j < processorThreads; ++j)
+                    if (!tJoiners[i] || !tJoiners[i][j])
+                        return -1;
+                    else
+                        currentSize += tJoiners[i][j]->size();
+                if (currentSize != tJoinerSizes[i])
                     return -1;
+                //if ((!tJoiners[i] || tJoiners[i]->size() != tJoinerSizes[i]))
+                //    return -1;
+            }
+            else
+            {
+                currentSize = 0;
+                for (uint j = 0; j < processorThreads; ++j)
+                    if (!tlJoiners[i] || !tlJoiners[i][j])
+                        return -1;
+                    else
+                        currentSize += tlJoiners[i][j]->size();
+                if (currentSize != tJoinerSizes[i])
+                    return -1;
+                //if ((!tJoiners[i] || tlJoiners[i]->size() != tJoinerSizes[i]))
+                //    return -1;
             }
-            else if ((tlJoiners[i].get() == NULL || tlJoiners[i]->size() !=
-                      tJoinerSizes[i]))
-                return -1;
         }
     else if (joiner.get() == NULL || joiner->size() != joinerSize)
         return -1;
 
     endOfJoinerRan = true;
 
-#ifdef old_version
-    addToJoinerLock.lock();
-
-    if (ot == ROW_GROUP)
-        for (i = 0; i < joinerCount; i++)
-        {
-            if (!typelessJoin[i])
-                while ((tJoiners[i].get() == NULL || tJoiners[i]->size() !=
-                        tJoinerSizes[i]))
-                {
-                    addToJoinerLock.unlock();
-                    usleep(2000);
-                    addToJoinerLock.lock();
-                }
-            else
-                while ((tlJoiners[i].get() == NULL || tlJoiners[i]->size() !=
-                        tJoinerSizes[i]))
-                {
-                    addToJoinerLock.unlock();
-                    usleep(2000);
-                    addToJoinerLock.lock();
-                }
-        }
-    else
-        while (joiner.get() == NULL || joiner->size() != joinerSize)
-        {
-            addToJoinerLock.unlock();
-            usleep(2000);
-            addToJoinerLock.lock();
-        }
-
-    addToJoinerLock.unlock();
-#endif
-
 #ifndef __FreeBSD__
     pthread_mutex_unlock(&objLock);
 #endif
@@ -991,8 +1179,9 @@ void BatchPrimitiveProcessor::executeTupleJoin()
                     largeKey = oldRow.getUintField(colIndex);
                 else
                     largeKey = oldRow.getIntField(colIndex);
+                uint bucket = bucketPicker((char *) &largeKey, 8, bpSeed) & ptMask;
 
-                found = (tJoiners[j]->find(largeKey) != tJoiners[j]->end());
+                found = (tJoiners[j][bucket]->find(largeKey) != tJoiners[j][bucket]->end());
                 isNull = oldRow.isNullValue(colIndex);
 
                 /* These conditions define when the row is NOT in the result set:
                  *    - if the key is not in the small side, and the join isn't a large-outer or anti join
@@ -1016,7 +1205,8 @@ void BatchPrimitiveProcessor::executeTupleJoin()
                 // the null values are not sent by UM in typeless case.  null -> !found
                 tlLargeKey = makeTypelessKey(oldRow, tlLargeSideKeyColumns[j], tlKeyLengths[j],
                                              &tmpKeyAllocators[j]);
-                found = tlJoiners[j]->find(tlLargeKey) != tlJoiners[j]->end();
+                uint bucket = bucketPicker((char *) tlLargeKey.data, tlLargeKey.len, bpSeed) & ptMask;
+                found = tlJoiners[j][bucket]->find(tlLargeKey) != tlJoiners[j][bucket]->end();
 
                 if ((!found && !(joinTypes[j] & (LARGEOUTER | ANTI)))
                         || (joinTypes[j] & ANTI))
@@ -1438,9 +1628,14 @@ void BatchPrimitiveProcessor::execute()
         }
         else
         {
-            /* project the key columns.  If there's the filter IN the join, project everything. */
+            /* project the key columns.  If there's the filter IN the join, project everything.
+               Also need to project 'long' strings b/c executeTupleJoin may copy entire rows
+               using copyRow(), which will try to interpret the uninit'd string ptr.
+               Valgrind will legitimately complain about copying uninit'd values for the
+               other types but that is technically safe. */
             for (j = 0; j < projectCount; j++)
-                if (keyColumnProj[j] || (hasJoinFEFilters && projectionMap[j] != -1))
+                if (keyColumnProj[j] || (projectionMap[j] != -1 && (hasJoinFEFilters ||
+                    oldRow.isLongString(projectionMap[j]))))
                 {
 #ifdef PRIMPROC_STOPWATCH
                     stopwatch->start("-- projectIntoRowGroup");
@@ -1451,6 +1646,7 @@ void BatchPrimitiveProcessor::execute()
 #endif
                 }
 
+
 #ifdef PRIMPROC_STOPWATCH
             stopwatch->start("-- executeTupleJoin()");
             executeTupleJoin();
@@ -1462,7 +1658,8 @@ void BatchPrimitiveProcessor::execute()
             /* project the non-key columns */
             for (j = 0; j < projectCount; ++j)
             {
-                if ((!keyColumnProj[j] && projectionMap[j] != -1) && !hasJoinFEFilters)
+                if (projectionMap[j] != -1 && !keyColumnProj[j] && !hasJoinFEFilters &&
+                    !oldRow.isLongString(projectionMap[j]))
                 {
 #ifdef PRIMPROC_STOPWATCH
                     stopwatch->start("-- projectIntoRowGroup");
@@ -2173,6 +2370,8 @@ SBPP BatchPrimitiveProcessor::duplicate()
     bpp->bop = bop;
     bpp->hasPassThru = hasPassThru;
     bpp->forHJ = forHJ;
+    bpp->processorThreads = processorThreads;   // is a power-of-2 at this point
+    bpp->ptMask = processorThreads - 1;
 
     if (ot == ROW_GROUP)
     {
@@ -2207,7 +2406,7 @@ SBPP BatchPrimitiveProcessor::duplicate()
         bpp->joinTypes = joinTypes;
         bpp->largeSideKeyColumns = largeSideKeyColumns;
         bpp->tJoiners = tJoiners;
-        bpp->_pools = _pools;
+        //bpp->_pools = _pools;
         bpp->typelessJoin = typelessJoin;
         bpp->tlLargeSideKeyColumns = tlLargeSideKeyColumns;
         bpp->tlJoiners = tlJoiners;
@@ -2477,6 +2676,8 @@ void BatchPrimitiveProcessor::initGJRG()
 
 inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jIndex, vector<uint32_t>& v)
 {
+    uint bucket;
+
     if (!typelessJoin[jIndex])
     {
         if (r.isNullValue(largeSideKeyColumns[jIndex]))
@@ -2486,8 +2687,9 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
         {
             TJoiner::iterator it;
 
-            for (it = tJoiners[jIndex]->begin(); it != tJoiners[jIndex]->end(); ++it)
-                v.push_back(it->second);
+            for (uint i = 0; i < processorThreads; ++i)
+                for (it = tJoiners[jIndex][i]->begin(); it != tJoiners[jIndex][i]->end(); ++it)
+                    v.push_back(it->second);
 
             return;
         }
@@ -2507,15 +2709,15 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
             largeKey = r.getIntField(colIndex);
         }
 
-        pair<TJoiner::iterator, TJoiner::iterator> range = tJoiners[jIndex]->equal_range(largeKey);
-
+        bucket = bucketPicker((char *) &largeKey, 8, bpSeed) & ptMask;
+        pair<TJoiner::iterator, TJoiner::iterator> range = tJoiners[jIndex][bucket]->equal_range(largeKey);
         for (; range.first != range.second; ++range.first)
             v.push_back(range.first->second);
 
         if (doMatchNulls[jIndex])    // add the nulls to the match list
         {
-            range = tJoiners[jIndex]->equal_range(joinNullValues[jIndex]);
-
+            bucket = bucketPicker((char *) &joinNullValues[jIndex], 8, bpSeed) & ptMask;
+            range = tJoiners[jIndex][bucket]->equal_range(joinNullValues[jIndex]);
             for (; range.first != range.second; ++range.first)
                 v.push_back(range.first->second);
         }
@@ -2537,9 +2739,9 @@ inline void BatchPrimitiveProcessor::getJoinResults(const Row& r, uint32_t jInde
         if (hasNullValue)
         {
             TLJoiner::iterator it;
-
-            for (it = tlJoiners[jIndex]->begin(); it != tlJoiners[jIndex]->end(); ++it)
-                v.push_back(it->second);
+            for (uint i = 0; i < processorThreads; ++i)
+                for (it = tlJoiners[jIndex][i]->begin(); it != tlJoiners[jIndex][i]->end(); ++it)
+                    v.push_back(it->second);
 
             return;
         }
     }
 
     TypelessData largeKey = makeTypelessKey(r, tlLargeSideKeyColumns[jIndex],
                                             tlKeyLengths[jIndex], &tmpKeyAllocators[jIndex]);
-    pair<TLJoiner::iterator, TLJoiner::iterator> range =
-        tlJoiners[jIndex]->equal_range(largeKey);
-
+    pair<TLJoiner::iterator, TLJoiner::iterator> range;
+    bucket = bucketPicker((char *) largeKey.data, largeKey.len, bpSeed) & ptMask;
+    range = tlJoiners[jIndex][bucket]->equal_range(largeKey);
     for (; range.first != range.second; ++range.first)
         v.push_back(range.first->second);
 }
@@ -2579,4 +2781,3 @@ void BatchPrimitiveProcessor::buildVSSCache(uint32_t loopCount)
 }
 
 // vim:ts=4 sw=4:
-
diff --git a/primitives/primproc/batchprimitiveprocessor.h b/primitives/primproc/batchprimitiveprocessor.h
index b05910915..89402242c 100644
--- a/primitives/primproc/batchprimitiveprocessor.h
+++ b/primitives/primproc/batchprimitiveprocessor.h
@@ -86,7 +86,7 @@ class BatchPrimitiveProcessor
 {
 public:
     BatchPrimitiveProcessor(messageqcpp::ByteStream&, double prefetchThresh,
-                            boost::shared_ptr<BPPSendThread>);
+                            boost::shared_ptr<BPPSendThread>, uint processorThreads);
 
     ~BatchPrimitiveProcessor();
 
@@ -95,6 +95,7 @@ public:
     void resetBPP(messageqcpp::ByteStream&, const SP_UM_MUTEX& wLock, const SP_UM_IOSOCK& outputSock);
     void addToJoiner(messageqcpp::ByteStream&);
     int endOfJoiner();
+    void doneSendingJoinerData();
     int operator()();
     void setLBIDForScan(uint64_t rid);
 
@@ -255,7 +256,8 @@ private:
     bool doJoin;
     uint32_t joinerSize;
     uint16_t preJoinRidCount;
-    boost::mutex addToJoinerLock;
+    boost::scoped_array<boost::scoped_array<boost::mutex> > addToJoinerLocks;
+    boost::scoped_array<boost::mutex> smallSideDataLocks;
     void executeJoin();
 
     //	uint32_t ridsIn, ridsOut;
@@ -281,10 +283,11 @@ private:
     /* Rowgroups + join */
     typedef std::tr1::unordered_multimap<uint64_t, uint32_t,
             joiner::TupleJoiner::hasher, std::equal_to<uint64_t>,
-            utils::SimpleAllocator<std::pair<const uint64_t, uint32_t> > > TJoiner;
-
+            utils::STLPoolAllocator<std::pair<const uint64_t, uint32_t> > > TJoiner;
+
     typedef std::tr1::unordered_multimap<TypelessData,
-            uint32_t, joiner::TupleJoiner::hasher, std::equal_to<TypelessData> > TLJoiner;
+            uint32_t, joiner::TupleJoiner::hasher, std::equal_to<TypelessData>,
+            utils::STLPoolAllocator<std::pair<const TypelessData, uint32_t> > > TLJoiner;
 
     bool generateJoinedRowGroup(rowgroup::Row& baseRow, const uint32_t depth = 0);
 
     /* generateJoinedRowGroup helper fcns & vars */
@@ -299,7 +302,7 @@ private:
     boost::scoped_array<rowgroup::Row> smallRows;
     boost::shared_array<boost::shared_array<int> > gjrgMappings;
 
-    boost::shared_array<boost::shared_ptr<TJoiner> > tJoiners;
+    boost::shared_array<boost::shared_array<boost::shared_ptr<TJoiner> > > tJoiners;
     typedef std::vector<uint32_t> MatchedData[LOGICAL_BLOCK_RIDS];
     boost::shared_array<MatchedData> tSmallSideMatches;
     void executeTupleJoin();
@@ -328,7 +331,7 @@ private:
     /* extra typeless join vars & fcns*/
     boost::shared_array<bool> typelessJoin;
     boost::shared_array<std::vector<uint32_t> > tlLargeSideKeyColumns;
-    boost::shared_array<boost::shared_ptr<TLJoiner> > tlJoiners;
+    boost::shared_array<boost::shared_array<boost::shared_ptr<TLJoiner> > > tlJoiners;
     boost::shared_array<uint32_t> tlKeyLengths;
     inline void getJoinResults(const rowgroup::Row& r, uint32_t jIndex, std::vector<uint32_t>& v);
     // these allocators hold the memory for the keys stored in tlJoiners
@@ -342,7 +345,6 @@ private:
     rowgroup::RowGroup fAggregateRG;
     rowgroup::RGData fAggRowGroupData;
     //boost::scoped_array<uint8_t> fAggRowGroupData;
-    boost::shared_array<boost::shared_ptr<utils::SimplePool> > _pools;
 
     /* OR hacks */
     uint8_t bop;   // BOP_AND or BOP_OR
@@ -389,7 +391,14 @@ private:
     uint32_t dbRoot;
 
     bool endOfJoinerRan;
-
+    /* Some addJoiner() profiling stuff */
+    boost::posix_time::ptime firstCallTime;
+    utils::Hasher_r bucketPicker;
+    const uint32_t bpSeed = 0xf22df448;   // an arbitrary random #
+    uint processorThreads;
+    uint ptMask;
+    bool firstInstance;
+
     friend class Command;
     friend class ColumnCommand;
     friend class DictStep;
diff --git a/primitives/primproc/primitiveserver.cpp b/primitives/primproc/primitiveserver.cpp
index f64e38a4a..2061d832f 100644
--- a/primitives/primproc/primitiveserver.cpp
+++ b/primitives/primproc/primitiveserver.cpp
@@ -151,7 +151,12 @@ int noVB = 0;
 const uint8_t fMaxColWidth(8);
 BPPMap bppMap;
 mutex bppLock;
-mutex djLock;  // djLock synchronizes destroy and joiner msgs, see bug 2619
+
+#define DJLOCK_READ 0
+#define DJLOCK_WRITE 1
+mutex djMutex;   // lock for djLock, lol.
+std::map<uint64_t, shared_mutex *> djLock;  // djLock synchronizes destroy and joiner msgs, see bug 2619
+
 volatile int32_t asyncCounter;
 const int asyncMax = 20;   // current number of asynchronous loads
 
@@ -1272,7 +1277,7 @@ struct BPPHandler
     // threads lying around
     std::vector<uint32_t> bppKeys;
     std::vector<uint32_t>::iterator bppKeysIt;
-
+
     ~BPPHandler()
     {
         mutex::scoped_lock scoped(bppLock);
@@ -1441,7 +1446,7 @@ struct BPPHandler
         // make the new BPP object
         bppv.reset(new BPPV());
         bpp.reset(new BatchPrimitiveProcessor(bs, fPrimitiveServerPtr->prefetchThreshold(),
-                                              bppv->getSendThread()));
+                                              bppv->getSendThread(), fPrimitiveServerPtr->ProcessorThreads()));
 
         if (bs.length() > 0)
             bs >> initMsgsLeft;
@@ -1490,7 +1495,7 @@ struct BPPHandler
         }
     }
 
-    SBPPV grabBPPs(uint32_t uniqueID)
+    inline SBPPV grabBPPs(uint32_t uniqueID)
     {
         BPPMap::iterator it;
         /*
@@ -1526,6 +1531,30 @@ struct BPPHandler
         */
     }
 
+    inline shared_mutex & getDJLock(uint32_t uniqueID)
+    {
+        mutex::scoped_lock lk(djMutex);
+        auto it = djLock.find(uniqueID);
+        if (it != djLock.end())
+            return *it->second;
+        else
+        {
+            auto ret = djLock.insert(make_pair(uniqueID, new shared_mutex())).first;
+            return *ret->second;
+        }
+    }
+
+    inline void deleteDJLock(uint32_t uniqueID)
+    {
+        mutex::scoped_lock lk(djMutex);
+        auto it = djLock.find(uniqueID);
+        if (it != djLock.end())
+        {
+            delete it->second;
+            djLock.erase(it);
+        }
+    }
+
     int addJoinerToBPP(ByteStream& bs, const posix_time::ptime& dieTime)
     {
         SBPPV bppv;
@@ -1541,7 +1570,7 @@ struct BPPHandler
 
         if (bppv)
         {
-            mutex::scoped_lock lk(djLock);
+            shared_lock<shared_mutex> lk(getDJLock(uniqueID));
             bppv->get()[0]->addToJoiner(bs);
             return 0;
         }
@@ -1578,8 +1607,9 @@ struct BPPHandler
             return -1;
         }
 
-        mutex::scoped_lock lk(djLock);
-
+        unique_lock<shared_mutex> lk(getDJLock(uniqueID));
+
+
         for (i = 0; i < bppv->get().size(); i++)
         {
             err = bppv->get()[i]->endOfJoiner();
@@ -1592,6 +1622,7 @@ struct BPPHandler
                 return -1;
             }
         }
+        bppv->get()[0]->doneSendingJoinerData();
 
         /* Note: some of the duplicate/run/join sync was moved to the BPPV class to do
         more intelligent scheduling.  Once the join data is received, BPPV will
@@ -1622,7 +1653,7 @@ struct BPPHandler
             return -1;
         }
 
-        mutex::scoped_lock lk(djLock);
+        unique_lock<shared_mutex> lk(getDJLock(uniqueID));
         mutex::scoped_lock scoped(bppLock);
 
         bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID);
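The djLock hunks above replace one global mutex with a per-query reader-writer lock: addJoinerToBPP() takes it shared, while lastJoinerMsg() and destroyBPP() take it exclusive. A condensed model of that pattern, assuming Boost's shared_mutex as in the patch (function names here are placeholders, not ColumnStore APIs):

    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <utility>
    #include <boost/thread/shared_mutex.hpp>

    std::mutex registryMutex;                          // plays the role of djMutex
    std::map<uint32_t, boost::shared_mutex*> registry; // plays the role of djLock

    boost::shared_mutex& getLock(uint32_t uniqueID)
    {
        std::lock_guard<std::mutex> lk(registryMutex);
        auto it = registry.find(uniqueID);
        if (it == registry.end())
            it = registry.insert(std::make_pair(uniqueID, new boost::shared_mutex())).first;
        return *it->second;
    }

    void addJoinerData(uint32_t id)
    {
        boost::shared_lock<boost::shared_mutex> lk(getLock(id));
        // many of these can run concurrently for the same query
    }

    void destroyJob(uint32_t id)
    {
        boost::unique_lock<boost::shared_mutex> lk(getLock(id));
        // excludes all readers while the job is torn down
    }

    int main()
    {
        addJoinerData(42);
        destroyJob(42);
        return 0;
    }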
@@ -1657,7 +1688,15 @@ struct BPPHandler
             bs.rewind();
 
             if (posix_time::second_clock::universal_time() > dieTime)
+            {
+                // XXXPAT: going to let this fall through and delete jobs for
+                // uniqueID if there are any.  Not clear what the downside is.
+                /*
+                lk.unlock();
+                deleteDJLock(uniqueID);
                 return 0;
+                */
+            }
             else
                 return -1;
         }
@@ -1673,6 +1712,8 @@ struct BPPHandler
         */
         fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID);
         OOBPool->removeJobs(uniqueID);
+        lk.unlock();
+        deleteDJLock(uniqueID);
         return 0;
     }
 
diff --git a/primitives/primproc/primproc.cpp b/primitives/primproc/primproc.cpp
index 46d6cb03e..201e76fa7 100644
--- a/primitives/primproc/primproc.cpp
+++ b/primitives/primproc/primproc.cpp
@@ -159,7 +159,7 @@ int8_t setupCwd(Config* cf)
 
     if (rc < 0 || access(".", W_OK) != 0)
         rc = chdir("/tmp");
-
+
     return rc;
 }
 
diff --git a/utils/common/poolallocator.cpp b/utils/common/poolallocator.cpp
index 93a1fd4b1..5fdddce6d 100644
--- a/utils/common/poolallocator.cpp
+++ b/utils/common/poolallocator.cpp
@@ -65,32 +65,18 @@ void PoolAllocator::newBlock()
         nextAlloc = mem.front().get();
     }
 }
-
-void* PoolAllocator::allocate(uint64_t size)
+
+void * PoolAllocator::allocOOB(uint64_t size)
 {
-    void* ret;
+    OOBMemInfo memInfo;
 
-    if (size > allocSize)
-    {
-        OOBMemInfo memInfo;
-
-        memUsage += size;
-        memInfo.mem.reset(new uint8_t[size]);
-        memInfo.size = size;
-        ret = (void*) memInfo.mem.get();
-        oob[ret] = memInfo;
-        return ret;
-    }
-
-    if (size > capacityRemaining)
-        newBlock();
-
-    ret = (void*) nextAlloc;
-    nextAlloc += size;
-    capacityRemaining -= size;
     memUsage += size;
+    memInfo.mem.reset(new uint8_t[size]);
+    memInfo.size = size;
+    void *ret = (void*) memInfo.mem.get();
+    oob[ret] = memInfo;
     return ret;
 }
-
+
 void PoolAllocator::deallocate(void* p)
 {
     OutOfBandMap::iterator it = oob.find(p);
diff --git a/utils/common/poolallocator.h b/utils/common/poolallocator.h
index 448b3737e..1dd36951c 100644
--- a/utils/common/poolallocator.h
+++ b/utils/common/poolallocator.h
@@ -31,6 +31,7 @@
 #include <memory>
 #include <map>
 #include <boost/shared_array.hpp>
+#include <boost/atomic.hpp>
 
 namespace utils
 {
@@ -40,18 +41,22 @@ class PoolAllocator
 public:
     static const unsigned DEFAULT_WINDOW_SIZE = 4096 * 40;  // should be an integral # of pages
 
-    explicit PoolAllocator(unsigned windowSize = DEFAULT_WINDOW_SIZE, bool isTmpSpace = false) :
+    explicit PoolAllocator(unsigned windowSize = DEFAULT_WINDOW_SIZE, bool isTmpSpace = false, bool _useLock = false) :
         allocSize(windowSize),
         tmpSpace(isTmpSpace),
         capacityRemaining(0),
         memUsage(0),
-        nextAlloc(0) { }
+        nextAlloc(0),
+        useLock(_useLock),
+        lock(false) { }
 
     PoolAllocator(const PoolAllocator& p) :
         allocSize(p.allocSize),
         tmpSpace(p.tmpSpace),
         capacityRemaining(0),
         memUsage(0),
-        nextAlloc(0) { }
+        nextAlloc(0),
+        useLock(p.useLock),
+        lock(false) { }
 
     virtual ~PoolAllocator() {}
 
     PoolAllocator& operator=(const PoolAllocator&);
@@ -69,8 +74,14 @@ public:
         return allocSize;
     }
 
+    void setUseLock(bool ul)
+    {
+        useLock = ul;
+    }
+
 private:
     void newBlock();
+    void *allocOOB(uint64_t size);
 
     unsigned allocSize;
     std::vector<boost::shared_array<uint8_t> > mem;
@@ -78,6 +89,8 @@ private:
     unsigned capacityRemaining;
     uint64_t memUsage;
     uint8_t* nextAlloc;
+    bool useLock;
+    boost::atomic<bool> lock;
 
     struct OOBMemInfo
     {
@@ -88,6 +101,35 @@ private:
     typedef std::map<void*, OOBMemInfo> OutOfBandMap;
     OutOfBandMap oob;  // for mem chunks bigger than the window size; these can be dealloc'd
 };
 
+inline void* PoolAllocator::allocate(uint64_t size)
+{
+    void *ret;
+    bool _false = false;
+
+    if (useLock)
+        while (!lock.compare_exchange_weak(_false, true, boost::memory_order_acquire))
+            _false = false;
+
+    if (size > allocSize)
+    {
+        ret = allocOOB(size);
+        if (useLock)
+            lock.store(false, boost::memory_order_release);
+        return ret;
+    }
+
+    if (size > capacityRemaining)
+        newBlock();
+
+    ret = (void*) nextAlloc;
+    nextAlloc += size;
+    capacityRemaining -= size;
+    memUsage += size;
+    if (useLock)
+        lock.store(false, boost::memory_order_release);
+    return ret;
+}
+
 }
 
 #endif
diff --git a/utils/common/simpleallocator.h b/utils/common/simpleallocator.h
index 9419a8f69..1dbd9c249 100644
--- a/utils/common/simpleallocator.h
+++ b/utils/common/simpleallocator.h
@@ -117,20 +117,20 @@ public:
     ~SimpleAllocator() throw() { }
 
-    pointer address(reference x) const
+    inline pointer address(reference x) const
     {
         return &x;
     }
-    const_pointer address(const_reference x) const
+    inline const_pointer address(const_reference x) const
     {
         return &x;
     }
-    pointer allocate(size_type n, const void* = 0)
+    inline pointer allocate(size_type n, const void* = 0)
     {
         return static_cast<pointer>(fPool->allocate(n * sizeof(T)));
     }
-    void deallocate(pointer p, size_type n)
+    inline void deallocate(pointer p, size_type n)
     {
         fPool->deallocate(p, n * sizeof(T));
     }
@@ -142,21 +142,21 @@ public:
         return std::numeric_limits<size_type>::max();
     }
 #else
-    size_type max_size() const throw()
+    inline size_type max_size() const throw()
     {
         return fPool->max_size() / sizeof(T);
     }
 #endif
-    void construct(pointer ptr, const T& val)
+    inline void construct(pointer ptr, const T& val)
     {
         new ((void*)ptr) T(val);
    }
-    void destroy(pointer ptr)
+    inline void destroy(pointer ptr)
     {
         ptr->T::~T();
     }
 
-    void setPool(SimplePool* pool)
+    inline void setPool(SimplePool* pool)
     {
         fPool = pool;
     }
diff --git a/utils/common/stlpoolallocator.h b/utils/common/stlpoolallocator.h
index 5d1eff71d..650d4e79f 100644
--- a/utils/common/stlpoolallocator.h
+++ b/utils/common/stlpoolallocator.h
@@ -83,7 +83,7 @@ public:
     void construct(pointer p, const T& val);
     void destroy(pointer p);
 
-    static const uint32_t DEFAULT_SIZE = 4096 * sizeof(T);
+    static const uint32_t DEFAULT_SIZE = 32768 * sizeof(T);
 
     boost::shared_ptr<PoolAllocator> pa;
 };
diff --git a/utils/joiner/tuplejoiner.cpp b/utils/joiner/tuplejoiner.cpp
index f8ac47b98..88bab4bb0 100644
--- a/utils/joiner/tuplejoiner.cpp
+++ b/utils/joiner/tuplejoiner.cpp
@@ -48,21 +48,21 @@ TupleJoiner::TupleJoiner(
 {
     if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
     {
-        STLPoolAllocator<pair<const long double, Row::Pointer> > alloc(64 * 1024 * 1024 + 1);
+        STLPoolAllocator<pair<const long double, Row::Pointer> > alloc;
         _pool = alloc.getPoolAllocator();
 
         ld.reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
     }
     else if (smallRG.usesStringTable())
     {
-        STLPoolAllocator<pair<const int64_t, Row::Pointer> > alloc(64 * 1024 * 1024 + 1);
+        STLPoolAllocator<pair<const int64_t, Row::Pointer> > alloc;
         _pool = alloc.getPoolAllocator();
 
         sth.reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
     }
     else
     {
-        STLPoolAllocator<pair<const int64_t, uint8_t*> > alloc(64 * 1024 * 1024 + 1);
+        STLPoolAllocator<pair<const int64_t, uint8_t*> > alloc;
         _pool = alloc.getPoolAllocator();
 
         h.reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
@@ -112,7 +112,7 @@ TupleJoiner::TupleJoiner(
     smallKeyColumns(smallJoinColumns), largeKeyColumns(largeJoinColumns),
     bSignedUnsignedJoin(false), uniqueLimit(100), finished(false)
 {
-    STLPoolAllocator<pair<const TypelessData, Row::Pointer> > alloc(64 * 1024 * 1024 + 1);
+    STLPoolAllocator<pair<const TypelessData, Row::Pointer> > alloc;
     _pool = alloc.getPoolAllocator();
 
     ht.reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
@@ -1352,7 +1352,7 @@ void TupleJoiner::setTableName(const string& tname)
 
 void TupleJoiner::clearData()
 {
-    STLPoolAllocator<pair<const TypelessData, Row::Pointer> > alloc(64 * 1024 * 1024 + 1);
+    STLPoolAllocator<pair<const TypelessData, Row::Pointer> > alloc;
     _pool = alloc.getPoolAllocator();
 
     if (typelessJoin)
diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h
index 373269e13..c6a9ea6cb 100644
--- a/utils/rowgroup/rowgroup.h
+++ b/utils/rowgroup/rowgroup.h
@@ -359,7 +359,7 @@ public:
     template<int len> void setUintField_offset(uint64_t val, uint32_t offset);
     inline void nextRow(uint32_t size);
     inline void prevRow(uint32_t size, uint64_t number);
-
+
     inline void setUintField(uint64_t val, uint32_t colIndex);
     template<int len> void setIntField(int64_t, uint32_t colIndex);
     inline void setIntField(int64_t, uint32_t colIndex);
@@ -1803,7 +1803,8 @@ inline std::string StringStore::getString(uint64_t off) const
 
     if (off & 0x8000000000000000)
     {
-        off = off - 0x8000000000000000;
+        //off = off - 0x8000000000000000;
+        off &= ~0x8000000000000000;
 
         if (longStrings.size() <= off)
             return joblist::CPNULLSTRMARK;
@@ -1842,7 +1843,8 @@ inline const uint8_t* StringStore::getPointer(uint64_t off) const
 
     if (off & 0x8000000000000000)
     {
-        off = off - 0x8000000000000000;
+        //off = off - 0x8000000000000000;
+        off &= ~0x8000000000000000;
 
         if (longStrings.size() <= off)
             return (const uint8_t*) joblist::CPNULLSTRMARK.c_str();
@@ -1910,10 +1912,10 @@ inline bool StringStore::equals(const std::string& str, uint64_t off) const
 
     if (off & 0x8000000000000000)
     {
-        if (longStrings.size() <= (off - 0x8000000000000000))
+        if (longStrings.size() <= (off & ~0x8000000000000000))
             return false;
 
-        mc = (MemChunk*) longStrings[off - 0x8000000000000000].get();
+        mc = (MemChunk*) longStrings[off & ~0x8000000000000000].get();
 
         memcpy(&length, mc->data, 4);
@@ -1948,7 +1950,8 @@ inline uint32_t StringStore::getStringLength(uint64_t off)
 
     if (off & 0x8000000000000000)
    {
-        off = off - 0x8000000000000000;
+        //off = off - 0x8000000000000000;
+        off &= ~0x8000000000000000;
 
         if (longStrings.size() <= off)
             return 0;
@@ -2015,4 +2018,3 @@ inline void RGData::getRow(uint32_t num, Row* row)
 
 #endif
 // vim:ts=4 sw=4:
-
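Taken together, the PrimProc changes implement a lock-striped hash join build: incoming small-side rows are scattered into per-bucket staging vectors, then each bucket is drained under its own try_lock so writers never block on a bucket another thread is filling. A compact, runnable model of the insert loop at the heart of addToJoiner(), using standard-library stand-ins for the ColumnStore types, so only the control flow matches the patch:

    #include <chrono>
    #include <cstdint>
    #include <functional>
    #include <mutex>
    #include <thread>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    struct StripedJoiner
    {
        static const unsigned BUCKETS = 8;   // must be a power of 2
        std::unordered_multimap<uint64_t, uint32_t> table[BUCKETS];
        std::mutex locks[BUCKETS];

        void add(const std::vector<std::pair<uint64_t, uint32_t> >& in)
        {
            // scatter by hashed key into staging vectors that parallel the tables
            std::vector<std::pair<uint64_t, uint32_t> > staging[BUCKETS];
            for (const auto& kv : in)
                staging[std::hash<uint64_t>()(kv.first) & (BUCKETS - 1)].push_back(kv);

            bool done = false;
            while (!done)
            {
                done = true;
                bool didSomeWork = false;
                for (unsigned b = 0; b < BUCKETS; b++)
                {
                    if (staging[b].empty())
                        continue;
                    if (!locks[b].try_lock())   // contended: revisit this bucket later
                    {
                        done = false;
                        continue;
                    }
                    for (const auto& kv : staging[b])
                        table[b].insert(kv);
                    locks[b].unlock();
                    staging[b].clear();
                    didSomeWork = true;
                }
                if (!done && !didSomeWork)      // everything we need is locked: back off
                    std::this_thread::sleep_for(std::chrono::microseconds(500));
            }
        }
    };

    int main()
    {
        StripedJoiner j;
        j.add({{1, 10}, {2, 20}, {9, 90}});
        return 0;
    }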