/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

#include "diskjoinstep.h"
#include "exceptclasses.h"
#include "resourcemanager.h"

using namespace std;
using namespace rowgroup;
using namespace joiner;
using namespace logging;

namespace joblist
{
DiskJoinStep::DiskJoinStep()
{
}

DiskJoinStep::DiskJoinStep(TupleHashJoinStep* t, int djsIndex, int joinIndex, bool lastOne)
 : JobStep(*t), thjs(t), mainThread(0), joinerIndex(joinIndex), closedOutput(false)
{
  /* grab all relevant vars from THJS
     make largeRG and outputRG
     make the RG mappings
     init a JoinPartition
     load the existing RGData into JoinPartition
  */
  largeRG = thjs->largeRG + thjs->outputRG;

  if (lastOne)
    outputRG = thjs->outputRG;
  else
    outputRG = largeRG;

  smallRG = thjs->smallRGs[joinerIndex];
  largeKeyCols = thjs->largeSideKeys[joinerIndex];
  smallKeyCols = thjs->smallSideKeys[joinerIndex];

  /* Should not be necessary if we can use THJS's logic to do the join */
  fe = thjs->getJoinFilter(joinerIndex);

  if (fe)
  {
    joinFERG = thjs->joinFilterRG;
    SjoinFEMapping = makeMapping(smallRG, joinFERG);
    LjoinFEMapping = makeMapping(largeRG, joinFERG);
  }

  joiner = thjs->djsJoiners[djsIndex];
  joinType = joiner->getJoinType();
  typeless = joiner->isTypelessJoin();
  joiner->clearData();
  joiner->setInUM();
  LOMapping = makeMapping(largeRG, outputRG);
  SOMapping = makeMapping(smallRG, outputRG);

  /* Link up */
  largeDL = thjs->fifos[djsIndex];
  outputDL = thjs->fifos[djsIndex + 1];
  smallDL = thjs->smallDLs[joinerIndex];
  largeIt = largeDL->getIterator();

  smallUsage = thjs->djsSmallUsage;
  smallLimit = thjs->djsSmallLimit;
  largeLimit = thjs->djsLargeLimit;
  partitionSize = thjs->djsPartitionSize;
  maxPartitionTreeDepth = thjs->djsMaxPartitionTreeDepth;

  if (smallLimit == 0)
    smallLimit = numeric_limits<uint64_t>::max();

  if (largeLimit == 0)
    largeLimit = numeric_limits<uint64_t>::max();

  uint64_t totalUMMemory = thjs->resourceManager->getConfiguredUMMemLimit();
  jp.reset(new JoinPartition(largeRG, smallRG, smallKeyCols, largeKeyCols, typeless,
                             (joinType & ANTI) && (joinType & MATCHNULLS), (bool)fe, totalUMMemory,
                             partitionSize, maxPartitionTreeDepth));

  if (cancelled())
  {
    // drain inputs, close output
    smallReader();  // only the small input is supplying data at this point
    outputDL->endOfInput();
    closedOutput = true;
  }

  largeIterationCount = 0;
  lastLargeIteration = false;
  fMiniInfo.clear();
  fExtendedInfo.clear();
}

DiskJoinStep::~DiskJoinStep()
{
  abort();

  if (mainThread)
  {
    jobstepThreadPool.join(mainThread);
    mainThread = 0;
  }

  if (jp)
    atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage());
}

void DiskJoinStep::loadExistingData(vector<RGData>& data)
{
  int64_t memUsage;
  uint32_t i;

  for (i = 0; i < data.size() && !cancelled(); i++)
  {
    memUsage = jp->insertSmallSideRGData(data[i]);
    atomicops::atomicAdd(smallUsage.get(), memUsage);
  }
}

void DiskJoinStep::run()
{
  mainThread = jobstepThreadPool.invoke(Runner(this));
}

void DiskJoinStep::join()
{
  if (mainThread)
  {
    jobstepThreadPool.join(mainThread);
    mainThread = 0;
  }

  if (jp)
  {
    atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage());
    // int64_t memUsage;
    // memUsage = atomicops::atomicSub(smallUsage.get(), jp->getSmallSideDiskUsage());
    // cout << "join(): small side usage was: " << jp->getSmallSideDiskUsage()
    //      << " final shared mem usage = " << memUsage << endl;
    jp.reset();
  }
}

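/* smallReader() drains the entire small side into the on-disk JoinPartition.
   The bytes it writes are accumulated in the shared smallUsage counter; if the
   running total ever exceeds smallLimit (djsSmallLimit), the step raises
   ERR_DBJ_DISK_USAGE_LIMIT and aborts. */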
void DiskJoinStep::smallReader()
{
  RGData rgData;
  bool more = true;
  int64_t memUsage = 0, combinedMemUsage = 0;
  RowGroup l_smallRG = smallRG;

  try
  {
    while (more && !cancelled())
    {
      more = smallDL->next(0, &rgData);

      if (more)
      {
        l_smallRG.setData(&rgData);
        memUsage = jp->insertSmallSideRGData(rgData);
        combinedMemUsage = atomicops::atomicAdd(smallUsage.get(), memUsage);

        if (combinedMemUsage > smallLimit)
        {
          errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_DBJ_DISK_USAGE_LIMIT));
          status(ERR_DBJ_DISK_USAGE_LIMIT);
          cout << "DJS small reader: exceeded disk space limit" << endl;
          abort();
        }
      }
    }

    if (LIKELY(!cancelled()))
    {
      memUsage = jp->doneInsertingSmallData();
      combinedMemUsage = atomicops::atomicAdd(smallUsage.get(), memUsage);

      if (combinedMemUsage > smallLimit)
      {
        errorMessage(IDBErrorInfo::instance()->errorMsg(ERR_DBJ_DISK_USAGE_LIMIT));
        status(ERR_DBJ_DISK_USAGE_LIMIT);
        cout << "DJS small reader: exceeded disk space limit" << endl;
        abort();
      }
    }
  }  // try
  catch (...)
  {
    handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_ALWAYS_CRITICAL,
                    "DiskJoinStep::smallReader()");
    status(logging::ERR_EXEMGR_MALFUNCTION);
    abort();
  }

  while (more)
    more = smallDL->next(0, &rgData);
}

void DiskJoinStep::largeReader()
{
  RGData rgData;
  bool more = true;
  int64_t largeSize = 0;
  RowGroup l_largeRG = largeRG;

  largeIterationCount++;

  try
  {
    while (more && !cancelled() && largeSize < largeLimit)
    {
      more = largeDL->next(largeIt, &rgData);

      if (more)
      {
        l_largeRG.setData(&rgData);
        largeSize += jp->insertLargeSideRGData(rgData);
      }
    }

    jp->doneInsertingLargeData();

    if (!more)
      lastLargeIteration = true;
  }
  catch (...)
  {
    handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_ALWAYS_CRITICAL,
                    "DiskJoinStep::largeReader()");
    status(logging::ERR_EXEMGR_MALFUNCTION);
    abort();
  }

  if (UNLIKELY(cancelled()))
    while (more)
      more = largeDL->next(largeIt, &rgData);
}

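/* Loader stage of the per-thread pipeline.  For each partition assigned to this
   thread it reads the stored small-side row groups back from disk, packs them
   into LoaderOutput chunks of at most smallSideSizeLimit bytes, and pushes each
   chunk into loadFIFO for the builder.  A partition larger than the limit is
   revisited (partitionDone == false) until all of its data has been emitted. */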
void DiskJoinStep::loadFcn(const uint32_t threadID, const uint32_t smallSideSizeLimit,
                           const std::vector<JoinPartition*>& joinPartitions)
{
  boost::shared_ptr<LoaderOutput> out;

  try
  {
    uint32_t partitionIndex = 0;
    bool partitionDone = true;
    RowGroup& rowGroup = smallRG;

    // Iterate over partitions.
    while (partitionIndex < joinPartitions.size() && !cancelled())
    {
      uint64_t currentSize = 0;
      auto* joinPartition = joinPartitions[partitionIndex];
      out.reset(new LoaderOutput());

      if (partitionDone)
        joinPartition->setNextSmallOffset(0);

      while (true)
      {
        messageqcpp::ByteStream bs;
        RGData rgData;

        joinPartition->readByteStream(0, &bs);

        if (!bs.length())
        {
          partitionDone = true;
          break;
        }

        rgData.deserialize(bs);
        rowGroup.setData(&rgData);

        // Check that the current RowGroup has rows.
        if (!rowGroup.getRowCount())
        {
          partitionDone = true;
          break;
        }

        currentSize += rowGroup.getDataSize();
        out->smallData.push_back(rgData);

        if (currentSize > smallSideSizeLimit)
        {
#ifdef DEBUG_DJS
          cout << "Memory limit exceeded for the partition: " << joinPartition->getUniqueID() << endl;
          cout << "Current size: " << currentSize << " Memory limit: " << partitionSize << endl;
#endif
          partitionDone = false;
          currentSize = 0;
          break;
        }
      }

      if (!out->smallData.size())
      {
        ++partitionIndex;
        partitionDone = true;
        continue;
      }

      // Finish initializing the LoaderOutput and add it to the FIFO.
      out->partitionID = joinPartition->getUniqueID();
      out->jp = joinPartition;
      loadFIFO[threadID]->insert(out);

      // If this partition is done, take the next one.
      if (partitionDone)
        ++partitionIndex;
    }
  }
  catch (...)
  {
    handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_ALWAYS_CRITICAL,
                    "DiskJoinStep::loadFcn()");
    status(logging::ERR_EXEMGR_MALFUNCTION);
    abort();
  }

  loadFIFO[threadID]->endOfInput();
}

/* Builder stage: copies the step's TupleJoiner for this partition and inserts every
   small-side row from the LoaderOutput chunk, then hands the populated joiner to the
   joiner stage through buildFIFO. */
void DiskJoinStep::buildFcn(const uint32_t threadID)
{
  boost::shared_ptr<LoaderOutput> in;
  boost::shared_ptr<BuilderOutput> out;
  bool more = true;
  int it = loadFIFO[threadID]->getIterator();
  int i, j;
  Row smallRow;
  RowGroup l_smallRG = smallRG;

  l_smallRG.initRow(&smallRow);

  while (true)
  {
    more = loadFIFO[threadID]->next(it, &in);

    if (!more || cancelled())
      goto out;

    out.reset(new BuilderOutput());
    out->smallData = in->smallData;
    out->partitionID = in->partitionID;
    out->jp = in->jp;
    out->tupleJoiner = joiner->copyForDiskJoin();

    for (i = 0; i < (int)in->smallData.size(); i++)
    {
      l_smallRG.setData(&in->smallData[i]);
      l_smallRG.getRow(0, &smallRow);

      for (j = 0; j < (int)l_smallRG.getRowCount(); j++, smallRow.nextRow())
        out->tupleJoiner->insert(smallRow, (largeIterationCount == 1));
    }

    out->tupleJoiner->doneInserting();
    buildFIFO[threadID]->insert(out);
  }

out:
  while (more)
    more = loadFIFO[threadID]->next(it, &in);

  buildFIFO[threadID]->endOfInput();
}

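/* Joiner stage: for each BuilderOutput it probes the partition's large-side row
   groups against the freshly built TupleJoiner via THJS::joinOneRG() and sends the
   joined rows to outputDL.  For small-outer joins, the partition's small-side data
   is written back to disk between iterations so that match information is not lost;
   on the last iteration, rows that are still unmatched are emitted with null-padded
   large-side columns. */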
void DiskJoinStep::joinFcn(const uint32_t threadID)
{
  // This function mostly serves as an adapter between the
  // input data and the joinOneRG() fcn in THJS.
  boost::shared_ptr<BuilderOutput> in;
  bool more = true;
  int it = buildFIFO[threadID]->getIterator();
  int i;
  vector<RGData> joinResults;
  RowGroup l_largeRG = largeRG, l_smallRG = smallRG;
  RowGroup l_outputRG = outputRG;
  Row l_largeRow;
  Row l_joinFERow, l_outputRow, baseRow;
  vector<vector<Row::Pointer>> joinMatches;
  auto new_row = new Row[1];
  std::shared_ptr<Row[]> smallRowTemplates(new_row);
  vector<std::shared_ptr<joiner::TupleJoiner>> joiners;
  std::shared_ptr<std::shared_ptr<int[]>[]> colMappings, fergMappings;
  boost::scoped_array<boost::scoped_array<uint8_t>> smallNullMem;
  boost::scoped_array<uint8_t> joinFEMem;
  Row smallNullRow;
  boost::scoped_array<uint8_t> baseRowMem;

  if (joiner->hasFEFilter())
  {
    joinFERG.initRow(&l_joinFERow, true);
    joinFEMem.reset(new uint8_t[l_joinFERow.getSize()]);
    l_joinFERow.setData(rowgroup::Row::Pointer(joinFEMem.get()));
  }

  outputRG.initRow(&l_outputRow);
  outputRG.initRow(&baseRow, true);
  largeRG.initRow(&l_largeRow);
  baseRowMem.reset(new uint8_t[baseRow.getSize()]);
  baseRow.setData(rowgroup::Row::Pointer(baseRowMem.get()));
  joinMatches.emplace_back(vector<Row::Pointer>());
  smallRG.initRow(&smallRowTemplates[0]);
  joiners.resize(1);
  colMappings.reset(new std::shared_ptr<int[]>[2]);
  colMappings[0] = SOMapping;
  colMappings[1] = LOMapping;

  if (fe)
  {
    fergMappings.reset(new std::shared_ptr<int[]>[2]);
    fergMappings[0] = SjoinFEMapping;
    fergMappings[1] = LjoinFEMapping;
  }

  l_smallRG.initRow(&smallNullRow, true);
  smallNullMem.reset(new boost::scoped_array<uint8_t>[1]);
  smallNullMem[0].reset(new uint8_t[smallNullRow.getSize()]);
  smallNullRow.setData(rowgroup::Row::Pointer(smallNullMem[0].get()));
  smallNullRow.initToNull();

  try
  {
    while (true)
    {
      more = buildFIFO[threadID]->next(it, &in);

      if (!more || cancelled())
        goto out;

      joiners[0] = in->tupleJoiner;

      boost::shared_ptr<RGData> largeData;
      largeData = in->jp->getNextLargeRGData();

      while (largeData)
      {
        l_largeRG.setData(largeData.get());
        thjs->joinOneRG(0, joinResults, l_largeRG, l_outputRG, l_largeRow, l_joinFERow, l_outputRow, baseRow,
                        joinMatches, smallRowTemplates, outputDL.get(), &joiners, &colMappings, &fergMappings,
                        &smallNullMem);

        if (joinResults.size())
          outputResult(joinResults);

        thjs->returnMemory();
        joinResults.clear();
        largeData = in->jp->getNextLargeRGData();
      }

      if (joinType & SMALLOUTER)
      {
        if (!lastLargeIteration)
        {
          /* TODO: an optimization would be to detect whether any new rows were marked
             and if not suppress the save operation */
          vector<Row::Pointer> unmatched;
          in->tupleJoiner->getUnmarkedRows(&unmatched);
          // cout << "***** saving partition " << in->partitionID << " unmarked count=" << unmatched.size()
          //      << " total count=" << in->tupleJoiner->size() << " vector size=" << in->smallData.size() << endl;
          in->jp->saveSmallSidePartition(in->smallData);
        }
        else
        {
          // cout << "finishing small-outer output" << endl;
          vector<Row::Pointer> unmatched;
          RGData rgData(l_outputRG);
          Row outputRow;

          l_outputRG.setData(&rgData);
          l_outputRG.resetRowGroup(0);
          l_outputRG.initRow(&outputRow);
          l_outputRG.getRow(0, &outputRow);
          l_largeRG.initRow(&l_largeRow, true);

          boost::scoped_array<uint8_t> largeNullMem(new uint8_t[l_largeRow.getSize()]);
          l_largeRow.setData(rowgroup::Row::Pointer(largeNullMem.get()));
          l_largeRow.initToNull();

          in->tupleJoiner->getUnmarkedRows(&unmatched);
          // cout << " small-outer count=" << unmatched.size() << endl;

          for (i = 0; i < (int)unmatched.size(); i++)
          {
            smallRowTemplates[0].setData(unmatched[i]);
            applyMapping(LOMapping, l_largeRow, &outputRow);
            applyMapping(SOMapping, smallRowTemplates[0], &outputRow);
            l_outputRG.incRowCount();

            if (l_outputRG.getRowCount() == 8192)
            {
              outputResult(rgData);
              // cout << "inserting a full RG" << endl;

              if (thjs)
              {
                // FIXME: Possible false positive.  Something is wrong with this calculation;
                // just put a warning until it is fixed.
                if (!thjs->getMemory(l_outputRG.getMaxDataSize()))
                {
                  // FIXME: This also looks wrong.
                  // Calculate a guess of the size required for the error message.
                  uint64_t memReqd = (unmatched.size() * outputRG.getDataSize(1)) / 1048576;
                  uint64_t memLimit = thjs->resourceManager->getConfiguredUMMemLimit() / 1048576;
                  std::cerr << "DiskJoin::joinFcn() possible OOM for the join result, mem required: " << memReqd
                            << " mem limit: " << memLimit << std::endl;
                }
              }

              rgData.reinit(l_outputRG);
              l_outputRG.setData(&rgData);
              l_outputRG.resetRowGroup(0);
              l_outputRG.getRow(0, &outputRow);
            }
            else
              outputRow.nextRow();
          }

          if (l_outputRG.getRowCount())
            outputResult(rgData);

          if (thjs)
            thjs->returnMemory();
        }
      }
    }
  }  // the try stmt above
  catch (...)
  {
    handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_ALWAYS_CRITICAL,
                    "DiskJoinStep::joinFcn()");
    status(logging::ERR_EXEMGR_MALFUNCTION);
    abort();
  }

out:
  while (more)
    more = buildFIFO[threadID]->next(it, &in);

  if (cancelled())
  {
    reportStats();
    outputDL->endOfInput();
    closedOutput = true;
  }
}

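/* Each worker thread runs a Loader -> Builder -> Joiner pipeline; the stages
   communicate through the per-thread loadFIFO/buildFIFO created below.  The
   (1, 1) constructor arguments appear to make each FIFO a single-consumer,
   single-element queue, so at most one chunk per stage is in flight at a time. */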
void DiskJoinStep::initializeFIFO(const uint32_t threadCount)
{
  loadFIFO.clear();
  buildFIFO.clear();

  for (uint32_t i = 0; i < threadCount; ++i)
  {
    boost::shared_ptr<LoaderOutputFIFO> lFIFO(new LoaderOutputFIFO(1, 1));
    boost::shared_ptr<BuilderOutputFIFO> bFIFO(new BuilderOutputFIFO(1, 1));
    loadFIFO.push_back(lFIFO);
    buildFIFO.push_back(bFIFO);
  }
}

void DiskJoinStep::processJoinPartitions(const uint32_t threadID, const uint32_t smallSideSizeLimitPerThread,
                                         const std::vector<JoinPartition*>& joinPartitions)
{
  std::vector<uint64_t> pipelineThreads;
  pipelineThreads.reserve(3);

  pipelineThreads.push_back(
      jobstepThreadPool.invoke(Loader(this, threadID, smallSideSizeLimitPerThread, joinPartitions)));
  pipelineThreads.push_back(jobstepThreadPool.invoke(Builder(this, threadID)));
  pipelineThreads.push_back(jobstepThreadPool.invoke(Joiner(this, threadID)));

  jobstepThreadPool.join(pipelineThreads);
}

void DiskJoinStep::prepareJobs(const std::vector<JoinPartition*>& joinPartitions,
                               JoinPartitionJobs& joinPartitionsJobs)
{
  const uint32_t issuedThreads = jobstepThreadPool.getIssuedThreads();
  const uint32_t maxNumOfThreads = jobstepThreadPool.getMaxThreads();
  const uint32_t numOfThreads =
      std::min(std::min(maxNumOfJoinThreads, std::max(maxNumOfThreads - issuedThreads, (uint32_t)1)),
               (uint32_t)joinPartitions.size());
  const uint32_t workSize = joinPartitions.size() / numOfThreads;
  uint32_t offset = 0;

  joinPartitionsJobs.reserve(numOfThreads);

  for (uint32_t threadNum = 0; threadNum < numOfThreads; ++threadNum, offset += workSize)
  {
    auto start = joinPartitions.begin() + offset;
    auto end = start + workSize;
    std::vector<JoinPartition*> joinPartitionJob(start, end);
    joinPartitionsJobs.push_back(std::move(joinPartitionJob));
  }

  // Distribute the remaining partitions one per job.
  for (uint32_t i = 0, e = joinPartitions.size() % numOfThreads; i < e; ++i, ++offset)
    joinPartitionsJobs[i].push_back(joinPartitions[offset]);
}

void DiskJoinStep::outputResult(const std::vector<rowgroup::RGData>& result)
{
  std::lock_guard<std::mutex> lk(outputMutex);

  for (const auto& rgData : result)
    outputDL->insert(rgData);
}

void DiskJoinStep::outputResult(const rowgroup::RGData& result)
{
  std::lock_guard<std::mutex> lk(outputMutex);
  outputDL->insert(result);
}

void DiskJoinStep::spawnJobs(const std::vector<std::vector<JoinPartition*>>& joinPartitionsJobs,
                             const uint32_t smallSideSizeLimitPerThread)
{
  const uint32_t threadsCount = joinPartitionsJobs.size();
  std::vector<uint64_t> processorThreadsId;
  processorThreadsId.reserve(threadsCount);

  for (uint32_t threadID = 0; threadID < threadsCount; ++threadID)
  {
    processorThreadsId.push_back(jobstepThreadPool.invoke(
        JoinPartitionsProcessor(this, threadID, smallSideSizeLimitPerThread, joinPartitionsJobs[threadID])));
  }

  jobstepThreadPool.join(processorThreadsId);
}

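/* Main driver.  The small side is consumed once up front; the large side is then
   processed in iterations of at most largeLimit (djsLargeLimit) bytes.  Every
   iteration re-partitions the large-side chunk, splits the resulting partitions
   into per-thread jobs, and runs the load/build/join pipeline over them, until the
   large-side input is exhausted (lastLargeIteration). */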
void DiskJoinStep::mainRunner()
{
  try
  {
    smallReader();

    while (!lastLargeIteration && !cancelled())
    {
      jp->initForLargeSideFeed();
      largeReader();
      jp->initForProcessing();

      // Collect all join partitions.
      std::vector<JoinPartition*> joinPartitions;
      jp->collectJoinPartitions(joinPartitions);

      // Split the partitions into per-thread jobs.
      JoinPartitionJobs joinPartitionsJobs;
      prepareJobs(joinPartitions, joinPartitionsJobs);

      // Initialize data lists.
      const uint32_t numOfThreads = joinPartitionsJobs.size();
      initializeFIFO(numOfThreads);

      // Spawn jobs.
      const uint32_t smallSideSizeLimitPerThread = partitionSize / numOfThreads;
      spawnJobs(joinPartitionsJobs, smallSideSizeLimitPerThread);
    }
  }
  catch (...)
  {
    handleException(std::current_exception(), logging::ERR_EXEMGR_MALFUNCTION, logging::ERR_ALWAYS_CRITICAL,
                    "DiskJoinStep::mainRunner()");
    status(logging::ERR_EXEMGR_MALFUNCTION);
    abort();
  }

  // make sure all inputs were drained & output closed
  if (UNLIKELY(cancelled()))
  {
    try
    {
      jp->initForLargeSideFeed();
    }
    catch (...)
    {
    }  // doesn't matter if this fails to open the large file

    largeReader();  // the large reader will only drain the fifo when cancelled()

    if (!closedOutput)
    {
      outputDL->endOfInput();
      closedOutput = true;
    }
  }

  if (LIKELY(!closedOutput))
  {
    outputDL->endOfInput();
    closedOutput = true;
  }
}

const string DiskJoinStep::toString() const
{
  return "DiskJoinStep\n";
}

void DiskJoinStep::reportStats()
{
  ostringstream os1, os2;

  os1 << "DiskJoinStep: joined (large) " << alias() << " to (small) " << joiner->getTableName()
      << ". Processing stages: " << largeIterationCount << ", disk usage small/large: " << jp->getMaxSmallSize()
      << "/" << jp->getMaxLargeSize() << ", total bytes read/written: " << jp->getBytesRead() << "/"
      << jp->getBytesWritten() << endl;
  fExtendedInfo = os1.str();

  /* TODO: Can this report anything more useful in miniInfo? */
  int64_t bytesToReport = jp->getBytesRead() + jp->getBytesWritten();
  char units;

  if (bytesToReport > (1 << 30))
  {
    bytesToReport >>= 30;
    units = 'G';
  }
  else if (bytesToReport > (1 << 20))
  {
    bytesToReport >>= 20;
    units = 'M';
  }
  else if (bytesToReport > (1 << 10))
  {
    bytesToReport >>= 10;
    units = 'K';
  }
  else
    units = ' ';

  os2 << "DJS UM " << alias() << "-" << joiner->getTableName() << " - - " << bytesToReport << units
      << " - - -------- -\n";
  fMiniInfo = os2.str();

  if (traceOn())
    logEnd(os1.str().c_str());
}

}  // namespace joblist