From bd17d2071047e8d07f98003b7827b851c60d4164 Mon Sep 17 00:00:00 2001 From: David Hall Date: Tue, 8 Nov 2016 15:19:31 -0600 Subject: [PATCH] MCOL-307 Add Redistribute command to mcsadmin --- oam/etc/ConsoleCmds.xml | 7 +- oamapps/mcsadmin/mcsadmin.cpp | 228 +++++++++++++++++- .../redistribute/we_redistributecontrol.cpp | 6 +- writeengine/redistribute/we_redistributedef.h | 1 + 4 files changed, 237 insertions(+), 5 deletions(-) diff --git a/oam/etc/ConsoleCmds.xml b/oam/etc/ConsoleCmds.xml index 01b258751..37c7d14f9 100644 --- a/oam/etc/ConsoleCmds.xml +++ b/oam/etc/ConsoleCmds.xml @@ -26,7 +26,12 @@ None - AVAILABLE + redistribute + Redistribute data accross all dbroots to balance disk usage + START to begin a redistribution + START REMOVE [dbroots] to redistribute to all but the enumerated dbroots, leaving those empty + STOP to stop redistribution before completion + STATUS to to view statistics and progress AVAILABLE diff --git a/oamapps/mcsadmin/mcsadmin.cpp b/oamapps/mcsadmin/mcsadmin.cpp index 50af96f0b..b2c3e27b4 100644 --- a/oamapps/mcsadmin/mcsadmin.cpp +++ b/oamapps/mcsadmin/mcsadmin.cpp @@ -29,15 +29,23 @@ extern int h_errno; #include "mcsadmin.h" #include "boost/filesystem/operations.hpp" #include "boost/filesystem/path.hpp" +#include "boost/scoped_ptr.hpp" #include "boost/tokenizer.hpp" #include "sessionmanager.h" #include "dbrm.h" +#include "messagequeue.h" +#include "we_messages.h" +#include "we_redistributedef.h" + + namespace fs = boost::filesystem; using namespace alarmmanager; using namespace std; using namespace oam; using namespace config; +using namespace messageqcpp; +using namespace redistribute; #include "installdir.h" @@ -60,6 +68,8 @@ string HOME = "/root"; bool repeatStop; static void checkPromptThread(); +bool connectToDBRoot1PM(Oam& oam, boost::scoped_ptr& msgQueueClient); +bool SendToWES(Oam& oam, ByteStream bs); bool waitForActive() { @@ -702,8 +712,151 @@ int processCommand(string* arguments) } break; - case 4: // Available + case 4: // redistribute { + set removeDbroots; // set of dbroots we want to leave empty + vector srcDbroots; // all of the currently configured dbroots + vector destDbroots; // srcDbroots - removeDbroots + set::iterator dbiter; + if (arguments[1] == "start") + { + // Get a list of all the configured dbroots in the xml file. + DBRootConfigList dbRootConfigList; + std::set configuredDBRoots; + oam.getSystemDbrootConfig(dbRootConfigList); + for (DBRootConfigList::iterator i = dbRootConfigList.begin(); i != dbRootConfigList.end(); ++i) + configuredDBRoots.insert(*i); + + // The user may choose to redistribute in such a way as to + // leave certain dbroots empty, presumably for later removal. + if (arguments[2] == "remove") + { + int dbroot; + bool error = false; + for (int i=3; arguments[i] != ""; ++i) + { + dbroot = atoi(arguments[i].c_str()); + if (dbroot == 1) + { + cout << "Not allowed to remove dbroot-1" << endl; + error = true; + } + else + { + if (configuredDBRoots.find(dbroot) == configuredDBRoots.end()) + { + ostringstream oss; + cout << "DBRoot-" << dbroot << " is not configured" << endl; + error = true; + } + else + { + removeDbroots.insert((uint32_t)dbroot); + } + } + } + if (error) + { + cout << "Errors encountered. Abort" << endl; + break; + } + } + + // Create a list of source dbroots -- where the data currently resides. + for (dbiter = configuredDBRoots.begin(); dbiter != configuredDBRoots.end(); ++dbiter) + srcDbroots.push_back((uint32_t)*dbiter); + + // Create a list of destination dbroots -- where the data is to go. + for (dbiter = configuredDBRoots.begin(); dbiter != configuredDBRoots.end(); ++dbiter) + { + // Only use the dbroots not in the remove list + if (removeDbroots.find((uint32_t)*dbiter) == removeDbroots.end()) + { + destDbroots.push_back((uint32_t)*dbiter); + } + } + // Print out what we're about to do + cout << "Redistribute START "; + if (removeDbroots.size() > 0) + { + cout << " Removing dbroots:"; + set::iterator iter; + for (iter = removeDbroots.begin(); iter != removeDbroots.end(); ++iter) + { + cout << " " << *iter; + } + } + cout << endl; + cout << "Source dbroots:"; + vector::iterator iter; + for (iter = srcDbroots.begin(); iter != srcDbroots.end(); ++iter) + cout << " " << *iter; + cout << endl << "Destination dbroots:"; + for (iter = destDbroots.begin(); iter != destDbroots.end(); ++iter) + cout << " " << *iter; + cout << endl << endl; + + // Connect to PM for dbroot1 + ByteStream bs; + // message WES ID, sequence #, action id + uint32_t sequence = time(0); + bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE; + + // Send the CLEAR message to WriteEngineServer (WES). Wipes out previous state. + RedistributeMsgHeader header(0, 0, sequence, RED_CNTL_CLEAR); + bs.append((const ByteStream::byte*) &header, sizeof(header)); + SendToWES(oam, bs); + + // Send the START message + bs.restart(); + sequence = time(0); + bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE; + header.sequenceNum=sequence; + header.messageId = RED_CNTL_START; + bs.append((const ByteStream::byte*) &header, sizeof(header)); + uint32_t options = 0; + if (removeDbroots.size() > 0) + { + options |= RED_OPTN_REMOVE; + } + bs << options; + + // source db roots, + bs << (uint32_t) srcDbroots.size(); + for (uint64_t i = 0; i < srcDbroots.size(); ++i) + bs << (uint32_t) srcDbroots[i]; + + // destination db roots, + bs << (uint32_t) destDbroots.size(); + for (uint64_t i = 0; i < destDbroots.size(); ++i) + bs << (uint32_t) destDbroots[i]; + + SendToWES(oam, bs); + } + else if (arguments[1] == "stop") + { + ByteStream bs; + // message WES ID, sequence #, action id + uint32_t sequence = time(0); + bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE; + RedistributeMsgHeader header(0, 0, sequence, RED_CNTL_STOP); + bs.append((const ByteStream::byte*) &header, sizeof(header)); + SendToWES(oam, bs); + } + else if (arguments[1] == "status") + { + ByteStream bs; + // message WES ID, sequence #, action id + uint32_t sequence = time(0); + bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE; + RedistributeMsgHeader header(0, 0, sequence, RED_CNTL_STATUS); + bs.append((const ByteStream::byte*) &header, sizeof(header)); + SendToWES(oam, bs); + } + else + { + cout << "redistribute must have one of START, STOP or STATUS" << endl; + } } break; @@ -8394,5 +8547,78 @@ CC_SUSPEND_ANSWER AskSuspendQuestion(int CmdID) } } +// Make a connection to the PM that uses DBRoot1. Used in redistribute +// return true if successful, false if fail. +bool connectToDBRoot1PM(Oam& oam, boost::scoped_ptr& msgQueueClient) +{ + int pmId = 0; + ModuleTypeConfig moduletypeconfig; + + try + { + oam.getDbrootPmConfig(1, pmId); + oam.getSystemConfig("pm", moduletypeconfig); + } + catch (const std::exception& ex) + { + cerr << "Caught exception when getting DBRoot1" << ex.what() << endl; + return false; + } + catch (...) + { + cerr << "Caught exception when getting DBRoot1 -- unknown" << endl; + return false; + } + + // Find the PM that has dbroot1, then make connection to its WES. + ostringstream oss; + oss << "pm" << pmId << "_WriteEngineServer"; + try + { + msgQueueClient.reset(new MessageQueueClient(oss.str())); + } + catch (const std::exception& ex) + { + cerr << "Caught exception when connecting to " << oss.str() << " : " << ex.what() << endl; + return false; + } + catch (...) + { + cerr << "Caught exception when connecting to " << oss.str() << " : unknown" << endl; + } + + return true; +} + +bool SendToWES(Oam& oam, ByteStream bs) +{ + boost::scoped_ptr msgQueueClient; + if (!connectToDBRoot1PM(oam, msgQueueClient)) + return false; + uint32_t status = RED_STATE_UNDEF; + msgQueueClient->write(bs); + + SBS sbs; + sbs = msgQueueClient->read(); + if (sbs->length() == 0) + { + cerr << "WriteEngineServer returned an empty stream. Might be a network error" << endl; + } + else if (sbs->length() < 5) + { + cerr << "WriteEngineServer returned too few bytes. Refistribute status is unknown" << endl; + } + else + { + ByteStream::byte wesMsgId; + *sbs >> wesMsgId; + *sbs >> status; + + string msg; + *sbs >> msg; + cout << "WriteEngineServer returned status " << status << ": "<< msg << endl; + } + return true; +} // vim:ts=4 sw=4: diff --git a/writeengine/redistribute/we_redistributecontrol.cpp b/writeengine/redistribute/we_redistributecontrol.cpp index 187bceb23..1b029763a 100644 --- a/writeengine/redistribute/we_redistributecontrol.cpp +++ b/writeengine/redistribute/we_redistributecontrol.cpp @@ -189,7 +189,7 @@ int RedistributeControl::handleStartMsg(messageqcpp::ByteStream& bs, messageqcpp if (status != RED_STATE_IDLE) { if (status == RED_STATE_ACTIVE) - oss << "Redistribute is already running. Command is ignored. You need to stop and clear this active session before start a new one."; + oss << "Redistribute is already running. Command is ignored. You need to stop and clear this active session before starting a new one."; else oss << "Redistribute is not in IDLE state. Command is ignored. Please check the status of last session, then reset the state to IDLE using action CLEAR."; @@ -338,7 +338,7 @@ int RedistributeControl::handleStopMsg(messageqcpp::ByteStream&, messageqcpp::IO uint32_t status = getCurrentState(); if (status != RED_STATE_ACTIVE) { - oss << "Redistribute is not running. Command is ignored."; + oss << "Redistribute is not running. Command is ignored."; } else { @@ -362,7 +362,7 @@ int RedistributeControl::handleClearMsg(messageqcpp::ByteStream&, messageqcpp::I uint32_t status = getCurrentState(); if (status == RED_STATE_ACTIVE) { - oss << "Redistribute is running. Command is ignored. To CLEAR, you have to wait or stop the running session."; + oss << "Redistribute is running. Command is ignored. To CLEAR, you have to wait or stop the running session."; } else { diff --git a/writeengine/redistribute/we_redistributedef.h b/writeengine/redistribute/we_redistributedef.h index cb320374b..f3b3bedd3 100644 --- a/writeengine/redistribute/we_redistributedef.h +++ b/writeengine/redistribute/we_redistributedef.h @@ -22,6 +22,7 @@ #ifndef WE_REDISTRIBUTEDEF_H #define WE_REDISTRIBUTEDEF_H +#include #include namespace redistribute