1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

MCOL-307 Add Redistribute command to mcsadmin

This commit is contained in:
David Hall
2016-11-08 15:19:31 -06:00
parent 15d6cafb53
commit bd17d20710
4 changed files with 237 additions and 5 deletions

View File

@ -26,7 +26,12 @@
<Arg1>None</Arg1>
</Cmd3>
<Cmd4>
<Name>AVAILABLE</Name>
<Name>redistribute</Name>
<Desc1>Redistribute data accross all dbroots to balance disk usage</Desc1>
<Arg1>START to begin a redistribution</Arg1>
<Arg2>START REMOVE [dbroots] to redistribute to all but the enumerated dbroots, leaving those empty</Arg2>
<Arg3>STOP to stop redistribution before completion</Arg3>
<Arg4>STATUS to to view statistics and progress</Arg4>
</Cmd4>
<Cmd5>
<Name>AVAILABLE</Name>

View File

@ -29,15 +29,23 @@ extern int h_errno;
#include "mcsadmin.h"
#include "boost/filesystem/operations.hpp"
#include "boost/filesystem/path.hpp"
#include "boost/scoped_ptr.hpp"
#include "boost/tokenizer.hpp"
#include "sessionmanager.h"
#include "dbrm.h"
#include "messagequeue.h"
#include "we_messages.h"
#include "we_redistributedef.h"
namespace fs = boost::filesystem;
using namespace alarmmanager;
using namespace std;
using namespace oam;
using namespace config;
using namespace messageqcpp;
using namespace redistribute;
#include "installdir.h"
@ -60,6 +68,8 @@ string HOME = "/root";
bool repeatStop;
static void checkPromptThread();
bool connectToDBRoot1PM(Oam& oam, boost::scoped_ptr<MessageQueueClient>& msgQueueClient);
bool SendToWES(Oam& oam, ByteStream bs);
bool waitForActive()
{
@ -702,8 +712,151 @@ int processCommand(string* arguments)
}
break;
case 4: // Available
case 4: // redistribute
{
set<uint32_t> removeDbroots; // set of dbroots we want to leave empty
vector<uint32_t> srcDbroots; // all of the currently configured dbroots
vector<uint32_t> destDbroots; // srcDbroots - removeDbroots
set<int>::iterator dbiter;
if (arguments[1] == "start")
{
// Get a list of all the configured dbroots in the xml file.
DBRootConfigList dbRootConfigList;
std::set<int> configuredDBRoots;
oam.getSystemDbrootConfig(dbRootConfigList);
for (DBRootConfigList::iterator i = dbRootConfigList.begin(); i != dbRootConfigList.end(); ++i)
configuredDBRoots.insert(*i);
// The user may choose to redistribute in such a way as to
// leave certain dbroots empty, presumably for later removal.
if (arguments[2] == "remove")
{
int dbroot;
bool error = false;
for (int i=3; arguments[i] != ""; ++i)
{
dbroot = atoi(arguments[i].c_str());
if (dbroot == 1)
{
cout << "Not allowed to remove dbroot-1" << endl;
error = true;
}
else
{
if (configuredDBRoots.find(dbroot) == configuredDBRoots.end())
{
ostringstream oss;
cout << "DBRoot-" << dbroot << " is not configured" << endl;
error = true;
}
else
{
removeDbroots.insert((uint32_t)dbroot);
}
}
}
if (error)
{
cout << "Errors encountered. Abort" << endl;
break;
}
}
// Create a list of source dbroots -- where the data currently resides.
for (dbiter = configuredDBRoots.begin(); dbiter != configuredDBRoots.end(); ++dbiter)
srcDbroots.push_back((uint32_t)*dbiter);
// Create a list of destination dbroots -- where the data is to go.
for (dbiter = configuredDBRoots.begin(); dbiter != configuredDBRoots.end(); ++dbiter)
{
// Only use the dbroots not in the remove list
if (removeDbroots.find((uint32_t)*dbiter) == removeDbroots.end())
{
destDbroots.push_back((uint32_t)*dbiter);
}
}
// Print out what we're about to do
cout << "Redistribute START ";
if (removeDbroots.size() > 0)
{
cout << " Removing dbroots:";
set<uint32_t>::iterator iter;
for (iter = removeDbroots.begin(); iter != removeDbroots.end(); ++iter)
{
cout << " " << *iter;
}
}
cout << endl;
cout << "Source dbroots:";
vector<uint32_t>::iterator iter;
for (iter = srcDbroots.begin(); iter != srcDbroots.end(); ++iter)
cout << " " << *iter;
cout << endl << "Destination dbroots:";
for (iter = destDbroots.begin(); iter != destDbroots.end(); ++iter)
cout << " " << *iter;
cout << endl << endl;
// Connect to PM for dbroot1
ByteStream bs;
// message WES ID, sequence #, action id
uint32_t sequence = time(0);
bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE;
// Send the CLEAR message to WriteEngineServer (WES). Wipes out previous state.
RedistributeMsgHeader header(0, 0, sequence, RED_CNTL_CLEAR);
bs.append((const ByteStream::byte*) &header, sizeof(header));
SendToWES(oam, bs);
// Send the START message
bs.restart();
sequence = time(0);
bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE;
header.sequenceNum=sequence;
header.messageId = RED_CNTL_START;
bs.append((const ByteStream::byte*) &header, sizeof(header));
uint32_t options = 0;
if (removeDbroots.size() > 0)
{
options |= RED_OPTN_REMOVE;
}
bs << options;
// source db roots,
bs << (uint32_t) srcDbroots.size();
for (uint64_t i = 0; i < srcDbroots.size(); ++i)
bs << (uint32_t) srcDbroots[i];
// destination db roots,
bs << (uint32_t) destDbroots.size();
for (uint64_t i = 0; i < destDbroots.size(); ++i)
bs << (uint32_t) destDbroots[i];
SendToWES(oam, bs);
}
else if (arguments[1] == "stop")
{
ByteStream bs;
// message WES ID, sequence #, action id
uint32_t sequence = time(0);
bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE;
RedistributeMsgHeader header(0, 0, sequence, RED_CNTL_STOP);
bs.append((const ByteStream::byte*) &header, sizeof(header));
SendToWES(oam, bs);
}
else if (arguments[1] == "status")
{
ByteStream bs;
// message WES ID, sequence #, action id
uint32_t sequence = time(0);
bs << (ByteStream::byte) WriteEngine::WE_SVR_REDISTRIBUTE;
RedistributeMsgHeader header(0, 0, sequence, RED_CNTL_STATUS);
bs.append((const ByteStream::byte*) &header, sizeof(header));
SendToWES(oam, bs);
}
else
{
cout << "redistribute must have one of START, STOP or STATUS" << endl;
}
}
break;
@ -8394,5 +8547,78 @@ CC_SUSPEND_ANSWER AskSuspendQuestion(int CmdID)
}
}
// Make a connection to the PM that uses DBRoot1. Used in redistribute
// return true if successful, false if fail.
bool connectToDBRoot1PM(Oam& oam, boost::scoped_ptr<MessageQueueClient>& msgQueueClient)
{
int pmId = 0;
ModuleTypeConfig moduletypeconfig;
try
{
oam.getDbrootPmConfig(1, pmId);
oam.getSystemConfig("pm", moduletypeconfig);
}
catch (const std::exception& ex)
{
cerr << "Caught exception when getting DBRoot1" << ex.what() << endl;
return false;
}
catch (...)
{
cerr << "Caught exception when getting DBRoot1 -- unknown" << endl;
return false;
}
// Find the PM that has dbroot1, then make connection to its WES.
ostringstream oss;
oss << "pm" << pmId << "_WriteEngineServer";
try
{
msgQueueClient.reset(new MessageQueueClient(oss.str()));
}
catch (const std::exception& ex)
{
cerr << "Caught exception when connecting to " << oss.str() << " : " << ex.what() << endl;
return false;
}
catch (...)
{
cerr << "Caught exception when connecting to " << oss.str() << " : unknown" << endl;
}
return true;
}
bool SendToWES(Oam& oam, ByteStream bs)
{
boost::scoped_ptr<MessageQueueClient> msgQueueClient;
if (!connectToDBRoot1PM(oam, msgQueueClient))
return false;
uint32_t status = RED_STATE_UNDEF;
msgQueueClient->write(bs);
SBS sbs;
sbs = msgQueueClient->read();
if (sbs->length() == 0)
{
cerr << "WriteEngineServer returned an empty stream. Might be a network error" << endl;
}
else if (sbs->length() < 5)
{
cerr << "WriteEngineServer returned too few bytes. Refistribute status is unknown" << endl;
}
else
{
ByteStream::byte wesMsgId;
*sbs >> wesMsgId;
*sbs >> status;
string msg;
*sbs >> msg;
cout << "WriteEngineServer returned status " << status << ": "<< msg << endl;
}
return true;
}
// vim:ts=4 sw=4:

View File

@ -189,7 +189,7 @@ int RedistributeControl::handleStartMsg(messageqcpp::ByteStream& bs, messageqcpp
if (status != RED_STATE_IDLE)
{
if (status == RED_STATE_ACTIVE)
oss << "Redistribute is already running. Command is ignored. You need to stop and clear this active session before start a new one.";
oss << "Redistribute is already running. Command is ignored. You need to stop and clear this active session before starting a new one.";
else
oss << "Redistribute is not in IDLE state. Command is ignored. Please check the status of last session, then reset the state to IDLE using action CLEAR.";

View File

@ -22,6 +22,7 @@
#ifndef WE_REDISTRIBUTEDEF_H
#define WE_REDISTRIBUTEDEF_H
#include <stdint.h>
#include <time.h>
namespace redistribute