1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

fix(disk-based-join): MCOL-5626 Fix for race in DJS with outer join. (#3064)

This commit is contained in:
Denis Khalikov
2023-12-15 11:20:27 +03:00
committed by GitHub
parent fb496644f8
commit 74c1a38f2c
2 changed files with 18 additions and 12 deletions

View File

@ -399,12 +399,12 @@ void DiskJoinStep::joinFcn(const uint32_t threadID)
RowGroup l_outputRG = outputRG;
Row l_largeRow;
Row l_joinFERow, l_outputRow, baseRow;
vector<vector<Row::Pointer> > joinMatches;
vector<vector<Row::Pointer>> joinMatches;
auto new_row = new Row[1];
std::shared_ptr<Row[]> smallRowTemplates(new_row);
vector<std::shared_ptr<TupleJoiner>> joiners;
std::shared_ptr<std::shared_ptr<int[]>[]> colMappings, fergMappings;
boost::scoped_array<boost::scoped_array<uint8_t> > smallNullMem;
boost::scoped_array<boost::scoped_array<uint8_t>> smallNullMem;
boost::scoped_array<uint8_t> joinFEMem;
Row smallNullRow;
@ -515,21 +515,20 @@ void DiskJoinStep::joinFcn(const uint32_t threadID)
if (l_outputRG.getRowCount() == 8192)
{
outputDL->insert(rgData);
outputResult(rgData);
// cout << "inserting a full RG" << endl;
if (thjs)
{
// FIXME: Possible false positive. Something wrong with this calculation, just put a warning
// until fixed.
if (!thjs->getMemory(l_outputRG.getMaxDataSize()))
{
// FIXME: This is also looks wrong.
// calculate guess of size required for error message
uint64_t memReqd = (unmatched.size() * outputRG.getDataSize(1)) / 1048576;
Message::Args args;
args.add(memReqd);
args.add(thjs->resourceManager->getConfiguredUMMemLimit() / 1048576);
std::cerr << logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_JOIN_RESULT_TOO_BIG,
args)
<< " @" << __FILE__ << ":" << __LINE__;
throw logging::IDBExcept(logging::ERR_JOIN_RESULT_TOO_BIG, args);
uint64_t memLimit = thjs->resourceManager->getConfiguredUMMemLimit() / 1048576;
std::cerr << "DiskJoin::joinFcn() possible OOM for the join result, mem required: "
<< memReqd << " mem limit: " << memLimit << std::endl;
}
}
@ -543,7 +542,7 @@ void DiskJoinStep::joinFcn(const uint32_t threadID)
}
if (l_outputRG.getRowCount())
outputResult({rgData});
outputResult(rgData);
if (thjs)
thjs->returnMemory();
}
@ -625,10 +624,16 @@ void DiskJoinStep::prepareJobs(const std::vector<JoinPartition*>& joinPartitions
void DiskJoinStep::outputResult(const std::vector<rowgroup::RGData>& result)
{
std::lock_guard<std::mutex> lk(outputMutex);
for (const auto &rgData : result)
for (const auto& rgData : result)
outputDL->insert(rgData);
}
void DiskJoinStep::outputResult(const rowgroup::RGData& result)
{
std::lock_guard<std::mutex> lk(outputMutex);
outputDL->insert(result);
}
void DiskJoinStep::spawnJobs(const std::vector<std::vector<JoinPartition*>>& joinPartitionsJobs,
const uint32_t smallSideSizeLimitPerThread)
{