
Squash merge of the threaded UM hash table construction feature.

Conflicts:
	oam/etc/Columnstore.xml.singleserver
Author: Patrick LeBlanc
Date:   2019-11-21 14:41:00 -05:00
Parent: 8c2cef3727
Commit: 0d26dc447c

11 changed files with 837 additions and 375 deletions


@ -54,6 +54,7 @@ using namespace funcexp;
using namespace querytele;
#include "atomicops.h"
#include "spinlock.h"
namespace joblist
{
@ -73,8 +74,6 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) :
fTupleId2(-1),
fCorrelatedSide(0),
resourceManager(jobInfo.rm),
totalUMMemoryUsage(0),
rgDataSize(0),
runRan(false),
joinRan(false),
largeSideIndex(1),
@ -84,7 +83,8 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) :
fTokenJoin(-1),
fStatsMutexPtr(new boost::mutex()),
fFunctionJoinKeys(jobInfo.keyInfo->functionJoinKeys),
sessionMemLimit(jobInfo.umMemLimit)
sessionMemLimit(jobInfo.umMemLimit),
rgdLock(false)
{
/* Need to figure out how much memory these use...
Overhead storing 16 byte elements is about 32 bytes. That
@ -116,11 +116,13 @@ TupleHashJoinStep::TupleHashJoinStep(const JobInfo& jobInfo) :
else
allowDJS = false;
numCores = sysconf(_SC_NPROCESSORS_ONLN);
if (numCores <= 0)
numCores = 8;
/* Debugging, rand() is used to simulate failures
time_t t = time(NULL);
srand(t);
*/
}
TupleHashJoinStep::~TupleHashJoinStep()
@ -130,8 +132,8 @@ TupleHashJoinStep::~TupleHashJoinStep()
if (ownsOutputDL)
delete outputDL;
if (totalUMMemoryUsage != 0)
resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit);
for (uint i = 0 ; i < smallDLs.size(); i++)
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
//cout << "deallocated THJS, UM memory available: " << resourceManager.availableMemory() << endl;
}
@ -200,140 +202,237 @@ void TupleHashJoinStep::join()
}
}
/* Index is which small input to read. */
void TupleHashJoinStep::smallRunnerFcn(uint32_t index)
// Simple solution: poll the Joiner's memory usage once per second and request
// the increase after the fact. Failure to get memory will be detected and
// handled by the threads inserting into the Joiner.
void TupleHashJoinStep::trackMem(uint index)
{
uint64_t i;
bool more, flippedUMSwitch = false, gotMem;
RGData oneRG;
//shared_array<uint8_t> oneRG;
Row r;
boost::shared_ptr<TupleJoiner> joiner = joiners[index];
ssize_t memBefore = 0, memAfter = 0;
bool gotMem;
boost::unique_lock<boost::mutex> scoped(memTrackMutex);
while (!stopMemTracking)
{
memAfter = joiner->getMemUsage();
if (memAfter != memBefore)
{
gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false);
atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
memBefore = memAfter;
if (!gotMem)
return;
}
memTrackDone.timed_wait(scoped, boost::posix_time::seconds(1));
}
// One more iteration to capture memory used since the last poll; on this
// iteration, raise an error if memory went over the limit.
memAfter = joiner->getMemUsage();
if (memAfter == memBefore)
return;
gotMem = resourceManager->getMemory(memAfter - memBefore, sessionMemLimit, false);
atomicops::atomicAdd(&memUsedByEachJoin[index], memAfter - memBefore);
if (!gotMem)
{
if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) ||
(tableOid() < 3000 && tableOid() >= 1000)))
{
joinIsTooBig = true;
fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
status(logging::ERR_JOIN_TOO_BIG);
cout << "Join is too big, raise the UM join limit for now (monitor thread)" << endl;
abort();
}
}
}
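The shutdown handshake used above and in startSmallRunners() (take memTrackMutex, set stopMemTracking, notify memTrackDone, then join) lets the per-second poll exit promptly instead of sleeping out its interval. A minimal sketch of that pattern, with illustrative names that are not part of this commit:

#include <boost/thread.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>

struct PollingMonitor
{
    boost::mutex mtx;
    boost::condition done;
    bool stop = false;

    void run()       // body of the monitor thread
    {
        boost::unique_lock<boost::mutex> lk(mtx);
        while (!stop)
        {
            // ... poll and account for memory growth here ...
            done.timed_wait(lk, boost::posix_time::seconds(1)); // returns early if notified
        }
    }

    void shutdown()  // called by the owner before joining the thread
    {
        boost::mutex::scoped_lock lk(mtx);
        stop = true;
        done.notify_one();  // wake the poller immediately
    }
};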
void TupleHashJoinStep::startSmallRunners(uint index)
{
utils::setThreadName("HJSStartSmall");
string extendedInfo;
JoinType jt;
RowGroupDL* smallDL;
uint32_t smallIt;
RowGroup smallRG;
boost::shared_ptr<TupleJoiner> joiner;
string extendedInfo;
extendedInfo += toString(); // is each small side supposed to have the whole THJS info?
smallDL = smallDLs[index];
smallIt = smallIts[index];
smallRG = smallRGs[index];
jt = joinTypes[index];
extendedInfo += toString();
//cout << " smallRunner " << index << " sees jointype " << jt << " joinTypes has " << joinTypes.size()
// << " elements" << endl;
if (typelessJoin[index])
{
joiner.reset(new TupleJoiner(smallRG, largeRG, smallSideKeys[index],
largeSideKeys[index], jt));
joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index],
largeSideKeys[index], jt, &jobstepThreadPool));
}
else
{
joiner.reset(new TupleJoiner(smallRG, largeRG, smallSideKeys[index][0],
largeSideKeys[index][0], jt));
joiner.reset(new TupleJoiner(smallRGs[index], largeRG, smallSideKeys[index][0],
largeSideKeys[index][0], jt, &jobstepThreadPool));
}
joiner->setUniqueLimit(uniqueLimit);
joiner->setTableName(smallTableNames[index]);
joiners[index] = joiner;
/* check for join types unsupported on the PM. */
if (!largeBPS || !isExeMgr)
joiner->setInUM(rgData[index]);
/*
read the small side into a TupleJoiner
send the TupleJoiner to the large side TBPS
start the large TBPS
read the large side, write to the output
start the small runners
join them
check status
handle abort, out of memory, etc
*/
smallRG.initRow(&r);
// cout << "reading smallDL" << endl;
more = smallDL->next(smallIt, &oneRG);
boost::posix_time::ptime end_time, start_time =
boost::posix_time::microsec_clock::universal_time();
stopMemTracking = false;
uint64_t jobs[numCores];
uint64_t memMonitor = jobstepThreadPool.invoke([this, index] { this->trackMem(index); });
// starting 1 thread when in PM mode, since it's only inserting into a
// vector of rows. The rest will be started when converted to UM mode.
if (joiner->inUM())
for (int i = 0; i < numCores; i++)
jobs[i] = jobstepThreadPool.invoke([this, i, index, &jobs] { this->smallRunnerFcn(index, i, jobs); });
else
jobs[0] = jobstepThreadPool.invoke([this, index, &jobs] { this->smallRunnerFcn(index, 0, jobs); });
// wait for the first thread to join, then decide whether the others exist and need joining
jobstepThreadPool.join(jobs[0]);
if (joiner->inUM())
for (int i = 1; i < numCores; i++)
jobstepThreadPool.join(jobs[i]);
// stop the monitor thread
memTrackMutex.lock();
stopMemTracking = true;
memTrackDone.notify_one();
memTrackMutex.unlock();
jobstepThreadPool.join(memMonitor);
/* If there was an error or an abort, drain the input DL,
do endOfInput on the output */
if (cancelled())
{
// cout << "HJ stopping... status is " << status() << endl;
if (largeBPS)
largeBPS->abort();
bool more = true;
RGData oneRG;
while (more)
more = smallDLs[index]->next(smallIts[index], &oneRG);
}
//joiner->doneInserting();
end_time = boost::posix_time::microsec_clock::universal_time();
if (!(fSessionId & 0x80000000))
cout << "hash table construction time = " << end_time - start_time <<
" size = " << joiner->size() << endl;
extendedInfo += "\n";
ostringstream oss;
if (joiner->inPM())
{
oss << "PM join (" << index << ")" << endl;
#ifdef JLF_DEBUG
cout << oss.str();
#endif
extendedInfo += oss.str();
}
else if (joiner->inUM() && !joiner->onDisk())
{
oss << "UM join (" << index << ")" << endl;
#ifdef JLF_DEBUG
cout << oss.str();
#endif
extendedInfo += oss.str();
}
/* Trying to get the extended info to match the original version
It's kind of kludgey at the moment, need to clean it up at some point */
if (!joiner->onDisk())
{
joiner->doneInserting();
boost::mutex::scoped_lock lk(*fStatsMutexPtr);
fExtendedInfo += extendedInfo;
formatMiniStats(index);
}
}
/* Index is which small input to read. */
void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t *jobs)
{
utils::setThreadName("HJSmallRunner");
bool more = true;
RGData oneRG;
Row r;
RowGroupDL* smallDL;
uint32_t smallIt;
RowGroup smallRG;
boost::shared_ptr<TupleJoiner> joiner = joiners[index];
smallDL = smallDLs[index];
smallIt = smallIts[index];
smallRG = smallRGs[index];
smallRG.initRow(&r);
try
{
/* check for join types unsupported on the PM. */
if (!largeBPS || !isExeMgr)
{
flippedUMSwitch = true;
oss << "UM join (" << index << ")";
#ifdef JLF_DEBUG
cout << oss.str() << endl;
#endif
extendedInfo += oss.str();
joiner->setInUM();
}
resourceManager->getMemory(joiner->getMemUsage(), sessionMemLimit, false);
(void)atomicops::atomicAdd(&totalUMMemoryUsage, joiner->getMemUsage());
memUsedByEachJoin[index] += joiner->getMemUsage();
ssize_t rgSize;
bool gotMem;
goto next;
while (more && !cancelled())
{
uint64_t memUseBefore, memUseAfter;
smallRG.setData(&oneRG);
if (smallRG.getRowCount() == 0)
goto next;
smallRG.getRow(0, &r);
memUseBefore = joiner->getMemUsage() + rgDataSize;
// TupleHJ owns the row memory
utils::getSpinlock(rgdLock);
rgData[index].push_back(oneRG);
rgDataSize += smallRG.getSizeWithStrings();
utils::releaseSpinlock(rgdLock);
for (i = 0; i < smallRG.getRowCount(); i++, r.nextRow())
rgSize = smallRG.getSizeWithStrings();
atomicops::atomicAdd(&memUsedByEachJoin[index], rgSize);
gotMem = resourceManager->getMemory(rgSize, sessionMemLimit, false);
if (!gotMem)
{
//cout << "inserting " << r.toString() << endl;
joiner->insert(r);
}
memUseAfter = joiner->getMemUsage() + rgDataSize;
if (UNLIKELY(!flippedUMSwitch && (memUseAfter >= pmMemLimit)))
{
flippedUMSwitch = true;
oss << "UM join (" << index << ") ";
#ifdef JLF_DEBUG
cout << oss.str() << endl;
#endif
extendedInfo += oss.str();
joiner->setInUM();
memUseAfter = joiner->getMemUsage() + rgDataSize;
}
gotMem = resourceManager->getMemory(memUseAfter - memUseBefore, sessionMemLimit, false);
atomicops::atomicAdd(&totalUMMemoryUsage, memUseAfter - memUseBefore);
memUsedByEachJoin[index] += memUseAfter - memUseBefore;
/* This is kind of kludgy and overlaps with segregateJoiners() atm.
If this join won't be converted to disk-based, this fcn needs to abort the same as
it did before. If it will be converted, it should just return. */
if (UNLIKELY(!gotMem))
{
if (isDML || !allowDJS || (fSessionId & 0x80000000) ||
(tableOid() < 3000 && tableOid() >= 1000))
boost::unique_lock<boost::mutex> sl(saneErrMsg);
if (!joinIsTooBig && (isDML || !allowDJS || (fSessionId & 0x80000000) ||
(tableOid() < 3000 && tableOid() >= 1000)))
{
joinIsTooBig = true;
fLogger->logMessage(logging::LOG_TYPE_INFO, logging::ERR_JOIN_TOO_BIG);
errorMessage(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_TOO_BIG));
status(logging::ERR_JOIN_TOO_BIG);
cout << "Join is too big, raise the UM join limit for now" << endl;
cout << "Join is too big, raise the UM join limit for now (small runner)" << endl;
abort();
break;
}
else
{
joiner->setConvertToDiskJoin();
return;
}
}
joiner->insertRGData(smallRG, threadID);
if (!joiner->inUM() && (memUsedByEachJoin[index] > pmMemLimit))
{
joiner->setInUM(rgData[index]);
for (int i = 1; i < numCores; i++)
jobs[i] = jobstepThreadPool.invoke([this, i, index, jobs]
{ this->smallRunnerFcn(index, i, jobs); });
}
next:
// cout << "inserted one rg into the joiner, rowcount = " <<
// smallRG.getRowCount() << endl;
dlMutex.lock();
more = smallDL->next(smallIt, &oneRG);
dlMutex.unlock();
}
}
catch (boost::exception& e)
@ -356,36 +455,10 @@ next:
status(logging::ERR_EXEMGR_MALFUNCTION);
}
if (!flippedUMSwitch && !cancelled())
{
oss << "PM join (" << index << ")";
#ifdef JLF_DEBUG
cout << oss.str() << endl;
#endif
extendedInfo += oss.str();
if (!joiner->inUM())
joiner->setInPM();
}
/* If there was an error or an abort drain the input DL,
do endOfInput on the output */
if (cancelled())
{
// cout << "HJ stopping... status is " << status() << endl;
if (largeBPS)
largeBPS->abort();
while (more)
more = smallDL->next(smallIt, &oneRG);
}
joiner->doneInserting();
extendedInfo += "\n";
boost::mutex::scoped_lock lk(*fStatsMutexPtr);
fExtendedInfo += extendedInfo;
formatMiniStats(index);
}
void TupleHashJoinStep::forwardCPData()
{
uint32_t i, col;
@ -517,24 +590,6 @@ void TupleHashJoinStep::djsReaderFcn(int index)
processFE2(l_outputRG, l_fe2RG, fe2InRow, fe2OutRow, &v_rgData, &l_fe);
processDupList(0, (fe2 ? l_fe2RG : l_outputRG), &v_rgData);
#if 0
if (!v_rgData.empty() && fSessionId < 0x80000000)
{
if (fe2)
{
l_fe2RG.setData(&v_rgData[0]);
cout << "fully processed rowgroup: " << l_fe2RG.toString() << endl;
}
else
{
l_outputRG.setData(&v_rgData[0]);
cout << "fully processed rowgroup: " << l_outputRG.toString() << endl;
}
}
cout << "ownsOutputDL = " << (int) ownsOutputDL << " fDelivery = " << (int) fDelivery << endl;
#endif
sendResult(v_rgData);
}
@ -553,6 +608,7 @@ void TupleHashJoinStep::djsReaderFcn(int index)
void TupleHashJoinStep::hjRunner()
{
uint32_t i;
std::vector<uint64_t> smallRunners; // thread handles from thread pool
if (cancelled())
{
@ -581,7 +637,7 @@ void TupleHashJoinStep::hjRunner()
/* Start the small-side runners */
rgData.reset(new vector<RGData>[smallDLs.size()]);
memUsedByEachJoin.reset(new uint64_t[smallDLs.size()]);
memUsedByEachJoin.reset(new ssize_t[smallDLs.size()]);
for (i = 0; i < smallDLs.size(); i++)
memUsedByEachJoin[i] = 0;
@ -680,7 +736,7 @@ void TupleHashJoinStep::hjRunner()
{
vector<RGData> empty;
resourceManager->returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit);
atomicops::atomicSub(&totalUMMemoryUsage, memUsedByEachJoin[djsJoinerMap[i]]);
memUsedByEachJoin[djsJoinerMap[i]] = 0;
djs[i].loadExistingData(rgData[djsJoinerMap[i]]);
rgData[djsJoinerMap[i]].swap(empty);
}
@ -792,8 +848,11 @@ void TupleHashJoinStep::hjRunner()
joiners.clear();
tbpsJoiners.clear();
rgData.reset();
resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit);
totalUMMemoryUsage = 0;
for (uint i = 0; i < smallDLs.size(); i++)
{
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
memUsedByEachJoin[i] = 0;
}
}
}
}
@ -840,7 +899,7 @@ void TupleHashJoinStep::hjRunner()
#ifdef JLF_DEBUG
cout << "moving join " << i << " to UM (PM join can't follow a UM join)\n";
#endif
tbpsJoiners[i]->setInUM();
tbpsJoiners[i]->setInUM(rgData[i]);
}
}
@ -880,12 +939,10 @@ void TupleHashJoinStep::hjRunner()
}
#ifdef JLF_DEBUG
if (runFE2onPM)
cout << "PM runs FE2\n";
else
cout << "UM runs FE2\n";
#endif
largeBPS->setFcnExpGroup2(fe2, fe2Output, runFE2onPM);
}
@ -971,8 +1028,11 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs)
joiners.clear();
rgData.reset();
resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit);
totalUMMemoryUsage = 0;
for (uint i = 0; i < smallDLs.size(); i++)
{
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
memUsedByEachJoin[i] = 0;
}
return 0;
}
@ -992,8 +1052,11 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs)
cout << " -- returning error status " << deliveredRG->getStatus() << endl;
deliveredRG->serializeRGData(bs);
resourceManager->returnMemory(totalUMMemoryUsage, sessionMemLimit);
totalUMMemoryUsage = 0;
for (uint i = 0; i < smallDLs.size(); i++)
{
resourceManager->returnMemory(memUsedByEachJoin[i], sessionMemLimit);
memUsedByEachJoin[i] = 0;
}
return 0;
}
@ -1850,7 +1913,8 @@ void TupleHashJoinStep::segregateJoiners()
return;
}
/* Debugging code, this makes all eligible joins disk-based.
#if 0
// Debugging code, this makes all eligible joins disk-based.
else {
cout << "making all joins disk-based" << endl;
for (i = 0; i < smallSideCount; i++) {
@ -1860,7 +1924,7 @@ void TupleHashJoinStep::segregateJoiners()
}
return;
}
*/
#endif
/* For now if there is no largeBPS all joins need to either be DJS or not, not mixed */
if (!largeBPS)


@ -75,6 +75,8 @@ public:
void tableOid1(execplan::CalpontSystemCatalog::OID tableOid1)
{
fTableOID1 = tableOid1;
if (fTableOID1 < 3000)
numCores = 1; // syscat query, no need for more than 1 thread
}
void tableOid2(execplan::CalpontSystemCatalog::OID tableOid2)
{
@ -425,7 +427,6 @@ private:
std::vector<std::vector<uint32_t> > smallSideKeys;
ResourceManager* resourceManager;
volatile uint64_t totalUMMemoryUsage;
struct JoinerSorter
{
@ -436,16 +437,14 @@ private:
}
};
std::vector<boost::shared_ptr<joiner::TupleJoiner> > joiners;
boost::scoped_array<std::vector<rowgroup::RGData> > rgData;
TupleBPS* largeBPS;
rowgroup::RowGroup largeRG, outputRG;
std::vector<rowgroup::RowGroup> smallRGs;
uint64_t pmMemLimit;
uint64_t rgDataSize;
ssize_t pmMemLimit;
void hjRunner();
void smallRunnerFcn(uint32_t index);
void smallRunnerFcn(uint32_t index, uint threadID, uint64_t *threads);
struct HJRunner
{
@ -462,15 +461,13 @@ private:
SmallRunner(TupleHashJoinStep* hj, uint32_t i) : HJ(hj), index(i) { }
void operator()()
{
utils::setThreadName("HJSSmallSide");
HJ->smallRunnerFcn(index);
HJ->startSmallRunners(index);
}
TupleHashJoinStep* HJ;
uint32_t index;
};
int64_t mainRunner; // thread handle from thread pool
std::vector<uint64_t> smallRunners; // thread handles from thread pool
// for notify TupleAggregateStep PM hashjoin
// Ideally, hashjoin and delivery communicate with RowGroupDL,
@ -614,10 +611,19 @@ private:
std::vector<boost::shared_ptr<joiner::TupleJoiner> > tbpsJoiners;
std::vector<boost::shared_ptr<joiner::TupleJoiner> > djsJoiners;
std::vector<int> djsJoinerMap;
boost::scoped_array<uint64_t> memUsedByEachJoin;
boost::scoped_array<ssize_t> memUsedByEachJoin;
boost::mutex djsLock;
boost::shared_ptr<int64_t> sessionMemLimit;
/* Threaded UM join support */
int numCores;
boost::mutex dlMutex, memTrackMutex, saneErrMsg;
boost::condition memTrackDone;
std::atomic<bool> rgdLock;
bool stopMemTracking;
void trackMem(uint index);
void startSmallRunners(uint index);
friend class DiskJoinStep;
};


@ -1359,7 +1359,7 @@ void cleanTempDir()
return;
if (tmpPrefix.empty())
tmpPrefix = "/tmp/infinidb";
tmpPrefix = "/tmp/cs-diskjoin";
tmpPrefix += "/";
@ -1636,4 +1636,3 @@ int main(int argc, char* argv[])
return 0;
}
// vim:ts=4 sw=4:


@ -495,7 +495,7 @@
<!-- Be careful modifying TempFilePath! On start, ExeMgr deletes
the entire directory and recreates it to make sure no
files are left behind.
<TempFilePath>/tmp</TempFilePath> -->
<TempFilePath>/tmp/cs-diskjoin</TempFilePath> -->
<TempFileCompression>Y</TempFileCompression>
</HashJoin>
<JobList>


@ -241,6 +241,7 @@
<TempSaveSize>128M</TempSaveSize> <!-- default SWSDL max element save size -->
<WaitPeriod>10</WaitPeriod> <!-- in seconds -->
<TempFileDir>/tmp/columnstore_tmp_files</TempFileDir>
<CalpontHome>$INSTALLDIR</CalpontHome>
<ModuleHeartbeatPeriod>10</ModuleHeartbeatPeriod>
<ModuleHeartbeatCount>3</ModuleHeartbeatCount>
<ProcessRestartCount>10</ProcessRestartCount>
@ -486,7 +487,7 @@
<!-- Be careful modifying TempFilePath! On start, ExeMgr deletes
the entire directory and recreates it to make sure no
files are left behind. -->
<TempFilePath>/var/lib/columnstore/tmp</TempFilePath>
<TempFilePath>/var/lib/columnstore/tmp/cs-diskjoin</TempFilePath>
<TempFileCompression>Y</TempFileCompression>
</HashJoin>
<JobList>


@ -50,6 +50,9 @@ FixedAllocator::FixedAllocator(const FixedAllocator& f)
tmpSpace = f.tmpSpace;
capacityRemaining = 0;
currentlyStored = 0;
useLock = f.useLock;
lock = false;
}
FixedAllocator& FixedAllocator::operator=(const FixedAllocator& f)
@ -57,10 +60,22 @@ FixedAllocator& FixedAllocator::operator=(const FixedAllocator& f)
elementCount = f.elementCount;
elementSize = f.elementSize;
tmpSpace = f.tmpSpace;
useLock = f.useLock;
lock = false;
deallocateAll();
return *this;
}
void FixedAllocator::setUseLock(bool useIt)
{
useLock = useIt;
}
void FixedAllocator::setAllocSize(uint allocSize)
{
elementSize = allocSize;
}
void FixedAllocator::newBlock()
{
shared_array<uint8_t> next;
@ -80,39 +95,15 @@ void FixedAllocator::newBlock()
}
}
void* FixedAllocator::allocate()
{
void* ret;
if (capacityRemaining < elementSize)
newBlock();
ret = nextAlloc;
nextAlloc += elementSize;
capacityRemaining -= elementSize;
currentlyStored += elementSize;
return ret;
}
void* FixedAllocator::allocate(uint32_t len)
{
void* ret;
if (capacityRemaining < len)
newBlock();
ret = nextAlloc;
nextAlloc += len;
capacityRemaining -= len;
currentlyStored += len;
return ret;
}
void FixedAllocator::truncateBy(uint32_t amt)
{
if (useLock)
getSpinlock(lock);
nextAlloc -= amt;
capacityRemaining += amt;
currentlyStored -= amt;
if (useLock)
releaseSpinlock(lock);
}
void FixedAllocator::deallocateAll()


@ -38,6 +38,8 @@
#include <vector>
#include <limits>
#include <unistd.h>
#include <atomic>
#include "spinlock.h"
#if defined(_MSC_VER) && defined(xxxFIXEDALLOCATOR_DLLEXPORT)
#define EXPORT __declspec(dllexport)
@ -55,11 +57,13 @@ public:
EXPORT FixedAllocator() :
capacityRemaining(0),
elementCount(std::numeric_limits<unsigned long>::max()),
elementCount(DEFAULT_NUM_ELEMENTS),
elementSize(0),
currentlyStored(0),
tmpSpace(false),
nextAlloc(0) {}
nextAlloc(0),
useLock(false),
lock(false) {}
EXPORT explicit FixedAllocator(unsigned long allocSize, bool isTmpSpace = false,
unsigned long numElements = DEFAULT_NUM_ELEMENTS) :
capacityRemaining(0),
@ -67,7 +71,9 @@ public:
elementSize(allocSize),
currentlyStored(0),
tmpSpace(isTmpSpace),
nextAlloc(0) {}
nextAlloc(0),
useLock(false),
lock(false) {}
EXPORT FixedAllocator(const FixedAllocator&);
EXPORT FixedAllocator& operator=(const FixedAllocator&);
virtual ~FixedAllocator() {}
@ -78,6 +84,8 @@ public:
void deallocate() { } // does nothing
EXPORT void deallocateAll(); // drops all memory in use
EXPORT uint64_t getMemUsage() const;
void setUseLock(bool);
void setAllocSize(uint);
private:
void newBlock();
@ -89,10 +97,46 @@ private:
uint64_t currentlyStored;
bool tmpSpace;
uint8_t* nextAlloc;
bool useLock;
std::atomic<bool> lock;
};
inline void* FixedAllocator::allocate()
{
void* ret;
if (useLock)
getSpinlock(lock);
if (capacityRemaining < elementSize)
newBlock();
ret = nextAlloc;
nextAlloc += elementSize;
capacityRemaining -= elementSize;
currentlyStored += elementSize;
if (useLock)
releaseSpinlock(lock);
return ret;
}
inline void* FixedAllocator::allocate(uint32_t len)
{
void* ret;
if (useLock)
getSpinlock(lock);
if (capacityRemaining < len)
newBlock();
ret = nextAlloc;
nextAlloc += len;
capacityRemaining -= len;
currentlyStored += len;
if (useLock)
releaseSpinlock(lock);
return ret;
}
#undef EXPORT
} // namespace
#endif
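The spinlock in allocate() is opt-in via setUseLock(), so existing single-threaded users of FixedAllocator pay nothing new; callers that share one allocator across threads can turn it on. A small usage sketch of the new knobs (assumed usage, not code from this commit):

#include "fixedallocator.h"

void example()
{
    utils::FixedAllocator keyAlloc;   // default ctor: elementSize = 0, lock disabled
    keyAlloc.setAllocSize(16);        // e.g. a 16-byte typeless join key
    keyAlloc.setUseLock(true);        // make allocate() safe to call concurrently
    void* key = keyAlloc.allocate();  // spinlock-guarded fixed-size allocation
    (void) key;                       // storage is reclaimed via deallocateAll()
}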

utils/common/spinlock.h (new file)

@ -0,0 +1,27 @@
#pragma once
#include <atomic>
namespace utils
{
inline void getSpinlock(std::atomic<bool> &lock)
{
bool _false = false;
while (!lock.compare_exchange_weak(_false, true, std::memory_order_acquire))
_false = false;
}
inline bool trySpinlock(std::atomic<bool> &lock)
{
bool _false = false;
bool ret = lock.compare_exchange_weak(_false, true, std::memory_order_acquire);
return ret;
}
inline void releaseSpinlock(std::atomic<bool> &lock)
{
lock.store(false, std::memory_order_release);
}
}
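Note that compare_exchange_weak() writes the value it actually observed back into its expected argument on failure, which is why getSpinlock() resets _false to false on every loop iteration. A minimal usage sketch (the function and flag below are illustrative):

#include <atomic>
#include "spinlock.h"

std::atomic<bool> tableLock(false);    // shared flag; false == unlocked

void touchSharedState()
{
    utils::getSpinlock(tableLock);     // spins until it flips false -> true
    // ... modify the shared structure ...
    utils::releaseSpinlock(tableLock); // release-store publishes the writes above
}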


@ -27,6 +27,7 @@
#endif
#include "hasher.h"
#include "lbidlist.h"
#include "spinlock.h"
using namespace std;
using namespace rowgroup;
@ -42,30 +43,49 @@ TupleJoiner::TupleJoiner(
const rowgroup::RowGroup& largeInput,
uint32_t smallJoinColumn,
uint32_t largeJoinColumn,
JoinType jt) :
JoinType jt,
threadpool::ThreadPool *jsThreadPool) :
smallRG(smallInput), largeRG(largeInput), joinAlg(INSERTING), joinType(jt),
threadCount(1), typelessJoin(false), bSignedUnsignedJoin(false), uniqueLimit(100), finished(false)
threadCount(1), typelessJoin(false), bSignedUnsignedJoin(false), uniqueLimit(100), finished(false),
jobstepThreadPool(jsThreadPool), _convertToDiskJoin(false)
{
uint i;
getBucketCount();
m_bucketLocks.reset(new boost::mutex[bucketCount]);
if (smallRG.getColTypes()[smallJoinColumn] == CalpontSystemCatalog::LONGDOUBLE)
{
ld.reset(new boost::scoped_ptr<ldhash_t>[bucketCount]);
_pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
for (i = 0; i < bucketCount; i++)
{
STLPoolAllocator<pair<const long double, Row::Pointer> > alloc;
_pool = alloc.getPoolAllocator();
ld.reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
_pool[i] = alloc.getPoolAllocator();
ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
}
}
else if (smallRG.usesStringTable())
{
sth.reset(new boost::scoped_ptr<sthash_t>[bucketCount]);
_pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
for (i = 0; i < bucketCount; i++)
{
STLPoolAllocator<pair<const int64_t, Row::Pointer> > alloc;
_pool = alloc.getPoolAllocator();
sth.reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
_pool[i] = alloc.getPoolAllocator();
sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
}
}
else
{
h.reset(new boost::scoped_ptr<hash_t>[bucketCount]);
_pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
for (i = 0; i < bucketCount; i++)
{
STLPoolAllocator<pair<const int64_t, uint8_t*> > alloc;
_pool = alloc.getPoolAllocator();
h.reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
_pool[i] = alloc.getPoolAllocator();
h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
}
}
smallRG.initRow(&smallNullRow);
@ -106,16 +126,28 @@ TupleJoiner::TupleJoiner(
const rowgroup::RowGroup& largeInput,
const vector<uint32_t>& smallJoinColumns,
const vector<uint32_t>& largeJoinColumns,
JoinType jt) :
JoinType jt,
threadpool::ThreadPool *jsThreadPool) :
smallRG(smallInput), largeRG(largeInput), joinAlg(INSERTING),
joinType(jt), threadCount(1), typelessJoin(true),
smallKeyColumns(smallJoinColumns), largeKeyColumns(largeJoinColumns),
bSignedUnsignedJoin(false), uniqueLimit(100), finished(false)
bSignedUnsignedJoin(false), uniqueLimit(100), finished(false),
jobstepThreadPool(jsThreadPool), _convertToDiskJoin(false)
{
uint i;
getBucketCount();
_pool.reset(new boost::shared_ptr<PoolAllocator>[bucketCount]);
ht.reset(new boost::scoped_ptr<typelesshash_t>[bucketCount]);
for (i = 0; i < bucketCount; i++)
{
STLPoolAllocator<pair<const TypelessData, Row::Pointer> > alloc;
_pool = alloc.getPoolAllocator();
_pool[i] = alloc.getPoolAllocator();
ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
}
m_bucketLocks.reset(new boost::mutex[bucketCount]);
ht.reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
smallRG.initRow(&smallNullRow);
if (smallOuterJoin() || largeOuterJoin() || semiJoin() || antiJoin())
@ -126,7 +158,7 @@ TupleJoiner::TupleJoiner(
smallNullRow.initToNull();
}
for (uint32_t i = keyLength = 0; i < smallKeyColumns.size(); i++)
for (i = keyLength = 0; i < smallKeyColumns.size(); i++)
{
if (smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::CHAR ||
smallRG.getColTypes()[smallKeyColumns[i]] == CalpontSystemCatalog::VARCHAR
@ -155,12 +187,16 @@ TupleJoiner::TupleJoiner(
}
}
storedKeyAlloc = FixedAllocator(keyLength);
// Note: 'numCores' is implied by tuplehashjoin on calls to insertRGData().
// TODO: make it explicit to avoid future confusion.
storedKeyAlloc.reset(new FixedAllocator[numCores]);
for (i = 0; i < (uint) numCores; i++)
storedKeyAlloc[i].setAllocSize(keyLength);
discreteValues.reset(new bool[smallKeyColumns.size()]);
cpValues.reset(new vector<int64_t>[smallKeyColumns.size()]);
for (uint32_t i = 0; i < smallKeyColumns.size(); i++)
for (i = 0; i < smallKeyColumns.size(); i++)
{
discreteValues[i] = false;
if (isUnsigned(smallRG.getColTypes()[smallKeyColumns[i]]))
@ -199,6 +235,166 @@ bool TupleJoiner::operator<(const TupleJoiner& tj) const
return size() < tj.size();
}
void TupleJoiner::getBucketCount()
{
// get the # of cores, round up to nearest power of 2
// make the bucket mask
numCores = sysconf(_SC_NPROCESSORS_ONLN);
if (numCores <= 0)
numCores = 8;
bucketCount = (numCores == 1 ? 1 : (1 << (32 - __builtin_clz(numCores - 1))));
bucketMask = bucketCount - 1;
}
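The expression 1 << (32 - __builtin_clz(numCores - 1)) rounds numCores up to the next power of two, so masking with bucketCount - 1 is a valid modulo. A worked example, for illustration only:

// numCores = 6: __builtin_clz(6 - 1) = __builtin_clz(5) = 29 on a 32-bit int,
// so bucketCount = 1 << (32 - 29) = 8 and bucketMask = 7 (0b111);
// bucketPicker(key, len, bpSeed) & bucketMask then selects one of 8 buckets.
// If numCores is already a power of two (e.g. 8), bucketCount stays at 8.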
template<typename buckets_t, typename hash_table_t>
void TupleJoiner::bucketsToTables(buckets_t *buckets, hash_table_t *tables)
{
uint i;
bool done = false, wasProductive;
while (!done)
{
done = true;
wasProductive = false;
for (i = 0; i < bucketCount; i++)
{
if (buckets[i].empty())
continue;
bool gotIt = m_bucketLocks[i].try_lock();
if (!gotIt)
{
done = false;
continue;
}
for (auto &element : buckets[i])
tables[i]->insert(element);
m_bucketLocks[i].unlock();
wasProductive = true;
buckets[i].clear();
}
if (!done && !wasProductive)
::usleep(1000 * numCores);
}
}
void TupleJoiner::um_insertTypeless(uint threadID, uint rowCount, Row &r)
{
TypelessData td[rowCount];
vector<pair<TypelessData, Row::Pointer> > v[bucketCount];
uint i;
FixedAllocator *alloc = &storedKeyAlloc[threadID];
for (i = 0; i < rowCount; i++, r.nextRow())
{
td[i] = makeTypelessKey(r, smallKeyColumns, keyLength, alloc,
largeRG, largeKeyColumns);
if (td[i].len == 0)
continue;
uint bucket = bucketPicker((char *) td[i].data, td[i].len, bpSeed) & bucketMask;
v[bucket].push_back(pair<TypelessData, Row::Pointer>(td[i], r.getPointer()));
}
bucketsToTables(&v[0], ht.get());
}
void TupleJoiner::um_insertLongDouble(uint rowCount, Row &r)
{
vector<pair<long double, Row::Pointer> > v[bucketCount];
uint i;
uint smallKeyColumn = smallKeyColumns[0];
for (i = 0; i < rowCount; i++, r.nextRow())
{
long double smallKey = r.getLongDoubleField(smallKeyColumn);
uint bucket = bucketPicker((char *) &smallKey, 10, bpSeed) & bucketMask; // change if we decide to support windows again
if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL))
v[bucket].push_back(pair<long double, Row::Pointer>(joblist::LONGDOUBLENULL, r.getPointer()));
else
v[bucket].push_back(pair<long double, Row::Pointer>(smallKey, r.getPointer()));
}
bucketsToTables(&v[0], ld.get());
}
void TupleJoiner::um_insertInlineRows(uint rowCount, Row &r)
{
uint i;
int64_t smallKey;
vector<pair<int64_t, uint8_t *> > v[bucketCount];
uint smallKeyColumn = smallKeyColumns[0];
for (i = 0; i < rowCount; i++, r.nextRow())
{
if (!r.isUnsigned(smallKeyColumn))
smallKey = r.getIntField(smallKeyColumn);
else
smallKey = (int64_t) r.getUintField(smallKeyColumn);
uint bucket = bucketPicker((char *) &smallKey, sizeof(smallKey), bpSeed) & bucketMask;
if (UNLIKELY(smallKey == nullValueForJoinColumn))
v[bucket].push_back(pair<int64_t, uint8_t*>(getJoinNullValue(), r.getData()));
else
v[bucket].push_back(pair<int64_t, uint8_t*>(smallKey, r.getData()));
}
bucketsToTables(&v[0], h.get());
}
void TupleJoiner::um_insertStringTable(uint rowCount, Row &r)
{
int64_t smallKey;
uint i;
vector<pair<int64_t, Row::Pointer> > v[bucketCount];
uint smallKeyColumn = smallKeyColumns[0];
for (i = 0; i < rowCount; i++, r.nextRow())
{
if (!r.isUnsigned(smallKeyColumn))
smallKey = r.getIntField(smallKeyColumn);
else
smallKey = (int64_t) r.getUintField(smallKeyColumn);
uint bucket = bucketPicker((char *) &smallKey, sizeof(smallKey), bpSeed) & bucketMask;
if (UNLIKELY(smallKey == nullValueForJoinColumn))
v[bucket].push_back(pair<int64_t, Row::Pointer>(getJoinNullValue(), r.getPointer()));
else
v[bucket].push_back(pair<int64_t, Row::Pointer>(smallKey, r.getPointer()));
}
bucketsToTables(&v[0], sth.get());
}
void TupleJoiner::insertRGData(RowGroup &rg, uint threadID)
{
uint i, rowCount;
Row r;
rg.initRow(&r);
rowCount = rg.getRowCount();
rg.getRow(0, &r);
m_cpValuesLock.lock();
for (i = 0; i < rowCount; i++, r.nextRow())
{
updateCPData(r);
r.zeroRid();
}
m_cpValuesLock.unlock();
rg.getRow(0, &r);
if (joinAlg == UM)
{
if (typelessJoin)
um_insertTypeless(threadID, rowCount, r);
else if (r.getColType(smallKeyColumns[0]) == execplan::CalpontSystemCatalog::LONGDOUBLE)
um_insertLongDouble(rowCount, r);
else if (!smallRG.usesStringTable())
um_insertInlineRows(rowCount, r);
else
um_insertStringTable(rowCount, r);
}
else
{
// while in PM-join mode, inserting is single-threaded
for (i = 0; i < rowCount; i++, r.nextRow())
rows.push_back(r.getPointer());
}
}
void TupleJoiner::insert(Row& r, bool zeroTheRid)
{
/* when doing a disk-based join, only the first iteration on the large side
@ -212,57 +408,61 @@ void TupleJoiner::insert(Row& r, bool zeroTheRid)
{
if (typelessJoin)
{
TypelessData td = makeTypelessKey(r, smallKeyColumns, keyLength, &storedKeyAlloc,
TypelessData td = makeTypelessKey(r, smallKeyColumns, keyLength, &storedKeyAlloc[0],
largeRG, largeKeyColumns);
if (td.len > 0)
{
ht->insert(pair<TypelessData, Row::Pointer>(td, r.getPointer()));
uint bucket = bucketPicker((char *) td.data, td.len, bpSeed) & bucketMask;
ht[bucket]->insert(pair<TypelessData, Row::Pointer>(td, r.getPointer()));
}
}
else if (r.getColType(smallKeyColumns[0]) == execplan::CalpontSystemCatalog::LONGDOUBLE)
{
long double smallKey = r.getLongDoubleField(smallKeyColumns[0]);
uint bucket = bucketPicker((char *) &smallKey, 10, bpSeed) & bucketMask; // change if we decide to support windows again
if (UNLIKELY(smallKey == joblist::LONGDOUBLENULL))
ld->insert(pair<long double, Row::Pointer>(joblist::LONGDOUBLENULL, r.getPointer()));
ld[bucket]->insert(pair<long double, Row::Pointer>(joblist::LONGDOUBLENULL, r.getPointer()));
else
ld->insert(pair<long double, Row::Pointer>(smallKey, r.getPointer()));
ld[bucket]->insert(pair<long double, Row::Pointer>(smallKey, r.getPointer()));
}
else if (!smallRG.usesStringTable())
{
int64_t smallKey;
if (r.isUnsigned(smallKeyColumns[0]))
smallKey = (int64_t)(r.getUintField(smallKeyColumns[0]));
else
if (!r.isUnsigned(smallKeyColumns[0]))
smallKey = r.getIntField(smallKeyColumns[0]);
if (UNLIKELY(smallKey == nullValueForJoinColumn))
h->insert(pair<int64_t, uint8_t*>(getJoinNullValue(), r.getData()));
else
h->insert(pair<int64_t, uint8_t*>(smallKey, r.getData())); // Normal path for integers
smallKey = (int64_t) r.getUintField(smallKeyColumns[0]);
uint bucket = bucketPicker((char *) &smallKey, sizeof(smallKey), bpSeed) & bucketMask;
if (UNLIKELY(smallKey == nullValueForJoinColumn))
h[bucket]->insert(pair<int64_t, uint8_t*>(getJoinNullValue(), r.getData()));
else
h[bucket]->insert(pair<int64_t, uint8_t*>(smallKey, r.getData())); // Normal path for integers
}
else
{
int64_t smallKey = r.getIntField(smallKeyColumns[0]);
int64_t smallKey;
if (!r.isUnsigned(smallKeyColumns[0]))
smallKey = r.getIntField(smallKeyColumns[0]);
else
smallKey = (int64_t) r.getUintField(smallKeyColumns[0]);
uint bucket = bucketPicker((char *) &smallKey, sizeof(smallKey), bpSeed) & bucketMask;
if (UNLIKELY(smallKey == nullValueForJoinColumn))
sth->insert(pair<int64_t, Row::Pointer>(getJoinNullValue(), r.getPointer()));
sth[bucket]->insert(pair<int64_t, Row::Pointer>(getJoinNullValue(), r.getPointer()));
else
sth->insert(pair<int64_t, Row::Pointer>(smallKey, r.getPointer()));
sth[bucket]->insert(pair<int64_t, Row::Pointer>(smallKey, r.getPointer()));
}
}
else
{
rows.push_back(r.getPointer());
}
}
void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uint32_t threadID,
vector<Row::Pointer>* matches)
{
uint32_t i;
bool isNull = hasNullJoinColumn(largeSideRow);
matches->clear();
if (inPM())
@ -286,19 +486,14 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
pair<thIterator, thIterator> range;
largeKey = makeTypelessKey(largeSideRow, largeKeyColumns, keyLength, &tmpKeyAlloc[threadID], smallRG, smallKeyColumns);
if (largeKey.len > 0)
{
it = ht->find(largeKey);
}
else
{
return;
}
if (it == ht->end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
if (largeKey.len == 0)
return;
range = ht->equal_range(largeKey);
uint bucket = bucketPicker((char *) largeKey.data, largeKey.len, bpSeed) & bucketMask;
range = ht[bucket]->equal_range(largeKey);
if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
for (; range.first != range.second; ++range.first)
matches->push_back(range.first->second);
@ -313,13 +508,11 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
Row r;
largeKey = largeSideRow.getLongDoubleField(largeKeyColumns[0]);
it = ld->find(largeKey);
uint bucket = bucketPicker((char *) &largeKey, 10, bpSeed) & bucketMask;
range = ld[bucket]->equal_range(largeKey);
if (it == ld->end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
range = ld->equal_range(largeKey);
for (; range.first != range.second; ++range.first)
{
matches->push_back(range.first->second);
@ -328,9 +521,6 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
else if (!smallRG.usesStringTable())
{
int64_t largeKey;
iterator it;
pair<iterator, iterator> range;
Row r;
if (largeSideRow.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
{
@ -348,64 +538,41 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
if (ld)
{
// Compare against long double
ldIterator it;
pair<ldIterator, ldIterator> range;
long double ldKey = largeKey;
// ldKey = 6920;
it = ld->find(ldKey);
uint bucket = bucketPicker((char *) &ldKey, 10, bpSeed) & bucketMask;
auto range = ld[bucket]->equal_range(ldKey);
if (it == ld->end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
range = ld->equal_range(ldKey);
for (; range.first != range.second; ++range.first)
matches->push_back(range.first->second);
}
else
{
uint bucket = bucketPicker((char *) &largeKey, sizeof(largeKey), bpSeed) & bucketMask;
auto range = h[bucket]->equal_range(largeKey);
if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
for (; range.first != range.second; ++range.first)
{
matches->push_back(range.first->second);
}
}
else
{
it = h->find(largeKey);
int64_t largeKey = largeSideRow.getIntField(largeKeyColumns[0]);
uint bucket = bucketPicker((char *) &largeKey, sizeof(largeKey), bpSeed) & bucketMask;
auto range = sth[bucket]->equal_range(largeKey);
if (it == h->end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
if (range.first == range.second && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
range = h->equal_range(largeKey);
//smallRG.initRow(&r);
for (; range.first != range.second; ++range.first)
{
//r.setData(range.first->second);
//cerr << "matched small side row: " << r.toString() << endl;
matches->push_back(range.first->second);
}
}
}
else
{
int64_t largeKey;
sthash_t::iterator it;
pair<sthash_t::iterator, sthash_t::iterator> range;
Row r;
largeKey = largeSideRow.getIntField(largeKeyColumns[0]);
it = sth->find(largeKey);
if (it == sth->end() && !(joinType & (LARGEOUTER | MATCHNULLS)))
return;
range = sth->equal_range(largeKey);
//smallRG.initRow(&r);
for (; range.first != range.second; ++range.first)
{
//r.setPointer(range.first->second);
//cerr << "matched small side row: " << r.toString() << endl;
matches->push_back(range.first->second);
}
}
}
if (UNLIKELY(largeOuterJoin() && matches->size() == 0))
{
@ -417,21 +584,27 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
{
if (smallRG.getColType(largeKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
{
pair<ldIterator, ldIterator> range = ld->equal_range(joblist::LONGDOUBLENULL);
uint bucket = bucketPicker((char *) &(joblist::LONGDOUBLENULL),
sizeof(joblist::LONGDOUBLENULL), bpSeed) & bucketMask;
pair<ldIterator, ldIterator> range = ld[bucket]->equal_range(joblist::LONGDOUBLENULL);
for (; range.first != range.second; ++range.first)
matches->push_back(range.first->second);
}
else if (!smallRG.usesStringTable())
{
pair<iterator, iterator> range = h->equal_range(getJoinNullValue());
auto nullVal = getJoinNullValue();
uint bucket = bucketPicker((char *) &nullVal, sizeof(nullVal), bpSeed) & bucketMask;
pair<iterator, iterator> range = h[bucket]->equal_range(nullVal);
for (; range.first != range.second; ++range.first)
matches->push_back(range.first->second);
}
else
{
pair<sthash_t::iterator, sthash_t::iterator> range = sth->equal_range(getJoinNullValue());
auto nullVal = getJoinNullValue();
uint bucket = bucketPicker((char *) &nullVal, sizeof(nullVal), bpSeed) & bucketMask;
pair<sthash_t::iterator, sthash_t::iterator> range = sth[bucket]->equal_range(nullVal);
for (; range.first != range.second; ++range.first)
matches->push_back(range.first->second);
@ -448,21 +621,24 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
{
ldIterator it;
for (it = ld->begin(); it != ld->end(); ++it)
for (uint i = 0; i < bucketCount; i++)
for (it = ld[i]->begin(); it != ld[i]->end(); ++it)
matches->push_back(it->second);
}
else if (!smallRG.usesStringTable())
{
iterator it;
for (it = h->begin(); it != h->end(); ++it)
for (uint i = 0; i < bucketCount; i++)
for (it = h[i]->begin(); it != h[i]->end(); ++it)
matches->push_back(it->second);
}
else
{
sthash_t::iterator it;
for (it = sth->begin(); it != sth->end(); ++it)
for (uint i = 0; i < bucketCount; i++)
for (it = sth[i]->begin(); it != sth[i]->end(); ++it)
matches->push_back(it->second);
}
}
@ -470,7 +646,8 @@ void TupleJoiner::match(rowgroup::Row& largeSideRow, uint32_t largeRowIndex, uin
{
thIterator it;
for (it = ht->begin(); it != ht->end(); ++it)
for (uint i = 0; i < bucketCount; i++)
for (it = ht[i]->begin(); it != ht[i]->end(); ++it)
matches->push_back(it->second);
}
}
@ -516,16 +693,17 @@ void TupleJoiner::doneInserting()
rowCount = size();
uint bucket = 0;
if (joinAlg == PM)
pmpos = 0;
else if (typelessJoin)
thit = ht->begin();
thit = ht[bucket]->begin();
else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
ldit = ld->begin();
ldit = ld[bucket]->begin();
else if (!smallRG.usesStringTable())
hit = h->begin();
hit = h[bucket]->begin();
else
sthit = sth->begin();
sthit = sth[bucket]->begin();
for (i = 0; i < rowCount; i++)
{
@ -533,21 +711,29 @@ void TupleJoiner::doneInserting()
smallRow.setPointer(rows[pmpos++]);
else if (typelessJoin)
{
while (thit == ht[bucket]->end())
thit = ht[++bucket]->begin();
smallRow.setPointer(thit->second);
++thit;
}
else if (smallRG.getColType(smallKeyColumns[col]) == CalpontSystemCatalog::LONGDOUBLE)
{
while (ldit == ld[bucket]->end())
ldit = ld[++bucket]->begin();
smallRow.setPointer(ldit->second);
++ldit;
}
else if (!smallRG.usesStringTable())
{
while (hit == h[bucket]->end())
hit = h[++bucket]->begin();
smallRow.setPointer(hit->second);
++hit;
}
else
{
while (sthit == sth[bucket]->end())
sthit = sth[++bucket]->begin();
smallRow.setPointer(sthit->second);
++sthit;
}
@ -570,8 +756,7 @@ void TupleJoiner::doneInserting()
}
}
}
else
if (smallRow.isUnsigned(smallKeyColumns[col]))
else if (smallRow.isUnsigned(smallKeyColumns[col]))
{
uniquer.insert((int64_t)smallRow.getUintField(smallKeyColumns[col]));
}
@ -599,6 +784,18 @@ void TupleJoiner::setInPM()
joinAlg = PM;
}
void TupleJoiner::umJoinConvert(size_t begin, size_t end)
{
Row smallRow;
smallRG.initRow(&smallRow);
while (begin < end)
{
smallRow.setPointer(rows[begin++]);
insert(smallRow);
}
}
void TupleJoiner::setInUM()
{
vector<Row::Pointer> empty;
@ -610,16 +807,17 @@ void TupleJoiner::setInUM()
joinAlg = UM;
size = rows.size();
smallRG.initRow(&smallRow);
#ifdef TJ_DEBUG
cout << "converting array to hash, size = " << size << "\n";
#endif
size_t chunkSize = ((size / numCores) + 1 < 50000 ? 50000 : (size / numCores) + 1); // don't start a thread to process < 50k rows
for (i = 0; i < size; i++)
{
smallRow.setPointer(rows[i]);
insert(smallRow);
}
uint64_t jobs[numCores];
i = 0;
for (size_t firstRow = 0; i < (uint) numCores && firstRow < size; i++, firstRow += chunkSize)
jobs[i] = jobstepThreadPool->invoke([this, firstRow, chunkSize, size] {
this->umJoinConvert(firstRow, (firstRow + chunkSize < size ? firstRow + chunkSize : size));
} );
for (uint j = 0; j < i; j++)
jobstepThreadPool->join(jobs[j]);
#ifdef TJ_DEBUG
cout << "done\n";
@ -635,6 +833,57 @@ void TupleJoiner::setInUM()
}
}
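The chunkSize floor keeps the conversion from fanning out over trivially small row sets: a thread is only worth starting for at least 50k rows. A worked example with illustrative numbers:

// size = 200000 rows, numCores = 8:
//   size / numCores + 1 = 25001 < 50000, so chunkSize = 50000;
//   firstRow takes 0, 50000, 100000, 150000 -> 4 jobs of 50k rows each
//   instead of 8 jobs of 25k.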
void TupleJoiner::umJoinConvert(uint threadID, vector<RGData> &rgs, size_t begin, size_t end)
{
RowGroup l_smallRG(smallRG);
while (begin < end)
{
l_smallRG.setData(&(rgs[begin++]));
insertRGData(l_smallRG, threadID);
}
}
void TupleJoiner::setInUM(vector<RGData> &rgs)
{
Row smallRow;
uint32_t i, size;
if (joinAlg == UM)
return;
{ // don't need rows anymore, free the mem
vector<Row::Pointer> empty;
rows.swap(empty);
}
joinAlg = UM;
size = rgs.size();
size_t chunkSize = ((size / numCores) + 1 < 10 ? 10 : (size / numCores) + 1); // don't issue jobs for < 10 rowgroups
uint64_t jobs[numCores];
i = 0;
for (size_t firstRow = 0; i < (uint) numCores && firstRow < size; i++, firstRow += chunkSize)
jobs[i] = jobstepThreadPool->invoke([this, firstRow, chunkSize, size, i, &rgs] {
this->umJoinConvert(i, rgs, firstRow, (firstRow + chunkSize < size ? firstRow + chunkSize : size));
} );
for (uint j = 0; j < i; j++)
jobstepThreadPool->join(jobs[j]);
#ifdef TJ_DEBUG
cout << "done\n";
#endif
if (typelessJoin)
{
tmpKeyAlloc.reset(new FixedAllocator[threadCount]);
for (i = 0; i < threadCount; i++)
tmpKeyAlloc[i] = FixedAllocator(keyLength, true);
}
}
void TupleJoiner::setPMJoinResults(boost::shared_array<vector<uint32_t> > jr,
uint32_t threadID)
{
@ -727,7 +976,8 @@ void TupleJoiner::getUnmarkedRows(vector<Row::Pointer>* out)
{
typelesshash_t::iterator it;
for (it = ht->begin(); it != ht->end(); ++it)
for (uint i = 0; i < bucketCount; i++)
for (it = ht[i]->begin(); it != ht[i]->end(); ++it)
{
smallR.setPointer(it->second);
@ -739,7 +989,8 @@ void TupleJoiner::getUnmarkedRows(vector<Row::Pointer>* out)
{
ldIterator it;
for (it = ld->begin(); it != ld->end(); ++it)
for (uint i = 0; i < bucketCount; i++)
for (it = ld[i]->begin(); it != ld[i]->end(); ++it)
{
smallR.setPointer(it->second);
@ -751,7 +1002,8 @@ void TupleJoiner::getUnmarkedRows(vector<Row::Pointer>* out)
{
iterator it;
for (it = h->begin(); it != h->end(); ++it)
for (uint i = 0; i < bucketCount; i++)
for (it = h[i]->begin(); it != h[i]->end(); ++it)
{
smallR.setPointer(it->second);
@ -763,7 +1015,8 @@ void TupleJoiner::getUnmarkedRows(vector<Row::Pointer>* out)
{
sthash_t::iterator it;
for (it = sth->begin(); it != sth->end(); ++it)
for (uint i = 0; i < bucketCount; i++)
for (it = sth[i]->begin(); it != sth[i]->end(); ++it)
{
smallR.setPointer(it->second);
@ -777,9 +1030,21 @@ void TupleJoiner::getUnmarkedRows(vector<Row::Pointer>* out)
uint64_t TupleJoiner::getMemUsage() const
{
if (inUM() && typelessJoin)
return _pool->getMemUsage() + storedKeyAlloc.getMemUsage();
{
size_t ret = 0;
for (uint i = 0; i < bucketCount; i++)
ret += _pool[i]->getMemUsage();
for (int i = 0; i < numCores; i++)
ret += storedKeyAlloc[i].getMemUsage();
return ret;
}
else if (inUM())
return _pool->getMemUsage();
{
size_t ret = 0;
for (uint i = 0; i < bucketCount; i++)
ret += _pool[i]->getMemUsage();
return ret;
}
else
return (rows.size() * sizeof(Row::Pointer));
}
@ -896,14 +1161,17 @@ size_t TupleJoiner::size() const
{
if (joinAlg == UM || joinAlg == INSERTING)
{
size_t ret = 0;
for (uint i = 0; i < bucketCount; i++)
if (UNLIKELY(typelessJoin))
return ht->size();
ret += ht[i]->size();
else if (smallRG.getColType(smallKeyColumns[0]) == CalpontSystemCatalog::LONGDOUBLE)
return ld->size();
ret += ld[i]->size();
else if (!smallRG.usesStringTable())
return h->size();
ret += h[i]->size();
else
return sth->size();
ret += sth[i]->size();
return ret;
}
return rows.size();
@ -1352,15 +1620,29 @@ void TupleJoiner::setTableName(const string& tname)
void TupleJoiner::clearData()
{
STLPoolAllocator<pair<const TypelessData, Row::Pointer> > alloc;
_pool = alloc.getPoolAllocator();
_pool.reset(new boost::shared_ptr<utils::PoolAllocator>[bucketCount]);
if (typelessJoin)
ht.reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
ht.reset(new boost::scoped_ptr<typelesshash_t>[bucketCount]);
else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE)
ld.reset(new boost::scoped_ptr<ldhash_t>[bucketCount]);
else if (smallRG.usesStringTable())
sth.reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
sth.reset(new boost::scoped_ptr<sthash_t>[bucketCount]);
else
h.reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
h.reset(new boost::scoped_ptr<hash_t>[bucketCount]);
for (uint i = 0; i < bucketCount; i++)
{
STLPoolAllocator<pair<const TypelessData, Row::Pointer> > alloc;
_pool[i] = alloc.getPoolAllocator();
if (typelessJoin)
ht[i].reset(new typelesshash_t(10, hasher(), typelesshash_t::key_equal(), alloc));
else if (smallRG.getColTypes()[smallKeyColumns[0]] == CalpontSystemCatalog::LONGDOUBLE)
ld[i].reset(new ldhash_t(10, hasher(), ldhash_t::key_equal(), alloc));
else if (smallRG.usesStringTable())
sth[i].reset(new sthash_t(10, hasher(), sthash_t::key_equal(), alloc));
else
h[i].reset(new hash_t(10, hasher(), hash_t::key_equal(), alloc));
}
std::vector<rowgroup::Row::Pointer> empty;
rows.swap(empty);
@ -1406,7 +1688,16 @@ boost::shared_ptr<TupleJoiner> TupleJoiner::copyForDiskJoin()
}
if (typelessJoin)
ret->storedKeyAlloc = FixedAllocator(keyLength);
{
ret->storedKeyAlloc.reset(new FixedAllocator[numCores]);
for (int i = 0; i < numCores; i++)
ret->storedKeyAlloc[i].setAllocSize(keyLength);
}
ret->numCores = numCores;
ret->bucketCount = bucketCount;
ret->bucketMask = bucketMask;
ret->jobstepThreadPool = jobstepThreadPool;
ret->setThreadCount(1);
ret->clearData();
@ -1414,9 +1705,9 @@ boost::shared_ptr<TupleJoiner> TupleJoiner::copyForDiskJoin()
return ret;
}
void TupleJoiner::setConvertToDiskJoin()
{
_convertToDiskJoin = true;
}
};


@ -38,6 +38,7 @@
#include "../funcexp/funcexpwrapper.h"
#include "stlpoolallocator.h"
#include "hasher.h"
#include "threadpool.h"
namespace joiner
{
@ -147,7 +148,8 @@ public:
const rowgroup::RowGroup& largeInput,
uint32_t smallJoinColumn,
uint32_t largeJoinColumn,
joblist::JoinType jt);
joblist::JoinType jt,
threadpool::ThreadPool *jsThreadPool);
/* ctor to use for string & compound join */
TupleJoiner(
@ -155,12 +157,14 @@ public:
const rowgroup::RowGroup& largeInput,
const std::vector<uint32_t>& smallJoinColumns,
const std::vector<uint32_t>& largeJoinColumns,
joblist::JoinType jt);
joblist::JoinType jt,
threadpool::ThreadPool *jsThreadPool);
~TupleJoiner();
size_t size() const;
void insert(rowgroup::Row& r, bool zeroTheRid = true);
void insert(rowgroup::Row& r, bool zeroTheRid = true); // not thread-safe
void insertRGData(rowgroup::RowGroup &rg, uint threadID);
void doneInserting();
/* match() returns the small-side rows that match the large-side row.
@ -187,8 +191,20 @@ public:
{
return joinAlg == UM;
}
inline bool onDisk() const
{
return _convertToDiskJoin;
}
void setInPM();
void setInUM(std::vector<rowgroup::RGData> &rgs);
void umJoinConvert(uint threadID, std::vector<rowgroup::RGData> &rgs, size_t begin, size_t end);
// TODO: these are currently in use by edge cases, ex, converting to disk
// join. Would be nice to make those cases use the rgdata variants
// above.
void setInUM();
void umJoinConvert(size_t begin, size_t end);
void setThreadCount(uint32_t cnt);
void setPMJoinResults(boost::shared_array<std::vector<uint32_t> >,
uint32_t threadID);
@ -325,6 +341,7 @@ public:
{
return finished;
}
void setConvertToDiskJoin();
private:
typedef std::tr1::unordered_multimap<int64_t, uint8_t*, hasher, std::equal_to<int64_t>,
@ -344,13 +361,13 @@ private:
TupleJoiner();
TupleJoiner(const TupleJoiner&);
TupleJoiner& operator=(const TupleJoiner&);
void getBucketCount();
rowgroup::RGData smallNullMemory;
boost::scoped_ptr<hash_t> h; // used for UM joins on ints
boost::scoped_ptr<sthash_t> sth; // used for UM join on ints where the backing table uses a string table
boost::scoped_ptr<ldhash_t> ld; // used for UM join on long double
boost::scoped_array<boost::scoped_ptr<hash_t> > h; // used for UM joins on ints
boost::scoped_array<boost::scoped_ptr<sthash_t> > sth; // used for UM join on ints where the backing table uses a string table
boost::scoped_array<boost::scoped_ptr<ldhash_t> > ld; // used for UM join on long double
std::vector<rowgroup::Row::Pointer> rows; // used for PM join
/* This struct is rough. The BPP-JL stores the parsed results for
@ -372,16 +389,16 @@ private:
};
JoinAlg joinAlg;
joblist::JoinType joinType;
boost::shared_ptr<utils::PoolAllocator> _pool; // pool for the table and nodes
boost::shared_array<boost::shared_ptr<utils::PoolAllocator> > _pool; // pools for the table and nodes
uint32_t threadCount;
std::string tableName;
/* vars, & fcns for typeless join */
bool typelessJoin;
std::vector<uint32_t> smallKeyColumns, largeKeyColumns;
boost::scoped_ptr<typelesshash_t> ht; // used for UM join on strings
boost::scoped_array<boost::scoped_ptr<typelesshash_t> > ht; // used for UM join on strings
uint32_t keyLength;
utils::FixedAllocator storedKeyAlloc;
boost::scoped_array<utils::FixedAllocator> storedKeyAlloc;
boost::scoped_array<utils::FixedAllocator> tmpKeyAlloc;
bool bSignedUnsignedJoin; // Set if we have a signed vs unsigned compare in a join. When not set, we can save checking for the signed bit.
@ -398,9 +415,27 @@ private:
boost::scoped_array<std::vector<int64_t> > cpValues; // if !discreteValues, [0] has min, [1] has max
uint32_t uniqueLimit;
bool finished;
// multithreaded UM hash table construction
int numCores;
uint bucketCount;
uint bucketMask;
boost::scoped_array<boost::mutex> m_bucketLocks;
boost::mutex m_typelessLock, m_cpValuesLock;
utils::Hasher_r bucketPicker;
const uint32_t bpSeed = 0x4545e1d7; // an arbitrary random #
threadpool::ThreadPool *jobstepThreadPool;
void um_insertTypeless(uint threadID, uint rowcount, rowgroup::Row &r);
void um_insertLongDouble(uint rowcount, rowgroup::Row &r);
void um_insertInlineRows(uint rowcount, rowgroup::Row &r);
void um_insertStringTable(uint rowcount, rowgroup::Row &r);
template<typename buckets_t, typename hash_table_t>
void bucketsToTables(buckets_t *, hash_table_t *);
bool _convertToDiskJoin;
};
}
#endif
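Taken together, a compressed sketch of how TupleHashJoinStep drives a threaded build through this API. This is a simplification under stated assumptions (a live threadpool::ThreadPool 'pool', populated RowGroups, threadID < numCores); the real flow starts in PM mode with a single inserter and flips via setInUM(rgData) once pmMemLimit is crossed:

// Illustrative only; mirrors the calls made by TupleHashJoinStep above.
boost::shared_ptr<joiner::TupleJoiner> j(
    new joiner::TupleJoiner(smallRG, largeRG, smallKeys, largeKeys, jt, &pool));
j->setInUM(rgData);                 // switch to UM; buffered RGData converted in parallel
uint64_t h0 = pool.invoke([&] { j->insertRGData(rg0, 0); });  // per-thread threadID
uint64_t h1 = pool.invoke([&] { j->insertRGData(rg1, 1); });
pool.join(h0);
pool.join(h1);
j->doneInserting();                 // final bookkeeping over the inserted rows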


@ -29,6 +29,7 @@ using namespace std;
using namespace logging;
#include "threadpool.h"
#include "threadnaming.h"
#include <iomanip>
#include <sstream>
#include "boost/date_time/posix_time/posix_time_types.hpp"
@ -321,6 +322,7 @@ uint64_t ThreadPool::invoke(const Functor_T& threadfunc)
void ThreadPool::beginThread() throw()
{
utils::setThreadName("Idle");
try
{
boost::mutex::scoped_lock lock1(fMutex);
@ -386,6 +388,7 @@ void ThreadPool::beginThread() throw()
lock1.unlock();
utils::setThreadName("Unspecified");
try
{
todo->functor();
@ -405,6 +408,7 @@ void ThreadPool::beginThread() throw()
#endif
}
utils::setThreadName("Idle");
lock1.lock();
--fIssued;
--waitingFunctorsSize;