mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git

MCOL-5514 Parallel disk join step.

Author: Denis Khalikov
Date:   2023-06-15 20:47:41 +03:00
commit 2a66ae2ed1
parent ebfb9face2

6 changed files with 204 additions and 112 deletions
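At a high level, the change replaces the single Loader -> Builder -> Joiner pipeline of the disk join step with one pipeline per worker thread: the collected join partitions are split into per-thread jobs, each job is processed independently, and only the insert into the shared output datalist is serialized by a mutex. The following standalone sketch illustrates that fan-out shape only; it is not ColumnStore code. std::thread stands in for the project's jobstepThreadPool, and RGData, Job, processJob and the sizes in main are simplified, assumed stand-ins.

// Standalone sketch of the fan-out pattern (assumed names, not ColumnStore code):
// one worker per partition job, private intermediate state per worker, and a
// mutex only around the final insert into the shared output.
#include <cstdint>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

using RGData = std::string;    // stand-in for rowgroup::RGData
using Job = std::vector<int>;  // stand-in for one thread's list of JoinPartition*

std::mutex outputMutex;
std::vector<RGData> outputDL;  // stand-in for the shared output datalist

// Analogous to DiskJoinStep::outputResult(): the only shared-state touch point.
void outputResult(const std::vector<RGData>& result)
{
  std::lock_guard<std::mutex> lk(outputMutex);
  outputDL.insert(outputDL.end(), result.begin(), result.end());
}

// Analogous to processJoinPartitions(); the real code runs a small
// loader -> builder -> joiner pipeline here, collapsed into one loop below.
void processJob(uint32_t threadID, const Job& job)
{
  for (int partition : job)
    outputResult({"thread " + std::to_string(threadID) + " joined partition " + std::to_string(partition)});
}

int main()
{
  // Output of the splitting step (see prepareJobs() in the first changed file).
  std::vector<Job> jobs = {{0, 1, 2}, {3, 4}, {5, 6}};

  // Analogous to spawnJobs(): one worker per job, then join them all.
  std::vector<std::thread> workers;
  for (uint32_t threadID = 0; threadID < jobs.size(); ++threadID)
    workers.emplace_back(processJob, threadID, std::cref(jobs[threadID]));
  for (auto& w : workers)
    w.join();

  std::cout << outputDL.size() << " result blocks\n";  // prints "7 result blocks"
}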

==== Changed file 1 of 6 ====

@@ -98,7 +98,6 @@ DiskJoinStep::DiskJoinStep(TupleHashJoinStep* t, int djsIndex, int joinIndex, bo
   {
     // drain inputs, close output
     smallReader();  // only small input is supplying input at this point
-    // largeReader();
     outputDL->endOfInput();
     closedOutput = true;
   }
@@ -178,7 +177,6 @@ void DiskJoinStep::smallReader()
     memUsage = jp->insertSmallSideRGData(rgData);
     combinedMemUsage = atomicops::atomicAdd(smallUsage.get(), memUsage);
-    // cout << "memusage = " << memUsage << " total = " << combinedMemUsage << endl;
     if (combinedMemUsage > smallLimit)
     {
       errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_DBJ_DISK_USAGE_LIMIT));
@@ -189,14 +187,11 @@ void DiskJoinStep::smallReader()
     }
   }
-  // cout << "(" << joinerIndex << ") read the small side data, combined mem usage= " << combinedMemUsage <<
-  // " rowcount = " << rowCount << endl;
-  if (!cancelled())
+  if (LIKELY(!cancelled()))
   {
     memUsage = jp->doneInsertingSmallData();
     combinedMemUsage = atomicops::atomicAdd(smallUsage.get(), memUsage);
-    // cout << "2memusage = " << memUsage << " total = " << combinedMemUsage << endl;
     if (combinedMemUsage > smallLimit)
     {
       errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_DBJ_DISK_USAGE_LIMIT));
@@ -226,7 +221,6 @@ void DiskJoinStep::largeReader()
   RowGroup l_largeRG = largeRG;
   largeIterationCount++;
-  // cout << "iteration " << largeIterationCount << " largeLimit=" << largeLimit << endl;
   try
   {
@@ -237,7 +231,6 @@ void DiskJoinStep::largeReader()
       if (more)
       {
         l_largeRG.setData(&rgData);
-        // cout << "large side raw data: " << largeRG.toString() << endl;
         largeSize += jp->insertLargeSideRGData(rgData);
       }
@@ -245,7 +238,6 @@ void DiskJoinStep::largeReader()
     jp->doneInsertingLargeData();
-    // cout << "(" << joinerIndex << ") read the large side data rowcount = " << rowCount << endl;
     if (!more)
       lastLargeIteration = true;
   }
@@ -257,29 +249,21 @@ void DiskJoinStep::largeReader()
     abort();
   }
-  if (cancelled())
+  if (UNLIKELY(cancelled()))
     while (more)
       more = largeDL->next(largeIt, &rgData);
 }

-void DiskJoinStep::loadFcn()
+void DiskJoinStep::loadFcn(const uint32_t threadID, const uint32_t smallSideSizeLimit,
+                           const std::vector<joiner::JoinPartition*>& joinPartitions)
 {
   boost::shared_ptr<LoaderOutput> out;
-  std::vector<JoinPartition*> joinPartitions;
-  // Collect all join partitions.
-  jp->collectJoinPartitions(joinPartitions);
-#ifdef DEBUG_DJS
-  cout << "Collected join partitions: " << endl;
-  for (uint32_t i = 0; i < joinPartitions.size(); ++i)
-    cout << joinPartitions[i]->getUniqueID() << ", ";
-  cout << endl;
-#endif
   try
   {
     uint32_t partitionIndex = 0;
     bool partitionDone = true;
+    RowGroup& rowGroup = smallRG;
     // Iterate over partitions.
     while (partitionIndex < joinPartitions.size() && !cancelled())
@@ -294,7 +278,6 @@ void DiskJoinStep::loadFcn()
       while (true)
       {
         messageqcpp::ByteStream bs;
-        RowGroup rowGroup;
         RGData rgData;
         joinPartition->readByteStream(0, &bs);
@@ -314,10 +297,10 @@ void DiskJoinStep::loadFcn()
           break;
         }
-        currentSize += rowGroup.getRowCount() * rowGroup.getColumnCount() * 64;
+        currentSize += rowGroup.getDataSize();
         out->smallData.push_back(rgData);
-        if (currentSize > partitionSize)
+        if (currentSize > smallSideSizeLimit)
         {
 #ifdef DEBUG_DJS
           cout << "Memory limit exceeded for the partition: " << joinPartition->getUniqueID() << endl;
@@ -339,7 +322,7 @@ void DiskJoinStep::loadFcn()
       // Initialize `LoaderOutput` and add it to `FIFO`.
       out->partitionID = joinPartition->getUniqueID();
       out->jp = joinPartition;
-      loadFIFO->insert(out);
+      loadFIFO[threadID]->insert(out);
       // If this partition is done - take a next one.
       if (partitionDone)
@@ -354,25 +337,24 @@ void DiskJoinStep::loadFcn()
     abort();
   }
-  loadFIFO->endOfInput();
+  loadFIFO[threadID]->endOfInput();
 }

-void DiskJoinStep::buildFcn()
+void DiskJoinStep::buildFcn(const uint32_t threadID)
 {
   boost::shared_ptr<LoaderOutput> in;
   boost::shared_ptr<BuilderOutput> out;
   bool more = true;
-  int it = loadFIFO->getIterator();
+  int it = loadFIFO[threadID]->getIterator();
   int i, j;
   Row smallRow;
   RowGroup l_smallRG = smallRG;
   l_smallRG.initRow(&smallRow);
-  while (1)
+  while (true)
   {
-    // cout << "getting a partition from the loader" << endl;
-    more = loadFIFO->next(it, &in);
+    more = loadFIFO[threadID]->next(it, &in);
     if (!more || cancelled())
       goto out;
@@ -383,7 +365,6 @@ void DiskJoinStep::buildFcn()
     out->jp = in->jp;
     out->tupleJoiner = joiner->copyForDiskJoin();
-    // cout << "building a tuplejoiner" << endl;
     for (i = 0; i < (int)in->smallData.size(); i++)
     {
       l_smallRG.setData(&in->smallData[i]);
@@ -394,26 +375,25 @@ void DiskJoinStep::buildFcn()
     }
     out->tupleJoiner->doneInserting();
-    buildFIFO->insert(out);
+    buildFIFO[threadID]->insert(out);
   }
 out:
   while (more)
-    more = loadFIFO->next(it, &in);
-  buildFIFO->endOfInput();
+    more = loadFIFO[threadID]->next(it, &in);
+  buildFIFO[threadID]->endOfInput();
 }

-void DiskJoinStep::joinFcn()
+void DiskJoinStep::joinFcn(const uint32_t threadID)
 {
-  /* This function mostly serves as an adapter between the
-     input data and the joinOneRG() fcn in THJS. */
+  // This function mostly serves as an adapter between the
+  // input data and the joinOneRG() fcn in THJS.
   boost::shared_ptr<BuilderOutput> in;
   bool more = true;
-  int it = buildFIFO->getIterator();
-  int i, j;
+  int it = buildFIFO[threadID]->getIterator();
+  int i;
   vector<RGData> joinResults;
   RowGroup l_largeRG = largeRG, l_smallRG = smallRG;
   RowGroup l_outputRG = outputRG;
@@ -467,9 +447,9 @@ void DiskJoinStep::joinFcn()
   try
   {
-    while (1)
+    while (true)
     {
-      more = buildFIFO->next(it, &in);
+      more = buildFIFO[threadID]->next(it, &in);
       if (!more || cancelled())
         goto out;
@@ -485,12 +465,9 @@ void DiskJoinStep::joinFcn()
                        joinMatches, smallRowTemplates, outputDL.get(), &joiners, &colMappings, &fergMappings,
                        &smallNullMem);
-        for (j = 0; j < (int)joinResults.size(); j++)
-        {
-          // l_outputRG.setData(&joinResults[j]);
-          // cout << "got joined output " << l_outputRG.toString() << endl;
-          outputDL->insert(joinResults[j]);
-        }
+        if (joinResults.size())
+          outputResult(joinResults);
         thjs->returnMemory();
         joinResults.clear();
         largeData = in->jp->getNextLargeRGData();
@@ -565,15 +542,10 @@ void DiskJoinStep::joinFcn()
             outputRow.nextRow();
           }
-          if (l_outputRG.getRowCount() > 0)
-          {
-            // cout << "inserting an rg with " << l_outputRG.getRowCount() << endl;
-            outputDL->insert(rgData);
-          }
+          if (l_outputRG.getRowCount())
+            outputResult({rgData});
           if (thjs)
-          {
             thjs->returnMemory();
-          }
         }
       }
     }
@@ -589,9 +561,9 @@ void DiskJoinStep::joinFcn()
 out:
   while (more)
-    more = buildFIFO->next(it, &in);
-  if (lastLargeIteration || cancelled())
+    more = buildFIFO[threadID]->next(it, &in);
+  if (cancelled())
   {
     reportStats();
     outputDL->endOfInput();
@@ -599,39 +571,107 @@ out:
   }
 }

+void DiskJoinStep::initializeFIFO(const uint32_t threadCount)
+{
+  loadFIFO.clear();
+  buildFIFO.clear();
+
+  for (uint32_t i = 0; i < threadCount; ++i)
+  {
+    boost::shared_ptr<LoaderOutputFIFO> lFIFO(new LoaderOutputFIFO(1, 1));
+    boost::shared_ptr<BuilderOutputFIFO> bFIFO(new BuilderOutputFIFO(1, 1));
+    loadFIFO.push_back(lFIFO);
+    buildFIFO.push_back(bFIFO);
+  }
+}
+
+void DiskJoinStep::processJoinPartitions(const uint32_t threadID, const uint32_t smallSideSizeLimitPerThread,
+                                         const std::vector<joiner::JoinPartition*>& joinPartitions)
+{
+  std::vector<uint64_t> pipelineThreads;
+  pipelineThreads.reserve(3);
+  pipelineThreads.push_back(
+      jobstepThreadPool.invoke(Loader(this, threadID, smallSideSizeLimitPerThread, joinPartitions)));
+  pipelineThreads.push_back(jobstepThreadPool.invoke(Builder(this, threadID)));
+  pipelineThreads.push_back(jobstepThreadPool.invoke(Joiner(this, threadID)));
+  jobstepThreadPool.join(pipelineThreads);
+}
+
+void DiskJoinStep::prepareJobs(const std::vector<JoinPartition*>& joinPartitions,
+                               JoinPartitionJobs& joinPartitionsJobs)
+{
+  const uint32_t issuedThreads = jobstepThreadPool.getIssuedThreads();
+  const uint32_t maxNumOfThreads = jobstepThreadPool.getMaxThreads();
+  const uint32_t numOfThreads =
+      std::min(std::min(maxNumOfJoinThreads, std::max(maxNumOfThreads - issuedThreads, (uint32_t)1)),
+               (uint32_t)joinPartitions.size());
+  const uint32_t workSize = joinPartitions.size() / numOfThreads;
+  uint32_t offset = 0;
+
+  joinPartitionsJobs.reserve(numOfThreads);
+  for (uint32_t threadNum = 0; threadNum < numOfThreads; ++threadNum, offset += workSize)
+  {
+    auto start = joinPartitions.begin() + offset;
+    auto end = start + workSize;
+    std::vector<JoinPartition*> joinPartitionJob(start, end);
+    joinPartitionsJobs.push_back(std::move(joinPartitionJob));
+  }
+
+  for (uint32_t i = 0, e = joinPartitions.size() % numOfThreads; i < e; ++i, ++offset)
+    joinPartitionsJobs[i].push_back(joinPartitions[offset]);
+}
+
+void DiskJoinStep::outputResult(const std::vector<rowgroup::RGData>& result)
+{
+  std::lock_guard<std::mutex> lk(outputMutex);
+  for (const auto& rgData : result)
+    outputDL->insert(rgData);
+}
+
+void DiskJoinStep::spawnJobs(const std::vector<std::vector<JoinPartition*>>& joinPartitionsJobs,
+                             const uint32_t smallSideSizeLimitPerThread)
+{
+  const uint32_t threadsCount = joinPartitionsJobs.size();
+  std::vector<uint64_t> processorThreadsId;
+  processorThreadsId.reserve(threadsCount);
+
+  for (uint32_t threadID = 0; threadID < threadsCount; ++threadID)
+  {
+    processorThreadsId.push_back(jobstepThreadPool.invoke(
+        JoinPartitionsProcessor(this, threadID, smallSideSizeLimitPerThread, joinPartitionsJobs[threadID])));
+  }
+
+  jobstepThreadPool.join(processorThreadsId);
+}
+
 void DiskJoinStep::mainRunner()
 {
-  /*
-      Read from smallDL, insert into small side
-      Read from largeDL, insert into large side
-      Start the processing threads
-  */
   try
   {
     smallReader();
     while (!lastLargeIteration && !cancelled())
     {
-      // cout << "large iteration " << largeIterationCount << endl;
       jp->initForLargeSideFeed();
       largeReader();
-      // cout << "done reading iteration " << largeIterationCount-1 << endl;
       jp->initForProcessing();
-      if (cancelled())
-        break;
-      loadFIFO.reset(
-          new FIFO<boost::shared_ptr<LoaderOutput> >(1, 1));  // double buffering should be good enough
-      buildFIFO.reset(new FIFO<boost::shared_ptr<BuilderOutput> >(1, 1));
-      std::vector<uint64_t> thrds;
-      thrds.reserve(3);
-      thrds.push_back(jobstepThreadPool.invoke(Loader(this)));
-      thrds.push_back(jobstepThreadPool.invoke(Builder(this)));
-      thrds.push_back(jobstepThreadPool.invoke(Joiner(this)));
-      jobstepThreadPool.join(thrds);
+
+      // Collect all join partitions.
+      std::vector<JoinPartition*> joinPartitions;
+      jp->collectJoinPartitions(joinPartitions);
+
+      // Split partitions for each threads.
+      JoinPartitionJobs joinPartitionsJobs;
+      prepareJobs(joinPartitions, joinPartitionsJobs);
+
+      // Initialize data lists.
+      const uint32_t numOfThreads = joinPartitionsJobs.size();
+      initializeFIFO(numOfThreads);
+
+      // Spawn jobs.
+      const uint32_t smallSideSizeLimitPerThread = partitionSize / numOfThreads;
+      spawnJobs(joinPartitionsJobs, smallSideSizeLimitPerThread);
     }
   }
   catch (...)
@@ -643,7 +683,7 @@ void DiskJoinStep::mainRunner()
   }
   // make sure all inputs were drained & output closed
-  if (cancelled())
+  if (UNLIKELY(cancelled()))
   {
     try
     {
@@ -661,6 +701,12 @@ void DiskJoinStep::mainRunner()
       closedOutput = true;
     }
   }
+
+  if (LIKELY(!closedOutput))
+  {
+    outputDL->endOfInput();
+    closedOutput = true;
+  }
 }

 const string DiskJoinStep::toString() const
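For clarity, here is a standalone sketch of the partition-splitting scheme that prepareJobs() above implements: an even share of partitions per thread, with the remainder handed out one partition at a time to the first jobs. Partition, splitIntoJobs and the sizes in main are assumed, illustrative names; the real code additionally caps the thread count by maxNumOfJoinThreads and by the free threads in jobstepThreadPool.

// Standalone sketch of the work split (assumed names, not ColumnStore code).
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

using Partition = int;  // stand-in for joiner::JoinPartition*
using Jobs = std::vector<std::vector<Partition>>;

Jobs splitIntoJobs(const std::vector<Partition>& partitions, uint32_t maxThreads)
{
  if (partitions.empty())
    return {};

  // Never more threads than partitions, and at least one thread.
  const uint32_t numThreads =
      std::min<uint32_t>(std::max<uint32_t>(maxThreads, 1), static_cast<uint32_t>(partitions.size()));
  const uint32_t workSize = partitions.size() / numThreads;

  Jobs jobs;
  jobs.reserve(numThreads);
  uint32_t offset = 0;
  for (uint32_t t = 0; t < numThreads; ++t, offset += workSize)
    jobs.emplace_back(partitions.begin() + offset, partitions.begin() + offset + workSize);

  // The remainder (size % numThreads) goes one partition each to the first jobs.
  for (uint32_t i = 0, e = partitions.size() % numThreads; i < e; ++i, ++offset)
    jobs[i].push_back(partitions[offset]);

  return jobs;
}

int main()
{
  std::vector<Partition> partitions(11);
  for (int i = 0; i < 11; ++i)
    partitions[i] = i;

  for (const auto& job : splitIntoJobs(partitions, 4))
    std::cout << "job with " << job.size() << " partitions\n";  // 3, 3, 3, 2
}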

==== Changed file 2 of 6 ====

@@ -25,6 +25,8 @@
 namespace joblist
 {
+using JoinPartitionJobs = std::vector<std::vector<joiner::JoinPartition*>>;
+
 class DiskJoinStep : public JobStep
 {
  public:
@@ -44,6 +46,14 @@ class DiskJoinStep : public JobStep
 protected:
 private:
+  void initializeFIFO(uint32_t threadCount);
+  void processJoinPartitions(const uint32_t threadID, const uint32_t smallSideSizeLimitPerThread,
+                             const vector<joiner::JoinPartition*>& joinPartitions);
+  void prepareJobs(const std::vector<joiner::JoinPartition*>& joinPartitions,
+                   JoinPartitionJobs& joinPartitionsJobs);
+  void outputResult(const std::vector<rowgroup::RGData>& result);
+  void spawnJobs(const std::vector<std::vector<joiner::JoinPartition*>>& joinPartitionsJobs,
+                 const uint32_t smallSideSizeLimitPerThread);
+
   boost::shared_ptr<joiner::JoinPartition> jp;
   rowgroup::RowGroup largeRG, smallRG, outputRG, joinFERG;
   std::vector<uint32_t> largeKeyCols, smallKeyCols;
@@ -80,6 +90,26 @@ class DiskJoinStep : public JobStep
   uint64_t mainThread;  // thread handle from thread pool

+  struct JoinPartitionsProcessor
+  {
+    JoinPartitionsProcessor(DiskJoinStep* djs, const uint32_t threadID, const uint32_t smallSideSizeLimit,
+                            const std::vector<joiner::JoinPartition*>& joinPartitions)
+     : djs(djs), threadID(threadID), smallSideSizeLimit(smallSideSizeLimit), joinPartitions(joinPartitions)
+    {
+    }
+
+    void operator()()
+    {
+      utils::setThreadName("DJSJoinPartitionsProcessor");
+      djs->processJoinPartitions(threadID, smallSideSizeLimit, joinPartitions);
+    }
+
+    DiskJoinStep* djs;
+    uint32_t threadID;
+    uint32_t smallSideSizeLimit;
+    std::vector<joiner::JoinPartition*> joinPartitions;
+  };
+
   /* Loader structs */
   struct LoaderOutput
   {
@@ -87,21 +117,30 @@ class DiskJoinStep : public JobStep
     uint64_t partitionID;
     joiner::JoinPartition* jp;
   };
-  boost::shared_ptr<joblist::FIFO<boost::shared_ptr<LoaderOutput> > > loadFIFO;
+
+  using LoaderOutputFIFO = joblist::FIFO<boost::shared_ptr<LoaderOutput>>;
+  std::vector<boost::shared_ptr<LoaderOutputFIFO>> loadFIFO;
+
   struct Loader
   {
-    Loader(DiskJoinStep* d) : djs(d)
+    Loader(DiskJoinStep* d, const uint32_t threadID, const uint32_t smallSideSizeLimit,
+           const std::vector<joiner::JoinPartition*>& joinPartitions)
+     : djs(d), threadID(threadID), smallSideSizeLimit(smallSideSizeLimit), joinPartitions(joinPartitions)
     {
     }
     void operator()()
     {
       utils::setThreadName("DJSLoader");
-      djs->loadFcn();
+      djs->loadFcn(threadID, smallSideSizeLimit, joinPartitions);
     }
     DiskJoinStep* djs;
+    uint32_t threadID;
+    uint32_t smallSideSizeLimit;
+    std::vector<joiner::JoinPartition*> joinPartitions;
   };
-  void loadFcn();
+  void loadFcn(const uint32_t threadID, const uint32_t smallSideSizeLimit,
+               const std::vector<joiner::JoinPartition*>& joinPartitions);

   /* Builder structs */
   struct BuilderOutput
@@ -112,36 +151,39 @@ class DiskJoinStep : public JobStep
     joiner::JoinPartition* jp;
   };
-  boost::shared_ptr<joblist::FIFO<boost::shared_ptr<BuilderOutput> > > buildFIFO;
+  using BuilderOutputFIFO = joblist::FIFO<boost::shared_ptr<BuilderOutput>>;
+  std::vector<boost::shared_ptr<BuilderOutputFIFO>> buildFIFO;
   struct Builder
   {
-    Builder(DiskJoinStep* d) : djs(d)
+    Builder(DiskJoinStep* d, const uint32_t threadID) : djs(d), threadID(threadID)
    {
    }
    void operator()()
    {
      utils::setThreadName("DJSBuilder");
-      djs->buildFcn();
+      djs->buildFcn(threadID);
    }
    DiskJoinStep* djs;
+    uint32_t threadID;
   };
-  void buildFcn();
+  void buildFcn(const uint32_t threadID);

   /* Joining structs */
   struct Joiner
   {
-    Joiner(DiskJoinStep* d) : djs(d)
+    Joiner(DiskJoinStep* d, const uint32_t threadID) : djs(d), threadID(threadID)
    {
    }
    void operator()()
    {
      utils::setThreadName("DJSJoiner");
-      djs->joinFcn();
+      djs->joinFcn(threadID);
    }
    DiskJoinStep* djs;
+    uint32_t threadID;
   };
-  void joinFcn();
+  void joinFcn(const uint32_t threadID);

   // limits & usage
   boost::shared_ptr<int64_t> smallUsage;
@@ -154,6 +196,9 @@ class DiskJoinStep : public JobStep
   uint32_t joinerIndex;
   bool closedOutput;
+
+  std::mutex outputMutex;
+  const uint32_t maxNumOfJoinThreads = 32;
 };

 }  // namespace joblist
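The per-thread members above (loadFIFO[threadID], buildFIFO[threadID], and the Loader/Builder/Joiner functors) give each worker its own load -> build -> join pipeline. Below is a minimal standalone sketch of that shape, with a toy bounded queue in place of joblist::FIFO and std::thread in place of the job-step thread pool; BoundedFifo, Loaded, Built and the stage bodies are assumed, illustrative names, not the project's API.

// Standalone sketch of one per-thread pipeline (assumed names, not ColumnStore code).
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>

// Minimal bounded FIFO with an end-of-input marker, similar in spirit to joblist::FIFO.
template <typename T>
class BoundedFifo
{
 public:
  explicit BoundedFifo(size_t cap) : cap_(cap) {}
  void insert(T v)
  {
    std::unique_lock<std::mutex> lk(m_);
    notFull_.wait(lk, [&] { return q_.size() < cap_; });
    q_.push(std::move(v));
    notEmpty_.notify_one();
  }
  void endOfInput()
  {
    std::lock_guard<std::mutex> lk(m_);
    done_ = true;
    notEmpty_.notify_all();
  }
  std::optional<T> next()
  {
    std::unique_lock<std::mutex> lk(m_);
    notEmpty_.wait(lk, [&] { return !q_.empty() || done_; });
    if (q_.empty())
      return std::nullopt;
    T v = std::move(q_.front());
    q_.pop();
    notFull_.notify_one();
    return v;
  }

 private:
  std::mutex m_;
  std::condition_variable notEmpty_, notFull_;
  std::queue<T> q_;
  size_t cap_;
  bool done_ = false;
};

struct Loaded { int partition; };  // stand-in for LoaderOutput
struct Built  { int partition; };  // stand-in for BuilderOutput

int main()
{
  BoundedFifo<Loaded> loadFifo(1);  // the real FIFOs use (1, 1): double buffering
  BoundedFifo<Built> buildFifo(1);

  std::thread loader([&] {
    for (int p = 0; p < 5; ++p)     // pretend we read 5 partitions from disk
      loadFifo.insert(Loaded{p});
    loadFifo.endOfInput();
  });
  std::thread builder([&] {
    while (auto in = loadFifo.next())         // build a hash table per partition
      buildFifo.insert(Built{in->partition});
    buildFifo.endOfInput();
  });
  std::thread joiner([&] {
    while (auto in = buildFifo.next())        // probe with the large side
      std::cout << "joined partition " << in->partition << "\n";
  });

  loader.join();
  builder.join();
  joiner.join();
}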

==== Changed file 3 of 6 ====

@@ -1838,10 +1838,10 @@ void CircularJoinGraphTransformer::chooseEdgeToTransform(Cycle& cycle,
   }

   if (jobInfo.trace)
-    std::cout << "FK FK key not found, removing the first one inner join edge" << std::endl;
+    std::cout << "FK FK key not found, removing the last one inner join edge" << std::endl;

-  // Take just a first.
-  resultEdge = std::make_pair(cycle.front(), 0 /*Dummy weight*/);
+  // Take just a last one.
+  resultEdge = std::make_pair(cycle.back(), 0 /*Dummy weight*/);
 }

 void CircularJoinGraphTransformer::removeAssociatedHashJoinStepFromJoinSteps(const JoinEdge& joinEdge)

==== Changed file 4 of 6 ====

@@ -200,10 +200,10 @@ void TupleHashJoinStep::join()
   joinRan = true;
   jobstepThreadPool.join(mainRunner);
-  if (djs)
+  if (djs.size())
   {
-    for (int i = 0; i < (int)djsJoiners.size(); i++)
-      djs[i].join();
+    for (auto& diskJoinStep : djs)
+      diskJoinStep->join();
     jobstepThreadPool.join(djsReader);
     jobstepThreadPool.join(djsRelay);
@@ -631,10 +631,10 @@ void TupleHashJoinStep::djsReaderFcn(int index)
   while (more)
     more = fifos[index]->next(it, &rgData);
-  for (int i = 0; i < (int)djsJoiners.size(); i++)
+  for (auto& diskJoinStep : djs)
   {
-    fExtendedInfo += djs[i].extendedInfo();
-    fMiniInfo += djs[i].miniInfo();
+    fExtendedInfo += diskJoinStep->extendedInfo();
+    fMiniInfo += diskJoinStep->miniInfo();
   }
   outputDL->endOfInput();
@@ -750,7 +750,6 @@ void TupleHashJoinStep::hjRunner()
     outputIt = outputDL->getIterator();
   }
-  djs.reset(new DiskJoinStep[smallSideCount]);
   fifos.reset(new boost::shared_ptr<RowGroupDL>[smallSideCount + 1]);
   for (i = 0; i <= smallSideCount; i++)
@@ -762,7 +761,8 @@ void TupleHashJoinStep::hjRunner()
     {
       // these link themselves fifos[0]->DSJ[0]->fifos[1]->DSJ[1] ... ->fifos[smallSideCount],
       // THJS puts data into fifos[0], reads it from fifos[smallSideCount]
-      djs[i] = DiskJoinStep(this, i, djsJoinerMap[i], (i == smallSideCount - 1));
+      djs.push_back(std::shared_ptr<DiskJoinStep>(
+          new DiskJoinStep(this, i, djsJoinerMap[i], (i == smallSideCount - 1))));
     }
     sl.unlock();
@@ -774,7 +774,7 @@ void TupleHashJoinStep::hjRunner()
       vector<RGData> empty;
       resourceManager->returnMemory(memUsedByEachJoin[djsJoinerMap[i]], sessionMemLimit);
       atomicops::atomicZero(&memUsedByEachJoin[i]);
-      djs[i].loadExistingData(rgData[djsJoinerMap[i]]);
+      djs[i]->loadExistingData(rgData[djsJoinerMap[i]]);
       rgData[djsJoinerMap[i]].swap(empty);
     }
   }
@@ -801,7 +801,7 @@ void TupleHashJoinStep::hjRunner()
     reader = true;
     for (i = 0; i < smallSideCount; i++)
-      djs[i].run();
+      djs[i]->run();
   }
   catch (thread_resource_error&)
   {
@@ -883,7 +883,7 @@ void TupleHashJoinStep::hjRunner()
   }
   // todo: forwardCPData needs to grab data from djs
-  if (!djs)
+  if (djs.empty())
     forwardCPData();  // this fcn has its own exclusion list
   // decide if perform aggregation on PM
@@ -933,7 +933,7 @@ void TupleHashJoinStep::hjRunner()
   {
     largeBPS->useJoiners(tbpsJoiners);
-    if (djs)
+    if (djs.size())
       largeBPS->setJoinedResultRG(largeRG + outputRG);
     else
       largeBPS->setJoinedResultRG(outputRG);
@@ -948,7 +948,7 @@ void TupleHashJoinStep::hjRunner()
      For now, the alg is "assume if any joins are done on the UM, fe2 has to go on
      the UM."  The structs and logic aren't in place yet to track all of the tables
      through a joblist. */
-  if (fe2 && !djs)
+  if (fe2 && !djs.size())
   {
     /* Can't do a small outer join when the PM sends back joined rows */
     runFE2onPM = true;
@@ -974,7 +974,7 @@ void TupleHashJoinStep::hjRunner()
   else if (fe2)
     runFE2onPM = false;
-  if (!fDelivery && !djs)
+  if (!fDelivery && !djs.size())
   {
     /* connect the largeBPS directly to the next step */
     JobStepAssociation newJsa;
@@ -993,7 +993,7 @@ void TupleHashJoinStep::hjRunner()
     // there are no in-mem UM or PM joins, only disk-joins
     startAdjoiningSteps();
   }
-  else if (!djs)
+  else if (!djs.size())
     // if there's no largeBPS, all joins are either done by DJS or join threads,
     // this clause starts the THJS join threads.
     startJoinThreads();
@@ -1026,7 +1026,7 @@ uint32_t TupleHashJoinStep::nextBand(messageqcpp::ByteStream& bs)
   else
     deliveredRG = &outputRG;
-  if (largeBPS && !djs)
+  if (largeBPS && !djs.size())
   {
     dl = largeDL;
     it = largeIt;
@@ -2044,10 +2044,10 @@ void TupleHashJoinStep::abort()
   JobStep::abort();
   boost::mutex::scoped_lock sl(djsLock);
-  if (djs)
+  if (djs.size())
   {
-    for (uint32_t i = 0; i < djsJoiners.size(); i++)
-      djs[i].abort();
+    for (uint32_t i = 0, e = djs.size(); i < e; i++)
+      djs[i]->abort();
   }
 }

==== Changed file 5 of 6 ====

@@ -584,7 +584,7 @@ class TupleHashJoinStep : public JobStep, public TupleDeliveryStep
   std::set<uint32_t> fFunctionJoinKeys;  // for skipping CP forward

   /* Disk-based join support */
-  boost::scoped_array<DiskJoinStep> djs;
+  std::vector<std::shared_ptr<DiskJoinStep>> djs;
   boost::scoped_array<boost::shared_ptr<RowGroupDL> > fifos;
   void djsReaderFcn(int index);
   uint64_t djsReader;  // thread handle from thread pool

==== Changed file 6 of 6 ====

@@ -481,7 +481,8 @@ int64_t JoinPartition::processSmallBuffer(RGData& rgData)
   ret = writeByteStream(0, bs);
-  htSizeEstimate += rg.getRowCount() * rg.getColumnCount() * 64;
+  if (rg.getRowCount())
+    htSizeEstimate += rg.getDataSize();

   // Check whether this partition is now too big -> convert to split mode.
   if (htTargetSize < htSizeEstimate && canConvertToSplitMode())
     ret += convertToSplitMode();