/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /* * $Id: we_redistributecontrol.cpp 4450 2013-01-21 14:13:24Z rdempsey $ */ #include #include #include #include #include #include #include #include #include //#include using namespace std; #include "boost/scoped_ptr.hpp" #include "boost/scoped_array.hpp" #include "boost/thread.hpp" #include "boost/thread/mutex.hpp" #include "boost/filesystem/path.hpp" #include "boost/filesystem/operations.hpp" using namespace boost; #include "installdir.h" #include "configcpp.h" using namespace config; #include "liboamcpp.h" using namespace oam; #include "messagequeue.h" #include "bytestream.h" using namespace messageqcpp; #include "logger.h" #include "calpontsystemcatalog.h" using namespace execplan; #include "we_messages.h" #include "we_redistributedef.h" #include "we_redistributecontrolthread.h" #include "we_redistributeworkerthread.h" #include "we_redistributecontrol.h" namespace redistribute { RedistributeControl* RedistributeControl::fInstance = NULL; boost::mutex instanceMutex; const string RedistributeDir("/data1/systemFiles/redistribute"); const string InfoFileName("/redistribute.info"); const string PlanFileName("/redistribute.plan"); RedistributeControl* RedistributeControl::instance() { // The constructor is protected by instanceMutex lock. boost::mutex::scoped_lock lock(instanceMutex); if (fInstance == NULL) fInstance = new RedistributeControl(); return fInstance; } RedistributeControl::RedistributeControl() : fInfoFilePtr(NULL), fPlanFilePtr(NULL) { // default path /usr/local/mariadb/columnstore/data1/systemFiles/redistribute fRedistributeDir = "/var/lib/columnstore/" + RedistributeDir; fInfoFilePath = fRedistributeDir + InfoFileName; fPlanFilePath = fRedistributeDir + PlanFileName; fOam.reset(new oam::Oam); fDbrm.reset(new BRM::DBRM); fSysLogger.reset(new logging::Logger(32)); // 32 - writeengineserver in SubsystemIDs.txt logging::MsgMap msgMap; msgMap[logging::M0002] = logging::Message(logging::M0002); fSysLogger->msgMap(msgMap); // struct stat st; // if (stat(fRedistributeDir.c_str(), &st) != 0) // boost::filesystem::path dirPath(fRedistributeDir); if (boost::filesystem::exists(fRedistributeDir)) { // try to open info file for update if dir exists RedistributeInfo info; fInfoFilePtr = fopen(fInfoFilePath.c_str(), "r+"); if (fInfoFilePtr != NULL && 1 == fread(&info, sizeof(info), 1, fInfoFilePtr)) { fRedistributeInfo = info; // if there was an active session, mark it as failed until support resume. if (fRedistributeInfo.state == RED_STATE_ACTIVE) updateState(RED_STATE_FAILED); } } } RedistributeControl::~RedistributeControl() { fOam.reset(); fDbrm.reset(); delete fInstance; fInstance = NULL; } int RedistributeControl::handleUIMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so) { boost::mutex::scoped_lock sessionLock(fSessionMutex); uint32_t status = RED_STATE_UNDEF; const RedistributeMsgHeader* h = (const RedistributeMsgHeader*)bs.buf(); try { switch (h->messageId) { case RED_CNTL_START: status = handleStartMsg(bs, so); break; case RED_CNTL_STOP: status = handleStopMsg(bs, so); break; case RED_CNTL_CLEAR: status = handleClearMsg(bs, so); break; case RED_CNTL_STATUS: default: status = handleStatusMsg(bs, so); break; } } catch (const std::exception& ex) { if (fUIResponse.empty()) fUIResponse = ex.what(); } catch (...) { if (fUIResponse.empty()) fUIResponse = "Failed to process the redistribute command."; } // log the response logMessage(fUIResponse); // bs restart() in handlers bs.restart(); bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now. bs << status; bs << fUIResponse; so.write(bs); return status; } int RedistributeControl::handleStartMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so) { ostringstream oss; uint32_t status = getCurrentState(); if (status != RED_STATE_IDLE) { if (status == RED_STATE_ACTIVE) oss << "Redistribute is already running. Command is ignored. You need to stop and clear this active " "session before starting a new one."; else oss << "Redistribute is not in IDLE state. Command is ignored. Please check the status of last " "session, then reset the state to IDLE using action CLEAR."; fUIResponse = oss.str(); return status; } // must be IDLE state try { // skip the header part, may need save it. bs.advance(sizeof(RedistributeMsgHeader)); getStartOptions(bs); RedistributeControlThread::setStopAction(false); updateState(RED_STATE_ACTIVE); } catch (const std::exception& ex) { if (fErrorMsg.empty()) fErrorMsg = ex.what(); fRedistributeInfo.state = RED_STATE_FAILED; } catch (...) { fRedistributeInfo.state = RED_STATE_FAILED; } status = fRedistributeInfo.state; if (status == RED_STATE_ACTIVE) { oss << "Redistribute is started."; fControlThread.reset(new boost::thread(RedistributeControlThread(RED_CNTL_START))); // Let go the new thread unless we want to call interrupt on this thread in future. // Not going to join() because the redistribution could take very long. fControlThread->detach(); fControlThread.reset(); } else { updateState(RED_STATE_FAILED); oss << "Starting redistribute failed."; if (!fErrorMsg.empty()) oss << " " << fErrorMsg; } fUIResponse = oss.str(); return status; } int RedistributeControl::handleStatusMsg(messageqcpp::ByteStream&, messageqcpp::IOSocket& so) { ostringstream oss; uint32_t status = getCurrentState(); RedistributeInfo info = fRedistributeInfo; switch (status) { case RED_STATE_IDLE: oss << "Redistribute is in IDLE state."; break; case RED_STATE_ACTIVE: oss << "Redistribute is in progress: total " << info.planned; if (info.planned > 1) oss << " logical partitions are planned to move.\n"; else oss << " logical partition is planned to move.\n"; if (info.planned > 0) { if (info.endTime > 0) oss << "In " << (info.endTime - info.startTime) << " seconds, "; oss << info.success << " success, " << info.skipped << " skipped, " << info.failed << " failed, " << ((info.success + info.skipped + info.failed) * 100 / info.planned) << "%."; } break; case RED_STATE_FINISH: oss << "Redistribute is finished.\n" << info.success << " success, " << info.skipped << " skipped, " << info.failed << " failed.\n"; if (info.endTime > 0) oss << "Total time: " << (info.endTime - info.startTime) << " seconds.\n"; break; case RED_STATE_FAILED: oss << "Redistribute is failed.\n"; try { size_t l = 0; // message length size_t n = fread(&l, sizeof(int), 1, fInfoFilePtr); if (n == 1) { boost::scoped_array buf(new char[l + 1]); n = fread(buf.get(), 1, l, fInfoFilePtr); if (n == l) { buf[l] = '\0'; fErrorMsg += buf.get(); oss << buf.get(); } } } catch (const std::exception&) { } catch (...) { } break; case RED_STATE_STOPPED: oss << "Redistribute is stopped by user.\n"; if (info.planned > 0) { if (info.endTime > 0) oss << "In " << (info.endTime - info.startTime) << " seconds, "; oss << info.success << " success, " << info.skipped << " skipped, " << info.failed << " failed, " << ((info.success + info.skipped + info.failed) * 100 / info.planned) << "%."; } break; default: oss << "Failed to retrieve redistribute information, the file " << fInfoFilePath << " may be corrupted."; break; } fUIResponse = oss.str(); return status; } int RedistributeControl::handleStopMsg(messageqcpp::ByteStream&, messageqcpp::IOSocket& so) { ostringstream oss; uint32_t status = getCurrentState(); if (status != RED_STATE_ACTIVE) { oss << "Redistribute is not running. Command is ignored."; } else { RedistributeControlThread::setStopAction(true); updateState(RED_STATE_STOPPED); status = RED_STATE_STOPPED; boost::thread rct((RedistributeControlThread(RED_CNTL_STOP))); rct.join(); oss << "Redistribute is stopped."; } fUIResponse = oss.str(); return status; } int RedistributeControl::handleClearMsg(messageqcpp::ByteStream&, messageqcpp::IOSocket& so) { ostringstream oss; uint32_t status = getCurrentState(); if (status == RED_STATE_ACTIVE) { oss << "Redistribute is running. Command is ignored. To CLEAR, you have to wait or stop the running " "session."; } else { updateState(RED_STATE_IDLE); status = RED_STATE_IDLE; oss << "Cleared."; } fUIResponse = oss.str(); return status; } uint32_t RedistributeControl::getCurrentState() { uint32_t status = RED_STATE_UNDEF; ostringstream oss; boost::mutex::scoped_lock lock(fInfoFileMutex); if (!fInfoFilePtr) { status = RED_STATE_IDLE; } else { rewind(fInfoFilePtr); RedistributeInfo info; size_t n = fread(&info, sizeof(info), 1, fInfoFilePtr); if (n == 1) { fRedistributeInfo = info; status = info.state; } } return status; } bool RedistributeControl::getStartOptions(messageqcpp::ByteStream& bs) { bool ret = true; uint32_t n = 0; uint32_t d = 0; try { bs >> fOptions; bs >> n; fSourceList.clear(); fSourceList.reserve(n); for (uint32_t i = 0; i < n; i++) { bs >> d; fSourceList.push_back(d); } bs >> n; fDestinationList.clear(); fDestinationList.reserve(n); for (uint32_t i = 0; i < n; i++) { bs >> d; fDestinationList.push_back(d); } if (fSourceList.size() == 0 || fDestinationList.size() == 0) throw runtime_error("Failed to get dbroot lists."); } catch (const std::exception& ex) { ret = false; fErrorMsg = ex.what(); } catch (...) { ret = false; fErrorMsg = "Failed to get dbroot lists."; } return ret; } void RedistributeControl::updateState(uint32_t s) { boost::mutex::scoped_lock lock(fInfoFileMutex); // allowed state change: // idle -> active // active -> finish // active -> stopped // active -> failed // finish -> idle // stopped -> idle // failed -> idle if (s == RED_STATE_IDLE) { if (fRedistributeInfo.state == RED_STATE_ACTIVE) return; // close the files if they are already opened if (fInfoFilePtr != NULL) { fclose(fInfoFilePtr); fInfoFilePtr = NULL; } if (fPlanFilePtr != NULL) { fclose(fPlanFilePtr); fPlanFilePtr = NULL; } // move old files to archive // zip or compress if the .plan file gets large time_t t = fRedistributeInfo.startTime; if (t == 0) t = time(NULL); ostringstream oss; struct tm m; localtime_r(&t, &m); oss << setfill('0') << setw(4) << (m.tm_year + 1900) << setw(2) << (m.tm_mon + 1) << setw(2) << (m.tm_mday) << setw(2) << (m.tm_hour) << setw(2) << (m.tm_min) << setw(2) << (m.tm_sec); try { if (boost::filesystem::exists(fInfoFilePath) && boost::filesystem::exists(fPlanFilePath)) { bool mergeOk = false; FILE* infoPtr = fopen(fInfoFilePath.c_str(), "r+b"); FILE* entryPtr = fopen(fPlanFilePath.c_str(), "rb"); int rc = 1; if (infoPtr != NULL && entryPtr != NULL) { rc = fseek(infoPtr, sizeof(RedistributeInfo), SEEK_SET); RedistributePlanEntry entry; while (rc == 0) { size_t n = fread(&entry, sizeof(entry), 1, entryPtr); if (n != 1) break; n = fwrite(&entry, sizeof(entry), 1, infoPtr); fflush(infoPtr); if (n != 1) rc = -1; } } if (rc == 0 && feof(entryPtr)) mergeOk = true; if (infoPtr != NULL) fclose(infoPtr); if (entryPtr != NULL) fclose(entryPtr); if (mergeOk) boost::filesystem::remove(fPlanFilePath); } if (boost::filesystem::exists(fInfoFilePath)) { string newInfoPath = fRedistributeDir + "/archive" + InfoFileName + "." + oss.str(); boost::filesystem::rename(fInfoFilePath, newInfoPath); } if (boost::filesystem::exists(fPlanFilePath)) { string newPlanPath = fRedistributeDir + "/archive" + PlanFileName + "." + oss.str(); boost::filesystem::rename(fPlanFilePath, newPlanPath); } } catch (const std::exception&) { } catch (...) { } fRedistributeInfo = RedistributeInfo(); return; } // safety check if (s != RED_STATE_ACTIVE && fRedistributeInfo.state != RED_STATE_ACTIVE) return; // in IDLE state there is no redistribute.info file if (s == RED_STATE_ACTIVE) { // boost::filesystem::path dirPath(fRedistributeDir); // if (boost::filesystem::exists(fRedistributeDir) && //!boost::filesystem::is_directory(fRedistributeDir)) boost::filesystem::remove(fRedistributeDir); if (!boost::filesystem::exists(fRedistributeDir)) { errno = 0; boost::filesystem::create_directory(fRedistributeDir); if (!boost::filesystem::exists(fRedistributeDir)) { int e = errno; ostringstream oss; oss << "Failed to create redistribute directory: "; oss << strerror(e) << " (" << e << ")"; throw runtime_error(oss.str()); } errno = 0; boost::filesystem::path archivePath(fRedistributeDir + "/archive"); boost::filesystem::create_directory(archivePath); if (!boost::filesystem::exists(archivePath)) { int e = errno; ostringstream oss; oss << "Failed to create redistribute archive directory: "; oss << strerror(e) << " (" << e << ")"; throw runtime_error(oss.str()); } } fRedistributeInfo.startTime = time(NULL); } // open the info file to write errno = 0; if (fInfoFilePtr == NULL) fInfoFilePtr = fopen(fInfoFilePath.c_str(), "w+"); if (fInfoFilePtr == NULL) { int e = errno; ostringstream oss; oss << "Failed to open " << fInfoFilePath << ": " << strerror(e) << " (" << e << ")"; throw runtime_error(oss.str()); } fRedistributeInfo.state = s; if (s == RED_STATE_FINISH) fRedistributeInfo.endTime = time(NULL); rewind(fInfoFilePtr); size_t n = fwrite(&fRedistributeInfo, sizeof(fRedistributeInfo), 1, fInfoFilePtr); if (n != 1) { fclose(fInfoFilePtr); fInfoFilePtr = NULL; int e = errno; ostringstream oss; oss << "Failed to write into " << fInfoFilePath << ": " << strerror(e) << " (" << e << ")"; throw runtime_error(oss.str()); } fflush(fInfoFilePtr); } void RedistributeControl::setEntryCount(uint32_t entryCount) { boost::mutex::scoped_lock lock(fInfoFileMutex); fRedistributeInfo.planned = entryCount; rewind(fInfoFilePtr); fwrite(&fRedistributeInfo, sizeof(fRedistributeInfo), 1, fInfoFilePtr); fflush(fInfoFilePtr); } void RedistributeControl::updateProgressInfo(uint32_t s, time_t t) { boost::mutex::scoped_lock lock(fInfoFileMutex); fRedistributeInfo.endTime = t; switch (s) { case RED_TRANS_SUCCESS: fRedistributeInfo.success++; break; case RED_TRANS_SKIPPED: fRedistributeInfo.skipped++; break; default: fRedistributeInfo.failed++; break; } rewind(fInfoFilePtr); fwrite(&fRedistributeInfo, sizeof(fRedistributeInfo), 1, fInfoFilePtr); fflush(fInfoFilePtr); } int RedistributeControl::handleJobMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so) { // boost::mutex::scoped_lock jobLock(fJobMutex); uint32_t status = RED_TRANS_SUCCESS; try { fWorkThread.reset(new boost::thread(RedistributeWorkerThread(bs, so))); fWorkThread->join(); } catch (const std::exception& ex) { status = RED_TRANS_FAILED; logMessage(ex.what()); } catch (...) { status = RED_TRANS_FAILED; } return status; } void RedistributeControl::logMessage(const string& msg) { logging::Message::Args args; args.add(string("RED:")); args.add(msg); fSysLogger->logMessage(logging::LOG_TYPE_INFO, logging::M0002, args, logging::LoggingID(32, 0, 0)); } } // namespace redistribute