1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Updated with latest InfiniDB Develop (4.6.6)

This commit is contained in:
David Hall
2016-01-14 10:27:21 -06:00
parent f6afc42dd0
commit 1bd427486c
27 changed files with 556 additions and 373 deletions

View File

@ -41,11 +41,14 @@ using namespace execplan;
#include "tupleunion.h"
#include "tupleaggregatestep.h"
#include "windowfunctionstep.h"
#include "configcpp.h"
#include "oamcache.h"
#include "atomicops.h"
namespace joblist
{
int JobList::fPmsConfigured = 0;
struct JSJoiner
{
@ -60,7 +63,7 @@ struct JSJoiner
JobList::JobList(bool isEM) :
fIsRunning(false),
fIsExeMgr(isEM),
fPmConnected(false),
fPmsConnected(0),
projectingTableOID(0),
fAborted(0),
fPriority(50)
@ -119,7 +122,7 @@ JobList::~JobList()
int JobList::doQuery()
{
// Don't start the steps if there is no PrimProc connection.
if (!fPmConnected)
if (fPmsConfigured < 1 || fPmsConnected < fPmsConfigured)
return 0;
JobStep *js;
@ -206,20 +209,38 @@ int JobList::doQuery()
int JobList::putEngineComm(DistributedEngineComm* dec)
{
int retryCnt = 0;
while (!fPmConnected)
if (fPmsConfigured == 0)
{
// Don't sleep until after the first retry
if (retryCnt > 1)
logging::LoggingID lid(05);
logging::MessageLog ml(lid);
logging::Message::Args args;
logging::Message m(0);
// We failed to get a connection
args.add("There are no PMs configured. Can't perform Query");
args.add(retryCnt);
m.format(args);
ml.logDebugMessage(m);
if (!errInfo)
errInfo.reset(new ErrorInfo);
errInfo->errCode = logging::ERR_NO_PRIMPROC;
errInfo->errMsg = logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_NO_PRIMPROC);
return errInfo->errCode;
}
// Check to be sure all PrimProcs are attached.
fPmsConnected = dec->connectedPmServers();
while (fPmsConnected < fPmsConfigured)
{
sleep(1);
}
fPmConnected = (dec->connectedPmServers() > 0);
fPmsConnected = dec->connectedPmServers();
// Give up after 20 seconds. Primproc isn't coming back
if (fPmConnected || retryCnt >= 20)
if (retryCnt >= 20)
{
break;
}
++retryCnt;
oam::OamCache *oamCache = oam::OamCache::makeOamCache();
oamCache->forceReload();
dec->Setup();
}
if (retryCnt > 0)
@ -228,10 +249,10 @@ int JobList::putEngineComm(DistributedEngineComm* dec)
logging::MessageLog ml(lid);
logging::Message::Args args;
logging::Message m(0);
if (!fPmConnected)
if (fPmsConnected < fPmsConfigured)
{
// We failed to get a connection
args.add("Failed to get a PrimProc connection. Retry count");
args.add("Failed to get all PrimProc connections. Retry count");
args.add(retryCnt);
m.format(args);
ml.logDebugMessage(m);

View File

@ -123,13 +123,19 @@ public:
// @bug4848, enhance and unify limit handling.
EXPORT virtual void abortOnLimit(JobStep* js);
static void setPMsConfigured(int pms) {fPmsConfigured = pms;}
protected:
//defaults okay
//JobList(const JobList& rhs);
//JobList& operator=(const JobList& rhs);
bool fIsRunning;
bool fIsExeMgr;
bool fPmConnected;
int fPmsConnected;
// Dirty pool kludge. Contains the number of PMs configured in Calpont.xml.
// This kludge reduces the number of calls needed to config.Config, which are expensive.
static int fPmsConfigured;
DeliveredTableMap fDeliveredTables;
execplan::CalpontSystemCatalog::OID projectingTableOID; //DeliveryWSDLs get a reference to this

View File

@ -1595,6 +1595,9 @@ SJLP makeJobList_(
CalpontSelectExecutionPlan* csep = dynamic_cast<CalpontSelectExecutionPlan*>(cplan);
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(csep->sessionID());
static config::Config* sysConfig = config::Config::makeConfig();
int pmsConfigured = atoi(sysConfig->getConfig("PrimitiveServers", "Count").c_str());
// We have to go ahead and create JobList now so we can store the joblist's
// projectTableOID pointer in JobInfo for use during jobstep creation.
SErrorInfo errorInfo(new ErrorInfo());
@ -1602,6 +1605,7 @@ SJLP makeJobList_(
boost::shared_ptr<int> subCount(new int);
*subCount = 0;
JobList* jl = new TupleJobList(isExeMgr);
jl->setPMsConfigured(pmsConfigured);
jl->priority(csep->priority());
jl->errorInfo(errorInfo);
rm.setTraceFlags(csep->traceFlags());
@ -1794,6 +1798,7 @@ SJLP JobListFactory::makeJobList(
SJLP ret;
string emsg;
unsigned errCode = 0;
ret = makeJobList_(cplan, rm, isExeMgr, errCode, emsg);
if (!ret)

View File

@ -708,7 +708,7 @@ int ha_calpont_impl_write_batch_row_(uchar *buf, TABLE* table, cal_impl_if::cal_
if (colpos == ci.columnTypes.size())
break;
if (headerByte >= ci.headerLength)
if (ci.headerLength > 0 && headerByte >= ci.headerLength)
{
// We've used more null bits than allowed. Something is seriously wrong.
std::string errormsg = "Null bit header is wrong size";

View File

@ -78,6 +78,7 @@ plugin_dir = /usr/local/Calpont/mysql/lib/mysql/plugin
# Replication Master Server (default)
# binary logging is required for replication
# log-bin=mysql-bin
# binlog_format=ROW
# required unique id between 1 and 2^32 - 1
# defaults to 1 if master-host

View File

@ -1189,6 +1189,8 @@ void added_a_pm(int)
catch (...)
{}
oam::OamCache *oamCache = oam::OamCache::makeOamCache();
oamCache->forceReload();
ec->Setup();
//set ACTIVE state

View File

@ -3,3 +3,16 @@
infinidb_local_query
log-bin=mysql-bin
server-id
max_length_for_sort_data
tmpdir
log-error
general_log_file
slow_query_log_file
general-log
slow-query-log
character-set-server
collation-server
init-connect
binlog_format
secure-auth
port

View File

@ -6,14 +6,12 @@
# check log for error
checkForError() {
grep ERROR /tmp/mysql_install.log > /tmp/error.check
if [ `cat /tmp/error.check | wc -c` -ne 0 ]; then
# check for password error
grep "ERROR 1045" /tmp/mysql_install.log > /tmp/error.check
if [ `cat /tmp/error.check | wc -c` -ne 0 ]; then
echo "MySQL Password missing or incorrect, check local file"
password=`$installdir/bin/getMySQLpw`
if [ $? -ne 0 ]; then
echo "MySQL Password missing or incorrect"
rm -f /tmp/error.check
$installdir/mysql/mysql-Calpont stop
sleep 2
@ -21,18 +19,8 @@ checkForError() {
fi
rm -f /tmp/error.check
return 1;
else
# ignore 1125 - already exist error
grep "ERROR 1125" /tmp/mysql_install.log > /tmp/error.check
if [ `cat /tmp/error.check | wc -c` -eq 0 ]; then
echo "ERROR: check log file: /tmp/mysql_install.log"
rm -f /tmp/error.check
$installdir/mysql/mysql-Calpont stop
sleep 2
exit 1;
fi
fi
fi
rm -f /tmp/error.check
return 0;
}
@ -100,7 +88,7 @@ if [ -x $installdir/mysql/mysql-Calpont ]; then
$installdir/mysql/install_calpont_mysql.sh --password=$password --installdir=$installdir
checkForError
if [ $? -ne 0 ]; then
echo "ERROR: missing or invalidate passed"
echo "ERROR: missing or invalid password"
$installdir/mysql/mysql-Calpont stop
sleep 2
exit 1;

View File

@ -41,9 +41,9 @@ if { $MYSQLPW == "none" } {
}
set BASH "/bin/bash "
if { $DEBUG == "1" } {
set BASH "/bin/bash -x "
}
#if { $DEBUG == "1" } {
# set BASH "/bin/bash -x "
#}
log_user $DEBUG
spawn -noecho /bin/bash

View File

@ -8191,7 +8191,7 @@ namespace oam
bool Oam::disableMySQLRep()
{
// build and send msg
int returnStatus = sendMsgToProcMgr(DISABLEMYSQLREP);
int returnStatus = sendMsgToProcMgr(DISABLEMYSQLREP, oam::UnassignedName, FORCEFUL, ACK_YES);
if (returnStatus != API_SUCCESS)
exceptionControl("disableMySQLRep", returnStatus);
@ -9079,6 +9079,8 @@ namespace oam
if ( (cloud == "amazon-ec2" || cloud == "amazon-vpc") &&
DBRootStorageType == "external" )
{
writeLog("amazonReattach function started ", LOG_TYPE_DEBUG );
//get Instance Name for to-pm
string toInstanceName = oam::UnassignedName;
try

View File

@ -97,6 +97,7 @@ void OamCache::checkReload()
map<int, int> pmToConnectionMap;
#ifdef _MSC_VER
moduleIds.push_back(*it);
pmToConnectionMap[*it] = i++;
#else
// Restore for Windows when we support multiple PMs
while (it != uniquePids.end())
@ -130,7 +131,13 @@ void OamCache::checkReload()
#endif
dbRootConnectionMap.reset(new map<int, int>());
for (i = 0; i < dbroots.size(); i++)
(*dbRootConnectionMap)[dbroots[i]] = pmToConnectionMap[(*dbRootPMMap)[dbroots[i]]];
{
map<int, int>::iterator pmIter = pmToConnectionMap.find((*dbRootPMMap)[dbroots[i]]);
if (pmIter != pmToConnectionMap.end())
{
(*dbRootConnectionMap)[dbroots[i]] = (*pmIter).second;
}
}
pmDbrootsMap.reset(new OamCache::PMDbrootsMap_t::element_type());
systemStorageInfo_t t;

View File

@ -42,6 +42,7 @@ public:
EXPORT virtual ~OamCache();
EXPORT void checkReload();
EXPORT void forceReload() {mtime=0;}
EXPORT dbRootPMMap_t getDBRootToPMMap();
EXPORT dbRootPMMap_t getDBRootToConnectionMap();

View File

@ -1750,6 +1750,26 @@ int processCommand(string* arguments)
break;
}
SystemStatus systemstatus;
try {
oam.getSystemStatus(systemstatus);
if (systemstatus.SystemOpState != oam::ACTIVE ) {
cout << endl << "**** removeDbroot Failed, System has to be in a ACTIVE state" << endl;
break;
}
}
catch (exception& e)
{
cout << endl << "**** removeDbroot Failed : " << e.what() << endl;
break;
}
catch(...)
{
cout << endl << "**** removeDbroot Failed, Failed return from getSystemStatus API" << endl;
break;
}
systemStorageInfo_t t;
try
{
@ -2044,6 +2064,11 @@ int processCommand(string* arguments)
cout << endl << "ERROR: Stopping InfiniDB Service failure, check /tmp/cc-stop.pdsh. exit..." << endl;
}
}
else
{
string cmd = startup::StartUp::installDir() + "/bin/infinidb stop > /tmp/status.log";
system(cmd.c_str());
}
}
catch (exception& e)
{
@ -4877,7 +4902,7 @@ int processCommand(string* arguments)
if ( MySQLPasswordConfig == oam::UnassignedName ) {
cout << endl;
string prompt = "Is there a 'MySQL' Password configured in " + HOME + "/.my.cnf (y,n): ";
string prompt = "Is there a 'MySQL' Password configured on the MySQL Front-end Modules in " + HOME + "/.my.cnf (y,n): ";
MySQLPasswordConfig = dataPrompt(prompt);
}
@ -4892,6 +4917,7 @@ int processCommand(string* arguments)
//set flag
try {
oam.setSystemConfig("MySQLRep", "y");
sleep(2);
}
catch(...) {}
@ -5824,7 +5850,7 @@ int processCommand(string* arguments)
if ( MySQLPasswordConfig == oam::UnassignedName ) {
cout << endl;
string prompt = "Is there a 'MySQL' Password configured in " + HOME + "/.my.cnf (y,n): ";
string prompt = "Is there a 'MySQL' Password configured on the MySQL Front-end Modules in " + HOME + "/.my.cnf (y,n): ";
MySQLPasswordConfig = dataPrompt(prompt);
}
@ -5839,6 +5865,7 @@ int processCommand(string* arguments)
//set flag
try {
oam.setSystemConfig("MySQLRep", "n");
sleep(2);
}
catch(...) {}

View File

@ -403,7 +403,7 @@ int sendUpgradeRequest(int IserverTypeInstall, bool pmwithum)
*
*
******************************************************************************************/
int sendReplicationRequest(int IserverTypeInstall, std::string password, std::string port)
int sendReplicationRequest(int IserverTypeInstall, std::string password, std::string port, bool pmwithum)
{
Oam oam;
@ -490,6 +490,13 @@ int sendReplicationRequest(int IserverTypeInstall, std::string password, std::st
}
else
{ // set for slave repl request
// don't do PMs unless PMwithUM flag is set
string moduleType = (*pt).DeviceName.substr(0,MAX_MODULE_TYPE_SIZE);
if ( moduleType == "pm" && !pmwithum ) {
pt++;
continue;
}
ByteStream msg;
ByteStream::byte requestID = oam::SLAVEREP;
msg << requestID;

View File

@ -38,7 +38,7 @@ extern void dbrmDirCheck();
extern void mysqlSetup();
extern int sendMsgProcMon( std::string module, ByteStream msg, int requestID, int timeout );
extern int sendUpgradeRequest(int IserverTypeInstall, bool pmwithum);
extern int sendReplicationRequest(int IserverTypeInstall, std::string password, std::string mysqlPort);
extern int sendReplicationRequest(int IserverTypeInstall, std::string password, std::string mysqlPort, bool pmwithum);
extern void checkFilesPerPartion(int DBRootCount, Config* sysConfig);
extern void checkMysqlPort( string& mysqlPort, Config* sysConfig);
extern bool writeConfig(Config* sysConfig);

View File

@ -115,11 +115,44 @@ int main(int argc, char *argv[])
oldbuf = line;
string::size_type pos = oldbuf.find(includeArg,0);
if ( pos != string::npos ) {
//check if this is commented out
//found in my.cnf.rpmsave, check if this is commented out
if ( line[0] != '#' )
{
// no, find in my.cnf and replace
// no, check in my.cnf and replace if exist or add if it doesn't
ifstream mycnffile (mycnfFile.c_str());
vector <string> lines;
char line1[200];
string newbuf;
bool updated = false;
while (mycnffile.getline(line1, 200))
{
newbuf = line1;
string::size_type pos = newbuf.find(includeArg,0);
if ( pos != string::npos ) {
newbuf = oldbuf;
cout << "Updated argument: " << includeArg << endl;
updated = true;
}
//output to temp file
lines.push_back(newbuf);
}
//write out a new my.cnf
mycnffile.close();
unlink (mycnfFile.c_str());
ofstream newFile (mycnfFile.c_str());
//create new file
int fd = open(mycnfFile.c_str(), O_RDWR|O_CREAT, 0666);
copy(lines.begin(), lines.end(), ostream_iterator<string>(newFile, "\n"));
newFile.close();
close(fd);
if (!updated)
{ //not found, so add
ifstream mycnffile (mycnfFile.c_str());
vector <string> lines;
char line1[200];
@ -127,10 +160,12 @@ int main(int argc, char *argv[])
while (mycnffile.getline(line1, 200))
{
newbuf = line1;
string::size_type pos = newbuf.find(includeArg,0);
if ( pos != string::npos )
string::size_type pos = newbuf.find("[mysqld]",0);
if ( pos != string::npos ) {
lines.push_back(newbuf);
newbuf = oldbuf;
cout << "Added argument: " << includeArg << endl;
}
//output to temp file
lines.push_back(newbuf);
}
@ -148,12 +183,16 @@ int main(int argc, char *argv[])
close(fd);
}
}
break;
}
}
}
string cmd = "chown mysql:mysql " + mycnfFile;
system(cmd.c_str());
exit (0);
}
// vim:ts=4 sw=4:

View File

@ -431,6 +431,17 @@ int main(int argc, char *argv[])
exit(1);
}
// run my.cnf upgrade script
if ( reuseConfig == "y" )
{
cmd = installDir + "/bin/mycnfUpgrade > /tmp/mycnfUpgrade.log 2>&1";
int rtnCode = system(cmd.c_str());
if (WEXITSTATUS(rtnCode) != 0)
cout << "Error: Problem upgrade my.cnf, check /tmp/mycnfUpgrade.log" << endl;
else
cout << cout << "NOTE: my.cnf file was upgraded based on my.cnf.rpmsave" << endl;
}
//check mysql port changes
string MySQLPort;
try {
@ -2975,18 +2986,6 @@ int main(int argc, char *argv[])
cout << endl;
}
}
else
{
// run my.cnf upgrade script
if ( reuseConfig == "y" && MySQLRep == "y" &&
IserverTypeInstall == oam::INSTALL_COMBINE_DM_UM_PM )
{
cmd = installDir + "/bin/mycnfUpgrade > /tmp/mycnfUpgrade.log 2>&1";
int rtnCode = system(cmd.c_str());
if (WEXITSTATUS(rtnCode) != 0)
cout << "Error: Problem upgrade my.cnf, check /tmp/mycnfUpgrade.log" << endl;
}
}
cout << endl;
cout << "Next step is to enter the password to access the other Servers." << endl;
@ -3040,15 +3039,6 @@ int main(int argc, char *argv[])
{
cout << endl << "===== Running the InfiniDB MySQL setup scripts =====" << endl << endl;
// run my.cnf upgrade script
if ( reuseConfig == "y" && MySQLRep == "y" )
{
cmd = installDir + "/bin/mycnfUpgrade > /tmp/mycnfUpgrade.log 2>&1";
int rtnCode = system(cmd.c_str());
if (WEXITSTATUS(rtnCode) != 0)
cout << "Error: Problem upgrade my.cnf, check /tmp/mycnfUpgrade.log" << endl;
}
// call the mysql setup scripts
mysqlSetup();
sleep(5);
@ -3606,7 +3596,7 @@ int main(int argc, char *argv[])
cout.flush();
//send message to procmon's to run upgrade script
int status = sendReplicationRequest(IserverTypeInstall, password, mysqlPort);
int status = sendReplicationRequest(IserverTypeInstall, password, mysqlPort, pmwithum);
if ( status != 0 ) {
cout << endl << " InfiniDB Install Failed" << endl << endl;
@ -4835,28 +4825,21 @@ bool storageSetup(string cloud)
cout << endl;
while(true)
{
pcommand = callReadline("Do you want to enable Non-OAM-Parent-PM EBS failover support? [y,n] (" + AmazonPMFailover + ") > ");
pcommand = callReadline("Do you want to enable Instance failover support? [y,n] (" + AmazonPMFailover + ") > ");
if (pcommand)
{
if (strlen(pcommand) > 0)
{
AmazonPMFailover = pcommand;
if (strlen(pcommand) > 0) AmazonPMFailover = pcommand;
callFree(pcommand);
}
if ( AmazonPMFailover == "y" || AmazonPMFailover == "n" ) {
cout << endl;
break;
}
else
{
if ( noPrompting )
continue;
else
{
cout << "Invalid Entry, please enter 'y' for yes or 'n' for no" << endl;
if ( noPrompting )
exit(1);
continue;
}
}
callFree(pcommand);
}
}
try {
@ -4866,6 +4849,15 @@ bool storageSetup(string cloud)
{}
}
if( DBRootStorageType == "internal" && cloud == "amazon" )
{ //set AmazonPMFailover
try {
sysConfig->setConfig(InstallSection, "AmazonPMFailover", "n");
}
catch(...)
{}
}
if ( !writeConfig(sysConfig) ) {
cout << "ERROR: Failed trying to update InfiniDB System Configuration file" << endl;
return false;

View File

@ -82,6 +82,28 @@ void diskMonitor()
bool Externalflag = false;
string cloud = oam::UnassignedName;
try {
oam.getSystemConfig( "Cloud", cloud);
}
catch(...) {
cloud = oam::UnassignedName;
}
//get Gluster Config setting
string GlusterConfig = "n";
try {
oam.getSystemConfig( "GlusterConfig", GlusterConfig);
}
catch(...)
{
GlusterConfig = "n";
}
int diskSpaceCheck = 0;
while(true)
{
//check for external disk
DBrootList dbrootList;
if (moduleType == "pm") {
@ -118,28 +140,6 @@ void diskMonitor()
}
}
string cloud = oam::UnassignedName;
try {
oam.getSystemConfig( "Cloud", cloud);
}
catch(...) {
cloud = oam::UnassignedName;
}
//get Gluster Config setting
string GlusterConfig = "n";
try {
oam.getSystemConfig( "GlusterConfig", GlusterConfig);
}
catch(...)
{
GlusterConfig = "n";
}
int diskSpaceCheck = 0;
while(true)
{
SystemStatus systemstatus;
try {
oam.getSystemStatus(systemstatus);
@ -518,7 +518,7 @@ void diskMonitor()
args.add("dbroot monitoring: Lost access to ");
args.add(dbrootDir);
msg.format(args);
ml.logCriticalMessage(msg);
ml.logWarningMessage(msg);
oam.sendDeviceNotification(dbrootName, DBROOT_DOWN, moduleName);
(*p).downFlag = true;
@ -610,11 +610,12 @@ void diskMonitor()
//check disk space every 10 minutes
diskSpaceCheck++;
if ( diskSpaceCheck >= 20 )
if ( diskSpaceCheck >= 20 ) {
diskSpaceCheck = 0;
lfs.clear();
sdl.clear();
}
} // end of while loop
}

View File

@ -1026,6 +1026,7 @@ void BatchPrimitiveProcessor::execute()
sessionID,
&counterLock,
&busyLoaderCount,
sendThread,
&vssCache);
asyncLoaded[p] = true;
}
@ -2012,6 +2013,7 @@ void BatchPrimitiveProcessor::asyncLoadProjectColumns()
sessionID,
&counterLock,
&busyLoaderCount,
sendThread,
&vssCache);
asyncLoaded[i] = true;
}

View File

@ -796,6 +796,7 @@ struct AsynchLoader {
uint32_t sesID,
boost::mutex *m,
uint32_t *loaderCount,
boost::shared_ptr<BPPSendThread> st, // sendThread for abort upon exception.
VSSCache *vCache) :
lbid(l),
ver(v),
@ -807,6 +808,7 @@ struct AsynchLoader {
readCount(rCount),
busyLoaders(loaderCount),
mutex(m),
sendThread(st),
vssCache(vCache)
{ }
@ -818,9 +820,10 @@ struct AsynchLoader {
//cout << "asynch started " << pthread_self() << " l: " << lbid << endl;
try {
loadBlock(lbid, ver, txn, compType, buf, &cached, &rCount, LBIDTrace, sessionID, true, vssCache);
loadBlock(lbid, ver, txn, compType, buf, &cached, &rCount, LBIDTrace, true, vssCache);
}
catch (std::exception& ex) {
sendThread->abort();
cerr << "AsynchLoader caught loadBlock exception: " << ex.what() << endl;
idbassert(asyncCounter > 0);
(void)atomicops::atomicDec(&asyncCounter);
@ -834,6 +837,7 @@ struct AsynchLoader {
return;
}
catch (...) {
sendThread->abort();
cerr << "AsynchLoader caught unknown exception: " << endl;
//FIXME Use a locked processor primitive?
idbassert(asyncCounter > 0);
@ -869,6 +873,7 @@ private:
uint32_t *readCount;
uint32_t *busyLoaders;
boost::mutex *mutex;
boost::shared_ptr<BPPSendThread> sendThread;
VSSCache *vssCache;
};
@ -882,6 +887,7 @@ void loadBlockAsync(uint64_t lbid,
uint32_t sessionID,
boost::mutex *m,
uint32_t *busyLoaders,
boost::shared_ptr<BPPSendThread> sendThread, // sendThread for abort upon exception.
VSSCache *vssCache)
{
blockCacheClient bc(*BRPp[cacheNum(lbid)]);
@ -913,7 +919,7 @@ void loadBlockAsync(uint64_t lbid,
mutex::scoped_lock sl(*m);
try {
boost::thread thd(AsynchLoader(lbid, c, txn, compType, cCount, rCount,
LBIDTrace, sessionID, m, busyLoaders, vssCache));
LBIDTrace, sessionID, m, busyLoaders, sendThread, vssCache));
(*busyLoaders)++;
}
catch (boost::thread_resource_error &e) {

View File

@ -93,14 +93,14 @@ namespace primitiveprocessor
typedef std::map<uint32_t, SBPPV> BPPMap;
extern BPPMap bppMap;
void prefetchBlocks(uint64_t lbid, uint32_t* rCount);
void prefetchBlocks(uint64_t lbid, const int compType, uint32_t* rCount);
void prefetchExtent(uint64_t lbid, uint32_t ver, uint32_t txn, uint32_t* rCount);
void loadBlock(uint64_t lbid, BRM::QueryContext q, uint32_t txn, int compType, void* bufferPtr,
bool* pWasBlockInCache, uint32_t* rCount=NULL, bool LBIDTrace = false,
uint32_t sessionID = 0, bool doPrefetch=true, VSSCache *vssCache = NULL);
void loadBlockAsync(uint64_t lbid, const BRM::QueryContext &q, uint32_t txn, int CompType,
uint32_t *cCount, uint32_t *rCount, bool LBIDTrace, uint32_t sessionID,
boost::mutex *m, uint32_t *busyLoaders, VSSCache* vssCache=0);
boost::mutex *m, uint32_t *busyLoaders, boost::shared_ptr<BPPSendThread> sendThread, VSSCache* vssCache=0);
uint32_t loadBlocks(BRM::LBID_t *lbids, BRM::QueryContext q, BRM::VER_t txn, int compType,
uint8_t **bufferPtrs, uint32_t *rCount, bool LBIDTrace, uint32_t sessionID,
uint32_t blockCount, bool *wasVersioned, bool doPrefetch = true, VSSCache *vssCache = NULL);

View File

@ -1216,6 +1216,13 @@ void pingDeviceThread()
if (LANOUTAGEACTIVE)
break;
try {
oam.getSystemConfig("MySQLRep", MySQLRep);
}
catch(...) {
MySQLRep = "n";
}
if (moduleInfoList[moduleName] >= ModuleHeartbeatCount ||
opState == oam::DOWN || opState == oam::AUTO_DISABLED)
{
@ -1503,7 +1510,13 @@ void pingDeviceThread()
processManager.restartProcessType("ExeMgr", moduleName);
}
string moduleType = moduleName.substr(0,MAX_MODULE_TYPE_SIZE);
if ( MySQLRep == "y" ) {
if ( moduleType == "um" ||
( moduleType == "pm" && config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) ||
( moduleType == "pm" && PMwithUM == "y") ) {
//setup MySQL Replication for started modules
log.writeLog(__LINE__, "Setup MySQL Replication for module recovering from outage on " + moduleName, LOG_TYPE_DEBUG);
@ -1513,6 +1526,7 @@ void pingDeviceThread()
devicenetworklist.push_back(devicenetworkconfig);
processManager.setMySQLReplication(devicenetworklist);
}
}
else
{
if( moduleName.find("pm") == 0 ) {
@ -1531,6 +1545,8 @@ void pingDeviceThread()
//set query system state ready
processManager.setQuerySystemState(true);
processManager.setSystemState(oam::ACTIVE);
//clear count
moduleInfoList[moduleName] = 0;
}
@ -1674,7 +1690,6 @@ void pingDeviceThread()
// state = stopped, then try starting, if fail, remove/addmodule to launch new instance
// state = terminate or nothing, remove/addmodule to launch new instance
if ( amazon ) {
if ( moduleName.find("um") == 0 )
{
// resume the dbrm
@ -1932,6 +1947,9 @@ void pingDeviceThread()
// resume the dbrm
oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
//set query system state ready
processManager.setQuerySystemState(true);
}
}
else
@ -2087,8 +2105,8 @@ void pingDeviceThread()
if ( parentOAMModule == config.moduleName() ||
parentOAMModule == "FAILED" ) {
//send sighup to these guys incase they marked any PrimProcs offline
processManager.reinitProcessType("ExeMgr");
//srestart to these guys incase they marked any PrimProcs offline
processManager.restartProcessType("ExeMgr");
processManager.reinitProcessType("DDLProc");
processManager.reinitProcessType("DMLProc");
}

View File

@ -53,7 +53,7 @@ extern string USER;
extern bool HDFS;
extern string localHostName;
extern string PMwithUM;
extern string MySQLRep;
extern string AmazonPMFailover;
typedef map<string, int> moduleList;
extern moduleList moduleInfoList;
@ -617,7 +617,7 @@ void processMSG(messageqcpp::IOSocket* cfIos)
oam.dbrmctl("resume");
log.writeLog(__LINE__, "'dbrmctl resume' done", LOG_TYPE_DEBUG);
processManager.reinitProcessType("ExeMgr");
processManager.restartProcessType("ExeMgr");
//setup MySQL Replication for started modules
log.writeLog(__LINE__, "Setup MySQL Replication for module being started", LOG_TYPE_DEBUG);
@ -738,7 +738,7 @@ void processMSG(messageqcpp::IOSocket* cfIos)
//distribute config file
processManager.distributeConfigFile("system");
processManager.reinitProcessType("ExeMgr");
processManager.restartProcessType("ExeMgr");
}
}
else
@ -1702,7 +1702,7 @@ void processMSG(messageqcpp::IOSocket* cfIos)
processManager.distributeConfigFile("system");
processManager.reinitProcessType("WriteEngineServer");
processManager.reinitProcessType("ExeMgr");
processManager.restartProcessType("ExeMgr");
processManager.reinitProcessType("DDLProc");
processManager.reinitProcessType("DMLProc");
}
@ -1752,7 +1752,7 @@ void processMSG(messageqcpp::IOSocket* cfIos)
processManager.distributeConfigFile("system");
processManager.reinitProcessType("WriteEngineServer");
processManager.reinitProcessType("ExeMgr");
processManager.restartProcessType("ExeMgr");
processManager.reinitProcessType("DDLProc");
processManager.reinitProcessType("DMLProc");
}
@ -2193,7 +2193,7 @@ void processMSG(messageqcpp::IOSocket* cfIos)
processManager.distributeConfigFile("system");
processManager.reinitProcessType("WriteEngineServer");
processManager.reinitProcessType("ExeMgr");
processManager.restartProcessType("ExeMgr");
processManager.reinitProcessType("DDLProc");
processManager.reinitProcessType("DMLProc");
}
@ -2226,7 +2226,7 @@ void processMSG(messageqcpp::IOSocket* cfIos)
processManager.distributeConfigFile("system");
processManager.reinitProcessType("WriteEngineServer");
processManager.reinitProcessType("ExeMgr");
processManager.restartProcessType("ExeMgr");
processManager.reinitProcessType("DDLProc");
processManager.reinitProcessType("DMLProc");
}
@ -2557,6 +2557,28 @@ void processMSG(messageqcpp::IOSocket* cfIos)
break;
}
case DISABLEMYSQLREP:
{
log.writeLog(__LINE__, "MSG RECEIVED: Disable MySQL Replication");
// target = root password
oam::DeviceNetworkList devicenetworklist;
status = processManager.setMySQLReplication(devicenetworklist, oam::UnassignedName, false, true, target, false);
log.writeLog(__LINE__, "Disable MySQL Replication status: " + oam.itoa(status) );
ackMsg << (ByteStream::byte) oam::ACK;
ackMsg << actionType;
ackMsg << target;
ackMsg << (ByteStream::byte) status;
try {
fIos.write(ackMsg);
}
catch(...) {}
break;
}
case GLUSTERASSIGN:
{
string dbroot;
@ -2742,7 +2764,7 @@ void processMSG(messageqcpp::IOSocket* cfIos)
processManager.distributeConfigFile("system");
processManager.reinitProcessType("WriteEngineServer");
processManager.reinitProcessType("ExeMgr");
processManager.restartProcessType("ExeMgr");
processManager.reinitProcessType("DDLProc");
processManager.reinitProcessType("DMLProc");
}
@ -3302,7 +3324,7 @@ int ProcessManager::disableModule(string target, bool manualFlag)
/******************************************************************************************
* @brief recycleProcess
*
* purpose: recyle process, general;ly after some disable module is run
* purpose: recyle process, generally after some disable module is run
*
******************************************************************************************/
void ProcessManager::recycleProcess(string module)
@ -3327,20 +3349,23 @@ void ProcessManager::recycleProcess(string module)
restartProcessType("mysql");
}
else
reinitProcessType("ExeMgr");
restartProcessType("ExeMgr");
if ( PrimaryUMModuleName == module )
{
restartProcessType("DDLProc", "none", false);
restartProcessType("DDLProc", module);
// restartProcessType("DDLProc", module, false);
sleep(1);
restartProcessType("DMLProc", "none", false);
restartProcessType("DMLProc", module);
// restartProcessType("DMLProc", module, false);
}
if( moduleType == "pm" && PrimaryUMModuleName != module)
{
reinitProcessType("DDLProc");
sleep(1);
restartProcessType("DMLProc", "none", false);
restartProcessType("DMLProc", module);
// restartProcessType("DMLProc", module, false);
}
return;
@ -8809,10 +8834,10 @@ int ProcessManager::OAMParentModuleChange()
{}
//do amazon failover
if (amazon)
if (amazon && AmazonPMFailover == "n")
{
log.writeLog(__LINE__, " ", LOG_TYPE_DEBUG);
log.writeLog(__LINE__, "*** OAMParentModule outage, recover the Instance ***", LOG_TYPE_DEBUG);
log.writeLog(__LINE__, "*** OAMParentModule outage, AmazonPMFailover not set, wating for instance to restart ***", LOG_TYPE_DEBUG);
string currentIPAddr = oam.getEC2InstanceIpAddress(downOAMParentHostname);
if (currentIPAddr == "stopped")
@ -9068,6 +9093,12 @@ int ProcessManager::OAMParentModuleChange()
if ( status != 0 )
log.writeLog(__LINE__, "startModuleThread: pthread_create failed, return status = " + oam.itoa(status), LOG_TYPE_ERROR);
if (status == 0)
{
pthread_join(startmodulethread, NULL);
status = startsystemthreadStatus;
}
//restart/reinit processes to force their release of the controller node port
if ( ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM) &&
( moduleNameList.size() <= 1 && config.moduleType() == "pm") )
@ -9135,7 +9166,7 @@ int ProcessManager::OAMParentModuleChange()
if (opState != oam::AUTO_DISABLED) {
if ((*pt).DeviceName != downOAMParentName ) {
if ((*pt).DeviceName != config.moduleName() ) {
processManager.setModuleState((*pt).DeviceName, oam::AUTO_INIT);
// processManager.setModuleState((*pt).DeviceName, oam::AUTO_INIT);
pthread_t startmodulethread;
string moduleName = (*pt).DeviceName;
int status = pthread_create (&startmodulethread, NULL, (void*(*)(void*)) &startModuleThread, &moduleName);
@ -9152,25 +9183,6 @@ int ProcessManager::OAMParentModuleChange()
}
}
//wait until local module is active before continuing
while(true)
{
int opState = oam::ACTIVE;
bool degraded;
try {
oam.getModuleStatus(config.moduleName(), opState, degraded);
}
catch(...)
{
// log.writeLog(__LINE__, "EXCEPTION ERROR on getModuleStatus on module " + config.moduleName() + ": Caught unknown exception!", LOG_TYPE_ERROR);
}
if (opState == oam::ACTIVE)
break;
sleep(1);
}
//restart DDLProc/DMLProc to perform any rollbacks, if needed
//dont rollback in amazon, wait until down pm recovers
if ( ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM )
@ -9194,13 +9206,8 @@ int ProcessManager::OAMParentModuleChange()
// clear alarm
aManager.sendAlarmReport(config.moduleName().c_str(), MODULE_SWITCH_ACTIVE, CLEAR);
if (amazon) {
//Set the down module instance state so it will be auto restarted
processManager.setModuleState(downOAMParentName, oam::AUTO_OFFLINE);
// sleep to give time for local pm to fully go active
sleep(30);
}
//set status to ACTIVE while failover is in progress
processManager.setSystemState(oam::ACTIVE);
log.writeLog(__LINE__, "*** Exiting OAMParentModuleChange function ***", LOG_TYPE_DEBUG);
@ -9867,6 +9874,14 @@ int ProcessManager::setMySQLReplication(oam::DeviceNetworkList devicenetworklist
{
Oam oam;
string MySQLRep;
try {
oam.getSystemConfig("MySQLRep", MySQLRep);
}
catch(...) {
MySQLRep = "n";
}
if ( MySQLRep == "n" && enable )
return oam::API_SUCCESS;

View File

@ -214,7 +214,8 @@ int main(int argc, char **argv)
//check if currently configured as Parent OAM Module on startup
if ( gOAMParentModuleFlag ) {
if ( config.OAMStandbyName() != oam::UnassignedName ) {
if ( ( config.OAMStandbyName() != oam::UnassignedName ) &&
DBRootStorageType != "internal" ) {
//try for 20 minutes checking if the standby node is up
string parentOAMModule;
log.writeLog(__LINE__, "starting has parent, double check. checking with old Standby Module", LOG_TYPE_DEBUG);
@ -600,10 +601,15 @@ int main(int argc, char **argv)
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
//mysql status monitor thread
if ( ( config.ServerInstallType() == oam::INSTALL_COMBINE_DM_UM_PM ) ||
(PMwithUM == "y") )
{
pthread_t mysqlThread;
ret = pthread_create (&mysqlThread, NULL, (void*(*)(void*)) &mysqlMonitorThread, NULL);
if ( ret != 0 )
log.writeLog(__LINE__, "pthread_create failed, return code = " + oam.itoa(ret), LOG_TYPE_ERROR);
}
//update syslog file priviledges
aMonitor.changeModLog();
@ -1086,16 +1092,6 @@ static void chldHandleThread(MonitorConfig config)
processRestartPeriod = 120;
}
try
{
oam.getProcessStatus(systemprocessstatus);
}
catch(...)
{
sleep(5);
continue;
}
listPtr = aPtr->begin();
for (; listPtr != aPtr->end(); ++listPtr)
{
@ -1104,19 +1100,16 @@ static void chldHandleThread(MonitorConfig config)
// Update internal process state when in INIT and System is ACTIVE/FAILED
// Updated System process state when AOS and different from internal
int outOfSyncCount = 0;
if ( delayCount == 10 ) {
if ( delayCount == 2 ) {
while(true)
{
int state = (*listPtr).state; //set as default
int PID = (*listPtr).processID; //set as default
for( unsigned int j = 0 ; j < systemprocessstatus.processstatus.size(); j++)
{
if ( systemprocessstatus.processstatus[j].ProcessName == (*listPtr).ProcessName
&& systemprocessstatus.processstatus[j].Module.find(config.moduleName(),0) != string::npos) {
state = systemprocessstatus.processstatus[j].ProcessOpState;
PID = systemprocessstatus.processstatus[j].ProcessID;
try {
ProcessStatus procstat;
oam.getProcessStatus((*listPtr).ProcessName, config.moduleName(), procstat);
state = procstat.ProcessOpState;
PID = procstat.ProcessID;
if (state == oam::BUSY_INIT ) {
// updated local state ot BUSY_INIT
@ -1133,6 +1126,8 @@ static void chldHandleThread(MonitorConfig config)
if ( (cal - (*listPtr).currentTime) > 20 ) {
// issue ALARM and update status to FAILED
aMonitor.sendAlarm((*listPtr).ProcessName, PROCESS_INIT_FAILURE, SET);
// (*listPtr).state = oam::FAILED;
// aMonitor.updateProcessInfo((*listPtr).ProcessName, oam::FAILED, (*listPtr).processID);
//force restart the un-initted process
log.writeLog(__LINE__, (*listPtr).ProcessName + "/" + oam.itoa((*listPtr).processID) + " failed to init in 20 seconds, force killing it so it can restart", LOG_TYPE_CRITICAL);
@ -1144,6 +1139,17 @@ static void chldHandleThread(MonitorConfig config)
break;
}
}
catch (exception& ex)
{
string error = ex.what();
// log.writeLog(__LINE__, "EXCEPTION ERROR on getProcessStatus: " + error, LOG_TYPE_ERROR);
break;
}
catch(...)
{
// log.writeLog(__LINE__, "EXCEPTION ERROR on getProcessStatus: Caught unknown exception!", LOG_TYPE_ERROR);
break;
}
if (state != (*listPtr).state || PID != (*listPtr).processID) {
if ( state == oam::STANDBY && (*listPtr).state == oam::ACTIVE )
@ -1180,11 +1186,11 @@ static void chldHandleThread(MonitorConfig config)
catch (exception& ex)
{
string error = ex.what();
log.writeLog(__LINE__, "EXCEPTION ERROR on setModuleStatus: " + error, LOG_TYPE_ERROR);
// log.writeLog(__LINE__, "EXCEPTION ERROR on setModuleStatus: " + error, LOG_TYPE_ERROR);
}
catch(...)
{
log.writeLog(__LINE__, "EXCEPTION ERROR on setModuleStatus: Caught unknown exception!", LOG_TYPE_ERROR);
// log.writeLog(__LINE__, "EXCEPTION ERROR on setModuleStatus: Caught unknown exception!", LOG_TYPE_ERROR);
}
break;
@ -1211,7 +1217,6 @@ static void chldHandleThread(MonitorConfig config)
break;
}
}
}
//Handle died or out of sync process if in the right state
if ( (*listPtr).state == oam::MAN_OFFLINE )
@ -2326,14 +2331,6 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos)
fShmProcessStatus[shmIndex].ProcessID = PID;
memcpy(fShmProcessStatus[shmIndex].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE);
//if DMLProc set to ACTIVE, set system state to ACTIVE
if ( processName == "DMLProc" && state == oam::ACTIVE )
{
fShmSystemStatus[0].OpState = state;
memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE);
log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG);
}
//if DMLProc set to BUSY_INIT, set system state to BUSY_INIT
if ( processName == "DMLProc" && state == oam::BUSY_INIT )
{
@ -2341,6 +2338,19 @@ void processStatusMSG(messageqcpp::IOSocket* cfIos)
memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE);
log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG);
}
//if DMLProc set to ACTIVE, set system state to ACTIVE if in an INIT state
if ( processName == "DMLProc" && state == oam::ACTIVE )
{
if ( fShmSystemStatus[0].OpState == oam::BUSY_INIT ||
fShmSystemStatus[0].OpState == oam::MAN_INIT ||
fShmSystemStatus[0].OpState == oam::AUTO_INIT )
{
fShmSystemStatus[0].OpState = state;
memcpy(fShmSystemStatus[0].StateChangeDate, oam.getCurrentTime().c_str(), DATESIZE);
log.writeLog(__LINE__, "statusControl: REQUEST RECEIVED: Set System State = " + oamState[state], LOG_TYPE_DEBUG);
}
}
}
break;

View File

@ -546,18 +546,6 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
//Check for SIMPLEX runtype processes
initType = checkSpecialProcessState( processconfig.ProcessName, processconfig.RunType, processconfig.ModuleType );
if ( actIndicator == oam::GRACEFUL_STANDBY) {
//this module running Parent OAM Standby
runStandby = true;
log.writeLog(__LINE__, "ProcMon Running Hot-Standby");
// delete any old active alarm log file
unlink ("/var/log/Calpont/activeAlarms");
}
//Check for SIMPLEX runtype processes
initType = checkSpecialProcessState( processconfig.ProcessName, processconfig.RunType, processconfig.ModuleType );
if ( initType == oam::COLD_STANDBY) {
//there is a mate active, skip
config.buildList(processconfig.ModuleType,
@ -1771,6 +1759,19 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
string masterLogFile = oam::UnassignedName;
string masterLogPos = oam::UnassignedName;
if ( (PMwithUM == "n") && (config.moduleType() == "pm") && ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM) )
{
ackMsg << (ByteStream::byte) ACK;
ackMsg << (ByteStream::byte) MASTERREP;
ackMsg << (ByteStream::byte) oam::API_FAILURE;
ackMsg << masterLogFile;
ackMsg << masterLogPos;
mq.write(ackMsg);
log.writeLog(__LINE__, "MASTERREP: Error PM invalid msg - ACK back to ProcMgr return status = " + oam.itoa((int) oam::API_FAILURE));
break;
}
//change local my.cnf file
int ret;
int retry;
@ -1831,6 +1832,17 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
string port;
msg >> port;
if ( (PMwithUM == "n") && (config.moduleType() == "pm") && ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM) )
{
ackMsg << (ByteStream::byte) ACK;
ackMsg << (ByteStream::byte) SLAVEREP;
ackMsg << (ByteStream::byte) oam::API_FAILURE;
mq.write(ackMsg);
log.writeLog(__LINE__, "SLAVEREP: Error PM invalid msg - ACK back to ProcMgr return status = " + oam.itoa((int) oam::API_FAILURE));
break;
}
//change local my.cnf file
int ret = changeMyCnf("slave");
@ -1867,6 +1879,16 @@ void ProcessMonitor::processMessage(messageqcpp::ByteStream msg, messageqcpp::IO
string module;
msg >> module;
if ( (PMwithUM == "n") && (config.moduleType() == "pm") && ( config.ServerInstallType() != oam::INSTALL_COMBINE_DM_UM_PM) )
{
ackMsg << (ByteStream::byte) ACK;
ackMsg << (ByteStream::byte) MASTERDIST;
ackMsg << (ByteStream::byte) oam::API_FAILURE;
mq.write(ackMsg);
log.writeLog(__LINE__, "MASTERDIST: runMasterRep - ACK back to ProcMgr return status = " + oam.itoa((int) oam::API_FAILURE));
}
if ( password == oam::UnassignedName )
password = "ssh";
@ -3796,8 +3818,6 @@ int ProcessMonitor::createDataDirs(std::string cloud)
if (UMStorageType == "external")
{
if(!amazonVolumeCheck()) {
//Set the alarm
sendAlarm(config.moduleName().c_str(), STARTUP_DIAGNOTICS_FAILURE, SET);
return API_FAILURE;
}
}
@ -3856,8 +3876,6 @@ int ProcessMonitor::createDataDirs(std::string cloud)
config.moduleID() == moduleID)
{
if(!amazonVolumeCheck(id)) {
//Set the alarm
sendAlarm(config.moduleName().c_str(), STARTUP_DIAGNOTICS_FAILURE, SET);
return API_FAILURE;
}
}
@ -4696,33 +4714,16 @@ int ProcessMonitor::changeMyCnf(std::string type)
buf = "server-id = 1";
}
/* pos = buf.find("# log-bin=mysql-bin",0);
pos = buf.find("# binlog_format=ROW",0);
if ( pos != string::npos ) {
buf = "log-bin=mysql-bin";
buf = "binlog_format=ROW";
}
*/
pos = buf.find("infinidb_local_query=1",0);
if ( pos != string::npos && pos == 0) {
buf = "# infinidb_local_query=1";
}
/* pos = buf.find("slave-skip-errors=all",0);
if ( pos != string::npos && pos == 0) {
buf = "# slave-skip-errors=all" + buf;
}
pos = buf.find("# relay-log=",0);
if ( pos != string::npos && pos == 0) {
buf = buf;
}
else
{
pos = buf.find("relay-log=",0);
if ( pos != string::npos && pos == 0) {
buf = "# " + buf;
}
}
*/
//output to temp file
lines.push_back(buf);
}
@ -4823,16 +4824,6 @@ int ProcessMonitor::changeMyCnf(std::string type)
buf = "server-id = " + slaveID;
}
/* pos = buf.find("# relay-log=",0);
if ( pos != string::npos ) {
buf = "relay-log=" + dbDir + "/relay-bin";
}
pos = buf.find("# slave-skip-errors=all",0);
if ( pos != string::npos ) {
buf = "slave-skip-errors=all";
}
*/
// set local query flag if on pm
if ( (PMwithUM == "y") && config.moduleType() == "pm" )
{
@ -4849,11 +4840,11 @@ int ProcessMonitor::changeMyCnf(std::string type)
}
}
/* pos = buf.find("log-bin=mysql-bin",0);
pos = buf.find("binlog_format=ROW",0);
if ( pos != string::npos && pos == 0 ) {
buf = "# log-bin=mysql-bin";
buf = "# binlog_format=ROW";
}
*/
//output to temp file
lines.push_back(buf);
}
@ -4893,6 +4884,11 @@ int ProcessMonitor::changeMyCnf(std::string type)
buf = "# log-bin=mysql-bin";
}
pos = buf.find("binlog_format=ROW",0);
if ( pos != string::npos && pos == 0 ) {
buf = "# binlog_format=ROW";
}
pos = buf.find("infinidb_local_query=1",0);
if ( pos != string::npos && pos == 0) {
buf = "# infinidb_local_query=1";
@ -5585,7 +5581,7 @@ bool ProcessMonitor::amazonVolumeCheck(int dbrootID)
}
if ( status != "available" ) {
log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume not attached and not available: " + volumeName, LOG_TYPE_CRITICAL);
log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume not attached and not available: " + volumeName, LOG_TYPE_WARNING);
return false;
}
else
@ -5610,7 +5606,7 @@ bool ProcessMonitor::amazonVolumeCheck(int dbrootID)
}
else
{
log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume failed to attached: " + volumeName, LOG_TYPE_CRITICAL);
log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume failed to attached: " + volumeName, LOG_TYPE_WARNING);
return false;
}
}
@ -5642,7 +5638,7 @@ bool ProcessMonitor::amazonVolumeCheck(int dbrootID)
}
if ( status != "available" ) {
log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume not attached and not available: " + volumeName, LOG_TYPE_CRITICAL);
log.writeLog(__LINE__, "amazonVolumeCheck function failed, volume not attached and not available: " + volumeName, LOG_TYPE_WARNING);
return false;
}
else

View File

@ -1805,6 +1805,23 @@ int main(int argc, char *argv[])
catch(...)
{}
try {
string AmazonPMFailover = sysConfigOld->getConfig("Installation", "AmazonPMFailover");
if ( !AmazonPMFailover.empty() )
{
try {
sysConfigNew->setConfig("Installation", "AmazonPMFailover", AmazonPMFailover);
}
catch(...)
{
cout << "ERROR: Problem setting AmazonPMFailover in the Calpont System Configuration file" << endl;
exit(-1);
}
}
}
catch(...)
{}
//Write out Updated System Configuration File
sysConfigNew->write();

View File

@ -442,17 +442,24 @@ int RedistributeControlThread::executeRedistributePlan()
bool isActive = false;
while (!isActive)
{
bool noExcept = true;
SystemStatus systemstatus;
systemstatus.SystemOpState = oam::ACTIVE;
try
{
fControl->fOam->getSystemStatus(systemstatus);
}
catch (const std::exception& ex)
{
fErrorMsg += ex.what();
noExcept = false;
}
catch (...)
{}
{
noExcept = false;
}
if ((isActive = (systemstatus.SystemOpState == oam::ACTIVE) == false))
sleep(1);
if (noExcept && ((isActive = (systemstatus.SystemOpState == oam::ACTIVE)) == false))
sleep(1);;
}
#endif