You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
MCOL-3645 - Make ExeMgr destroy joblists in a different thread
A couple lambdas and a little sync...
This commit is contained in:
@@ -40,9 +40,6 @@
|
|||||||
* front-end.
|
* front-end.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#include <mutex>
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <csignal>
|
#include <csignal>
|
||||||
@@ -75,6 +72,10 @@
|
|||||||
#include "utils_utf8.h"
|
#include "utils_utf8.h"
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
#include <thread>
|
||||||
|
#include <condition_variable>
|
||||||
|
|
||||||
#if defined(SKIP_OAM_INIT)
|
#if defined(SKIP_OAM_INIT)
|
||||||
#include "dbrm.h"
|
#include "dbrm.h"
|
||||||
#endif
|
#endif
|
||||||
@@ -531,6 +532,9 @@ public:
|
|||||||
csep.sessionID(0);
|
csep.sessionID(0);
|
||||||
joblist::SJLP jl;
|
joblist::SJLP jl;
|
||||||
bool incSessionThreadCnt = true;
|
bool incSessionThreadCnt = true;
|
||||||
|
std::mutex jlMutex;
|
||||||
|
std::condition_variable jlCleanupDone;
|
||||||
|
int destructing = 0;
|
||||||
|
|
||||||
bool selfJoin = false;
|
bool selfJoin = false;
|
||||||
bool tryTuples = false;
|
bool tryTuples = false;
|
||||||
@@ -545,7 +549,22 @@ public:
|
|||||||
tryTuples = false;
|
tryTuples = false;
|
||||||
usingTuples = false;
|
usingTuples = false;
|
||||||
|
|
||||||
jl.reset();
|
if (jl)
|
||||||
|
{
|
||||||
|
// puts the real destruction in another thread to avoid
|
||||||
|
// making the whole session wait. It can take several seconds.
|
||||||
|
std::unique_lock<std::mutex> scoped(jlMutex);
|
||||||
|
destructing++;
|
||||||
|
std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, &destructing] {
|
||||||
|
std::unique_lock<std::mutex> scoped(jlMutex);
|
||||||
|
const_cast<joblist::SJLP &>(jl).reset(); // this happens second; does real destruction
|
||||||
|
if (--destructing == 0)
|
||||||
|
jlCleanupDone.notify_one();
|
||||||
|
});
|
||||||
|
jl.reset(); // this runs first
|
||||||
|
bgdtor.detach();
|
||||||
|
}
|
||||||
|
|
||||||
bs = fIos.read();
|
bs = fIos.read();
|
||||||
|
|
||||||
if (bs.length() == 0)
|
if (bs.length() == 0)
|
||||||
@@ -1038,7 +1057,7 @@ new_plan:
|
|||||||
//@Bug 1306. Added timing info for real time tracking.
|
//@Bug 1306. Added timing info for real time tracking.
|
||||||
std::cout << ss << " at " << timeNow() << std::endl;
|
std::cout << ss << " at " << timeNow() << std::endl;
|
||||||
|
|
||||||
// log query status to debug log file
|
// log query stats to debug log file
|
||||||
args.reset();
|
args.reset();
|
||||||
args.add((int)csep.statementID());
|
args.add((int)csep.statementID());
|
||||||
args.add(fStats.fMaxMemPct);
|
args.add(fStats.fMaxMemPct);
|
||||||
@@ -1061,13 +1080,28 @@ new_plan:
|
|||||||
// here to make sure all syslogging from all the threads
|
// here to make sure all syslogging from all the threads
|
||||||
// are complete; and that our logDbProfEndStatement will
|
// are complete; and that our logDbProfEndStatement will
|
||||||
// appear "last" in the syslog for this SQL statement.
|
// appear "last" in the syslog for this SQL statement.
|
||||||
jl.reset();
|
// puts the real destruction in another thread to avoid
|
||||||
args.reset();
|
// making the whole session wait. It can take several seconds.
|
||||||
args.add((int)csep.statementID());
|
int stmtID = csep.statementID();
|
||||||
|
std::unique_lock<std::mutex> scoped(jlMutex);
|
||||||
|
// C7's compiler complains about the msgLog capture here
|
||||||
|
// msgLog is global scope, and passed by copy, so, unclear
|
||||||
|
// what the warning is about.
|
||||||
|
destructing++;
|
||||||
|
std::thread bgdtor([jl, &jlMutex, &jlCleanupDone, stmtID, &li, msgLog, &destructing] {
|
||||||
|
std::unique_lock<std::mutex> scoped(jlMutex);
|
||||||
|
const_cast<joblist::SJLP &>(jl).reset(); // this happens second; does real destruction
|
||||||
|
logging::Message::Args args;
|
||||||
|
args.add(stmtID);
|
||||||
msgLog.logMessage(logging::LOG_TYPE_DEBUG,
|
msgLog.logMessage(logging::LOG_TYPE_DEBUG,
|
||||||
logDbProfEndStatement,
|
logDbProfEndStatement,
|
||||||
args,
|
args,
|
||||||
li);
|
li);
|
||||||
|
if (--destructing == 0)
|
||||||
|
jlCleanupDone.notify_one();
|
||||||
|
});
|
||||||
|
jl.reset(); // this happens first
|
||||||
|
bgdtor.detach();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
// delete sessionMemMap entry for this session's memory % use
|
// delete sessionMemMap entry for this session's memory % use
|
||||||
@@ -1127,14 +1161,12 @@ new_plan:
|
|||||||
qts.local_query = csep.localQuery();
|
qts.local_query = csep.localQuery();
|
||||||
fTeleClient.postQueryTele(qts);
|
fTeleClient.postQueryTele(qts);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
jl.reset();
|
|
||||||
// Release CSC object (for sessionID) that was added by makeJobList()
|
// Release CSC object (for sessionID) that was added by makeJobList()
|
||||||
// Mask 0x80000000 is for associate user query and csc query
|
// Mask 0x80000000 is for associate user query and csc query.
|
||||||
|
// (actual joblist destruction happens at the top of this loop)
|
||||||
decThreadCntPerSession( csep.sessionID() | 0x80000000 );
|
decThreadCntPerSession( csep.sessionID() | 0x80000000 );
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (std::exception& ex)
|
catch (std::exception& ex)
|
||||||
{
|
{
|
||||||
@@ -1158,6 +1190,12 @@ new_plan:
|
|||||||
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, logExeMgrExcpt, args, li);
|
msgLog.logMessage(logging::LOG_TYPE_CRITICAL, logExeMgrExcpt, args, li);
|
||||||
fIos.close();
|
fIos.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// make sure we don't leave scope while joblists are being destroyed
|
||||||
|
std::unique_lock<std::mutex> scoped(jlMutex);
|
||||||
|
while (destructing > 0)
|
||||||
|
jlCleanupDone.wait(scoped);
|
||||||
|
std::cout << "session thread exiting" << std::endl;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user