1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-07 03:22:57 +03:00
This commit is contained in:
david hill
2017-07-17 15:43:18 -05:00
parent 3501c1a17a
commit d5e873e198
2 changed files with 519 additions and 412 deletions

View File

@@ -171,7 +171,7 @@ namespace joblist
DistributedEngineComm* DistributedEngineComm::fInstance = 0;
/*static*/
DistributedEngineComm* DistributedEngineComm::instance(ResourceManager& rm, bool isExeMgr)
DistributedEngineComm* DistributedEngineComm::instance(ResourceManager* rm, bool isExeMgr)
{
if (fInstance == 0)
fInstance = new DistributedEngineComm(rm, isExeMgr);
@@ -186,9 +186,9 @@ namespace joblist
fInstance = 0;
}
DistributedEngineComm::DistributedEngineComm(ResourceManager& rm, bool isExeMgr) :
DistributedEngineComm::DistributedEngineComm(ResourceManager* rm, bool isExeMgr) :
fRm(rm),
fLBIDShift(fRm.getPsLBID_Shift()),
fLBIDShift(fRm->getPsLBID_Shift()),
pmCount(0),
fIsExeMgr(isExeMgr)
{
@@ -219,10 +219,10 @@ void DistributedEngineComm::Setup()
newClients.clear();
newLocks.clear();
throttleThreshold = fRm.getDECThrottleThreshold();
uint32_t newPmCount = fRm.getPsCount();
int cpp = (fIsExeMgr ? fRm.getPsConnectionsPerPrimProc() : 1);
tbpsThreadCount = fRm.getJlNumScanReceiveThreads();
throttleThreshold = fRm->getDECThrottleThreshold();
uint32_t newPmCount = fRm->getPsCount();
int cpp = (fIsExeMgr ? fRm->getPsConnectionsPerPrimProc() : 1);
tbpsThreadCount = fRm->getJlNumScanReceiveThreads();
unsigned numConnections = newPmCount * cpp;
oam::Oam oam;
ModuleTypeConfig moduletypeconfig;
@@ -246,7 +246,7 @@ void DistributedEngineComm::Setup()
string fServer (oss.str());
boost::shared_ptr<MessageQueueClient>
cl(new MessageQueueClient(fServer, fRm.getConfig()));
cl(new MessageQueueClient(fServer, fRm->getConfig()));
boost::shared_ptr<boost::mutex> nl(new boost::mutex());
try {
if (cl->connect()) {
@@ -297,7 +297,7 @@ void DistributedEngineComm::Setup()
int DistributedEngineComm::Close()
{
//cout << "DistributedEngineComm::Close() called" << endl;
cout << "DistributedEngineComm::Close() called" << endl;
makeBusy(false);
// for each MessageQueueClient in pmConnections delete the MessageQueueClient;
@@ -337,9 +337,9 @@ Error:
// @bug 488 - error condition! push 0 length bs to messagequeuemap and
// eventually let jobstep error out.
mutex::scoped_lock lk(fMlock);
//cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl;
cout << "WARNING: DEC READ 0 LENGTH BS FROM " << client->otherEnd()<< endl;
MessageQueueMap::iterator map_tok;
/* MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
for (map_tok = fSessionMessages.begin(); map_tok != fSessionMessages.end(); ++map_tok)
@@ -356,21 +356,21 @@ Error:
{
mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = client->moduleName();
//cout << "moduleName=" << moduleName << endl;
cout << "moduleName=" << moduleName << endl;
for ( uint32_t i = 0; i < fPmConnections.size(); i++)
{
if (moduleName != fPmConnections[i]->moduleName())
tempConns.push_back(fPmConnections[i]);
//else
//cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
cout << "DEC remove PM" << fPmConnections[i]->otherEnd() << " moduleName=" << fPmConnections[i]->moduleName() << endl;
}
if (tempConns.size() == fPmConnections.size()) return;
fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
//cout << "PMCOUNT=" << pmCount << endl;
cout << "PMCOUNT=" << pmCount << endl;
*/
// send alarm & log it
ALARMManager alarmMgr;
string alarmItem = client->addr2String();
@@ -380,7 +380,7 @@ Error:
ostringstream os;
os << "DEC: lost connection to " << client->addr2String();
writeToLog(__FILE__, __LINE__, os.str(), LOG_TYPE_CRITICAL);
}
// }
return;
}
@@ -861,9 +861,9 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
{
// @bug 488. error out under such condition instead of re-trying other connection,
// by pushing 0 size bytestream to messagequeue and throw excpetion
SBS sbs;
/* SBS sbs;
lk.lock();
//cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl;
cout << "WARNING: DEC WRITE BROKEN PIPE. PMS index = " << index << endl;
MessageQueueMap::iterator map_tok;
sbs.reset(new ByteStream(0));
@@ -879,10 +879,10 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
// reconfig the connection array
ClientList tempConns;
{
//cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl;
cout << "WARNING: DEC WRITE BROKEN PIPE " << fPmConnections[index]->otherEnd()<< endl;
mutex::scoped_lock onErrLock(fOnErrMutex);
string moduleName = fPmConnections[index]->moduleName();
//cout << "module name = " << moduleName << endl;
cout << "module name = " << moduleName << endl;
if (index >= fPmConnections.size()) return 0;
for (uint32_t i = 0; i < fPmConnections.size(); i++)
@@ -894,7 +894,7 @@ int DistributedEngineComm::writeToClient(size_t index, const ByteStream& bs, uin
fPmConnections.swap(tempConns);
pmCount = (pmCount == 0 ? 0 : pmCount - 1);
}
*/
// send alarm
ALARMManager alarmMgr;
string alarmItem("UNKNOWN");