diff --git a/oam/etc/ConsoleCmds.xml b/oam/etc/ConsoleCmds.xml
index 01b258751..37c7d14f9 100644
--- a/oam/etc/ConsoleCmds.xml
+++ b/oam/etc/ConsoleCmds.xml
@@ -26,7 +26,12 @@
None
- AVAILABLE
+ redistribute
+ Redistribute data accross all dbroots to balance disk usage
+ START to begin a redistribution
+ START REMOVE [dbroots] to redistribute to all but the enumerated dbroots, leaving those empty
+ STOP to stop redistribution before completion
+ STATUS to to view statistics and progress
AVAILABLE
diff --git a/oamapps/mcsadmin/mcsadmin.cpp b/oamapps/mcsadmin/mcsadmin.cpp
index 50af96f0b..b2c3e27b4 100644
--- a/oamapps/mcsadmin/mcsadmin.cpp
+++ b/oamapps/mcsadmin/mcsadmin.cpp
@@ -29,15 +29,23 @@ extern int h_errno;
#include "mcsadmin.h"
#include "boost/filesystem/operations.hpp"
#include "boost/filesystem/path.hpp"
+#include "boost/scoped_ptr.hpp"
#include "boost/tokenizer.hpp"
#include "sessionmanager.h"
#include "dbrm.h"
+#include "messagequeue.h"
+#include "we_messages.h"
+#include "we_redistributedef.h"
+
+
namespace fs = boost::filesystem;
using namespace alarmmanager;
using namespace std;
using namespace oam;
using namespace config;
+using namespace messageqcpp;
+using namespace redistribute;
#include "installdir.h"
@@ -60,6 +68,8 @@ string HOME = "/root";
bool repeatStop;
static void checkPromptThread();
+bool connectToDBRoot1PM(Oam& oam, boost::scoped_ptr& msgQueueClient);
+bool SendToWES(Oam& oam, ByteStream bs);
bool waitForActive()
{
@@ -702,8 +712,151 @@ int processCommand(string* arguments)
}
break;
- case 4: // Available
+ case 4: // redistribute
{
+ set removeDbroots; // set of dbroots we want to leave empty
+ vector srcDbroots; // all of the currently configured dbroots
+ vector destDbroots; // srcDbroots - removeDbroots
+ set::iterator dbiter;
+ if (arguments[1] == "start")
+ {
+ // Get a list of all the configured dbroots in the xml file.
+ DBRootConfigList dbRootConfigList;
+ std::set configuredDBRoots;
+ oam.getSystemDbrootConfig(dbRootConfigList);
+ for (DBRootConfigList::iterator i = dbRootConfigList.begin(); i != dbRootConfigList.end(); ++i)
+ configuredDBRoots.insert(*i);
+
+ // The user may choose to redistribute in such a way as to
+ // leave certain dbroots empty, presumably for later removal.
+ if (arguments[2] == "remove")
+ {
+ int dbroot;
+ bool error = false;
+ for (int i=3; arguments[i] != ""; ++i)
+ {
+ dbroot = atoi(arguments[i].c_str());
+ if (dbroot == 1)
+ {
+ cout << "Not allowed to remove dbroot-1" << endl;
+ error = true;
+ }
+ else
+ {
+ if (configuredDBRoots.find(dbroot) == configuredDBRoots.end())
+ {
+ ostringstream oss;
+ cout << "DBRoot-" << dbroot << " is not configured" << endl;
+ error = true;
+ }
+ else
+ {
+ removeDbroots.insert((uint32_t)dbroot);
+ }
+ }
+ }
+ if (error)
+ {
+ cout << "Errors encountered. Abort" << endl;
+ break;
+ }
+ }
+
+ // Create a list of source dbroots -- where the data currently resides.
+ for (dbiter = configuredDBRoots.begin(); dbiter != configuredDBRoots.end(); ++dbiter)
+ srcDbroots.push_back((uint32_t)*dbiter);
+
+ // Create a list of destination dbroots -- where the data is to go.
+ for (dbiter = configuredDBRoots.begin(); dbiter != configuredDBRoots.end(); ++dbiter)
+ {
+ // Only use the dbroots not in the remove list
+ if (removeDbroots.find((uint32_t)*dbiter) == removeDbroots.end())
+ {
+ destDbroots.push_back((uint32_t)*dbiter);
+ }
+ }
+ // Print out what we're about to do
+ cout << "Redistribute START ";
+ if (removeDbroots.size() > 0)
+ {
+ cout << " Removing dbroots:";
+ set::iterator iter;
+ for (iter = removeDbroots.begin(); iter != removeDbroots.end(); ++iter)
+ {
+ cout << " " << *iter;
+ }
+ }
+ cout << endl;
+ cout << "Source dbroots:";
+ vector::iterator iter;
+ for (iter = srcDbroots.begin(); iter != srcDbroots.end(); ++iter)
+ cout << " " << *iter;
+ cout << endl << "Destination dbroots:";
+ for (iter = destDbroots.begin(); iter != destDbroots.end(); ++iter)
+ cout << " " << *iter;
+ cout << endl << endl;
+
+ // Connect to PM for dbroot1
+ ByteStream bs;
+ // message WES ID, sequence #, action id
+ uint32_t sequence = time(0);
+ bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE;
+
+ // Send the CLEAR message to WriteEngineServer (WES). Wipes out previous state.
+ RedistributeMsgHeader header(0, 0, sequence, RED_CNTL_CLEAR);
+ bs.append((const ByteStream::byte*) &header, sizeof(header));
+ SendToWES(oam, bs);
+
+ // Send the START message
+ bs.restart();
+ sequence = time(0);
+ bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE;
+ header.sequenceNum=sequence;
+ header.messageId = RED_CNTL_START;
+ bs.append((const ByteStream::byte*) &header, sizeof(header));
+ uint32_t options = 0;
+ if (removeDbroots.size() > 0)
+ {
+ options |= RED_OPTN_REMOVE;
+ }
+ bs << options;
+
+ // source db roots,
+ bs << (uint32_t) srcDbroots.size();
+ for (uint64_t i = 0; i < srcDbroots.size(); ++i)
+ bs << (uint32_t) srcDbroots[i];
+
+ // destination db roots,
+ bs << (uint32_t) destDbroots.size();
+ for (uint64_t i = 0; i < destDbroots.size(); ++i)
+ bs << (uint32_t) destDbroots[i];
+
+ SendToWES(oam, bs);
+ }
+ else if (arguments[1] == "stop")
+ {
+ ByteStream bs;
+ // message WES ID, sequence #, action id
+ uint32_t sequence = time(0);
+ bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE;
+ RedistributeMsgHeader header(0, 0, sequence, RED_CNTL_STOP);
+ bs.append((const ByteStream::byte*) &header, sizeof(header));
+ SendToWES(oam, bs);
+ }
+ else if (arguments[1] == "status")
+ {
+ ByteStream bs;
+ // message WES ID, sequence #, action id
+ uint32_t sequence = time(0);
+ bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE;
+ RedistributeMsgHeader header(0, 0, sequence, RED_CNTL_STATUS);
+ bs.append((const ByteStream::byte*) &header, sizeof(header));
+ SendToWES(oam, bs);
+ }
+ else
+ {
+ cout << "redistribute must have one of START, STOP or STATUS" << endl;
+ }
}
break;
@@ -8394,5 +8547,78 @@ CC_SUSPEND_ANSWER AskSuspendQuestion(int CmdID)
}
}
+// Make a connection to the PM that uses DBRoot1. Used in redistribute
+// return true if successful, false if fail.
+bool connectToDBRoot1PM(Oam& oam, boost::scoped_ptr& msgQueueClient)
+{
+ int pmId = 0;
+ ModuleTypeConfig moduletypeconfig;
+
+ try
+ {
+ oam.getDbrootPmConfig(1, pmId);
+ oam.getSystemConfig("pm", moduletypeconfig);
+ }
+ catch (const std::exception& ex)
+ {
+ cerr << "Caught exception when getting DBRoot1" << ex.what() << endl;
+ return false;
+ }
+ catch (...)
+ {
+ cerr << "Caught exception when getting DBRoot1 -- unknown" << endl;
+ return false;
+ }
+
+ // Find the PM that has dbroot1, then make connection to its WES.
+ ostringstream oss;
+ oss << "pm" << pmId << "_WriteEngineServer";
+ try
+ {
+ msgQueueClient.reset(new MessageQueueClient(oss.str()));
+ }
+ catch (const std::exception& ex)
+ {
+ cerr << "Caught exception when connecting to " << oss.str() << " : " << ex.what() << endl;
+ return false;
+ }
+ catch (...)
+ {
+ cerr << "Caught exception when connecting to " << oss.str() << " : unknown" << endl;
+ }
+
+ return true;
+}
+
+bool SendToWES(Oam& oam, ByteStream bs)
+{
+ boost::scoped_ptr msgQueueClient;
+ if (!connectToDBRoot1PM(oam, msgQueueClient))
+ return false;
+ uint32_t status = RED_STATE_UNDEF;
+ msgQueueClient->write(bs);
+
+ SBS sbs;
+ sbs = msgQueueClient->read();
+ if (sbs->length() == 0)
+ {
+ cerr << "WriteEngineServer returned an empty stream. Might be a network error" << endl;
+ }
+ else if (sbs->length() < 5)
+ {
+ cerr << "WriteEngineServer returned too few bytes. Refistribute status is unknown" << endl;
+ }
+ else
+ {
+ ByteStream::byte wesMsgId;
+ *sbs >> wesMsgId;
+ *sbs >> status;
+
+ string msg;
+ *sbs >> msg;
+ cout << "WriteEngineServer returned status " << status << ": "<< msg << endl;
+ }
+ return true;
+}
// vim:ts=4 sw=4:
diff --git a/writeengine/redistribute/we_redistributecontrol.cpp b/writeengine/redistribute/we_redistributecontrol.cpp
index 187bceb23..1b029763a 100644
--- a/writeengine/redistribute/we_redistributecontrol.cpp
+++ b/writeengine/redistribute/we_redistributecontrol.cpp
@@ -189,7 +189,7 @@ int RedistributeControl::handleStartMsg(messageqcpp::ByteStream& bs, messageqcpp
if (status != RED_STATE_IDLE)
{
if (status == RED_STATE_ACTIVE)
- oss << "Redistribute is already running. Command is ignored. You need to stop and clear this active session before start a new one.";
+ oss << "Redistribute is already running. Command is ignored. You need to stop and clear this active session before starting a new one.";
else
oss << "Redistribute is not in IDLE state. Command is ignored. Please check the status of last session, then reset the state to IDLE using action CLEAR.";
@@ -338,7 +338,7 @@ int RedistributeControl::handleStopMsg(messageqcpp::ByteStream&, messageqcpp::IO
uint32_t status = getCurrentState();
if (status != RED_STATE_ACTIVE)
{
- oss << "Redistribute is not running. Command is ignored.";
+ oss << "Redistribute is not running. Command is ignored.";
}
else
{
@@ -362,7 +362,7 @@ int RedistributeControl::handleClearMsg(messageqcpp::ByteStream&, messageqcpp::I
uint32_t status = getCurrentState();
if (status == RED_STATE_ACTIVE)
{
- oss << "Redistribute is running. Command is ignored. To CLEAR, you have to wait or stop the running session.";
+ oss << "Redistribute is running. Command is ignored. To CLEAR, you have to wait or stop the running session.";
}
else
{
diff --git a/writeengine/redistribute/we_redistributedef.h b/writeengine/redistribute/we_redistributedef.h
index cb320374b..f3b3bedd3 100644
--- a/writeengine/redistribute/we_redistributedef.h
+++ b/writeengine/redistribute/we_redistributedef.h
@@ -22,6 +22,7 @@
#ifndef WE_REDISTRIBUTEDEF_H
#define WE_REDISTRIBUTEDEF_H
+#include
#include
namespace redistribute