mirror of
https://github.com/MariaDB/server.git
synced 2025-08-01 03:47:19 +03:00
added management function to purge stale sessions in the management server
ndb/include/util/Bitmask.hpp: added bitXORC ndb/include/util/SocketServer.hpp: added method to apply function on each session ndb/src/common/util/SocketServer.cpp: added method to apply function on each session
This commit is contained in:
@ -733,6 +733,7 @@ extern "C" {
|
|||||||
int param, unsigned long long * value);
|
int param, unsigned long long * value);
|
||||||
int ndb_mgm_get_string_parameter(const ndb_mgm_configuration_iterator*,
|
int ndb_mgm_get_string_parameter(const ndb_mgm_configuration_iterator*,
|
||||||
int param, const char ** value);
|
int param, const char ** value);
|
||||||
|
int ndb_mgm_purge_stale_sessions(NdbMgmHandle handle, char **);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -104,6 +104,11 @@ public:
|
|||||||
*/
|
*/
|
||||||
static void bitXOR(unsigned size, Uint32 data[], const Uint32 data2[]);
|
static void bitXOR(unsigned size, Uint32 data[], const Uint32 data2[]);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* bitXORC - Bitwise (x ^ ~y) into first operand.
|
||||||
|
*/
|
||||||
|
static void bitXORC(unsigned size, Uint32 data[], const Uint32 data2[]);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* contains - Check if all bits set in data2 are set in data
|
* contains - Check if all bits set in data2 are set in data
|
||||||
*/
|
*/
|
||||||
@ -261,6 +266,14 @@ BitmaskImpl::bitXOR(unsigned size, Uint32 data[], const Uint32 data2[])
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void
|
||||||
|
BitmaskImpl::bitXORC(unsigned size, Uint32 data[], const Uint32 data2[])
|
||||||
|
{
|
||||||
|
for (unsigned i = 0; i < size; i++) {
|
||||||
|
data[i] ^= ~data2[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
inline bool
|
inline bool
|
||||||
BitmaskImpl::contains(unsigned size, Uint32 data[], const Uint32 data2[])
|
BitmaskImpl::contains(unsigned size, Uint32 data[], const Uint32 data2[])
|
||||||
{
|
{
|
||||||
@ -451,6 +464,12 @@ public:
|
|||||||
static void bitXOR(Uint32 data[], const Uint32 data2[]);
|
static void bitXOR(Uint32 data[], const Uint32 data2[]);
|
||||||
BitmaskPOD<size>& bitXOR(const BitmaskPOD<size>& mask2);
|
BitmaskPOD<size>& bitXOR(const BitmaskPOD<size>& mask2);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* bitXORC - Bitwise (x ^ ~y) into first operand.
|
||||||
|
*/
|
||||||
|
static void bitXORC(Uint32 data[], const Uint32 data2[]);
|
||||||
|
BitmaskPOD<size>& bitXORC(const BitmaskPOD<size>& mask2);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* contains - Check if all bits set in data2 (that) are also set in data (this)
|
* contains - Check if all bits set in data2 (that) are also set in data (this)
|
||||||
*/
|
*/
|
||||||
@ -712,6 +731,21 @@ BitmaskPOD<size>::bitXOR(const BitmaskPOD<size>& mask2)
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <unsigned size>
|
||||||
|
inline void
|
||||||
|
BitmaskPOD<size>::bitXORC(Uint32 data[], const Uint32 data2[])
|
||||||
|
{
|
||||||
|
BitmaskImpl::bitXORC(size,data, data2);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <unsigned size>
|
||||||
|
inline BitmaskPOD<size>&
|
||||||
|
BitmaskPOD<size>::bitXORC(const BitmaskPOD<size>& mask2)
|
||||||
|
{
|
||||||
|
BitmaskPOD<size>::bitXORC(rep.data, mask2.rep.data);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
template <unsigned size>
|
template <unsigned size>
|
||||||
char *
|
char *
|
||||||
BitmaskPOD<size>::getText(const Uint32 data[], char* buf)
|
BitmaskPOD<size>::getText(const Uint32 data[], char* buf)
|
||||||
|
@ -37,7 +37,7 @@ public:
|
|||||||
public:
|
public:
|
||||||
virtual ~Session() {}
|
virtual ~Session() {}
|
||||||
virtual void runSession(){}
|
virtual void runSession(){}
|
||||||
virtual void stopSession(){}
|
virtual void stopSession(){ m_stop = true; }
|
||||||
protected:
|
protected:
|
||||||
friend class SocketServer;
|
friend class SocketServer;
|
||||||
friend void* sessionThread_C(void*);
|
friend void* sessionThread_C(void*);
|
||||||
@ -98,6 +98,8 @@ public:
|
|||||||
*/
|
*/
|
||||||
void stopSessions(bool wait = false);
|
void stopSessions(bool wait = false);
|
||||||
|
|
||||||
|
void foreachSession(void (*f)(SocketServer::Session*, void*), void *data);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct SessionInstance {
|
struct SessionInstance {
|
||||||
Service * m_service;
|
Service * m_service;
|
||||||
|
@ -258,6 +258,15 @@ transfer(NDB_SOCKET_TYPE sock){
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
|
||||||
|
{
|
||||||
|
for(int i = m_sessions.size() - 1; i >= 0; i--){
|
||||||
|
(*func)(m_sessions[i].m_session, data);
|
||||||
|
}
|
||||||
|
checkSessions();
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
SocketServer::checkSessions(){
|
SocketServer::checkSessions(){
|
||||||
for(int i = m_sessions.size() - 1; i >= 0; i--){
|
for(int i = m_sessions.size() - 1; i >= 0; i--){
|
||||||
@ -278,8 +287,10 @@ void
|
|||||||
SocketServer::stopSessions(bool wait){
|
SocketServer::stopSessions(bool wait){
|
||||||
int i;
|
int i;
|
||||||
for(i = m_sessions.size() - 1; i>=0; i--)
|
for(i = m_sessions.size() - 1; i>=0; i--)
|
||||||
m_sessions[i].m_session->m_stop = true;
|
{
|
||||||
|
m_sessions[i].m_session->stopSession();
|
||||||
|
m_sessions[i].m_session->m_stop = true; // to make sure
|
||||||
|
}
|
||||||
for(i = m_services.size() - 1; i>=0; i--)
|
for(i = m_services.size() - 1; i>=0; i--)
|
||||||
m_services[i].m_service->stopSessions();
|
m_services[i].m_service->stopSessions();
|
||||||
|
|
||||||
|
@ -1834,4 +1834,46 @@ ndb_mgm_set_string_parameter(NdbMgmHandle handle,
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extern "C"
|
||||||
|
int
|
||||||
|
ndb_mgm_purge_stale_sessions(NdbMgmHandle handle, char **purged){
|
||||||
|
CHECK_HANDLE(handle, 0);
|
||||||
|
CHECK_CONNECTED(handle, 0);
|
||||||
|
|
||||||
|
Properties args;
|
||||||
|
|
||||||
|
const ParserRow<ParserDummy> reply[]= {
|
||||||
|
MGM_CMD("purge stale sessions reply", NULL, ""),
|
||||||
|
MGM_ARG("purged", String, Optional, ""),
|
||||||
|
MGM_ARG("result", String, Mandatory, "Error message"),
|
||||||
|
MGM_END()
|
||||||
|
};
|
||||||
|
|
||||||
|
const Properties *prop;
|
||||||
|
prop= ndb_mgm_call(handle, reply, "purge stale sessions", &args);
|
||||||
|
|
||||||
|
if(prop == NULL) {
|
||||||
|
SET_ERROR(handle, EIO, "Unable to purge stale sessions");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int res= -1;
|
||||||
|
do {
|
||||||
|
const char * buf;
|
||||||
|
if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
|
||||||
|
ndbout_c("ERROR Message: %s\n", buf);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (purged) {
|
||||||
|
if (prop->get("purged", &buf))
|
||||||
|
*purged= strdup(buf);
|
||||||
|
else
|
||||||
|
*purged= 0;
|
||||||
|
}
|
||||||
|
res= 0;
|
||||||
|
} while(0);
|
||||||
|
delete prop;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
template class Vector<const ParserRow<ParserDummy>*>;
|
template class Vector<const ParserRow<ParserDummy>*>;
|
||||||
|
@ -106,6 +106,7 @@ private:
|
|||||||
*/
|
*/
|
||||||
void executeHelp(char* parameters);
|
void executeHelp(char* parameters);
|
||||||
void executeShow(char* parameters);
|
void executeShow(char* parameters);
|
||||||
|
void executePurge(char* parameters);
|
||||||
void executeShutdown(char* parameters);
|
void executeShutdown(char* parameters);
|
||||||
void executeRun(char* parameters);
|
void executeRun(char* parameters);
|
||||||
void executeInfo(char* parameters);
|
void executeInfo(char* parameters);
|
||||||
@ -264,6 +265,7 @@ static const char* helpText =
|
|||||||
#ifdef HAVE_GLOBAL_REPLICATION
|
#ifdef HAVE_GLOBAL_REPLICATION
|
||||||
"REP CONNECT <host:port> Connect to REP server on host:port\n"
|
"REP CONNECT <host:port> Connect to REP server on host:port\n"
|
||||||
#endif
|
#endif
|
||||||
|
"PURGE STALE SESSIONS Reset reserved nodeid's in the mgmt server\n"
|
||||||
"QUIT Quit management client\n"
|
"QUIT Quit management client\n"
|
||||||
;
|
;
|
||||||
|
|
||||||
@ -541,6 +543,10 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect)
|
|||||||
executeAbortBackup(allAfterFirstToken);
|
executeAbortBackup(allAfterFirstToken);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
else if (strcmp(firstToken, "PURGE") == 0) {
|
||||||
|
executePurge(allAfterFirstToken);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
#ifdef HAVE_GLOBAL_REPLICATION
|
#ifdef HAVE_GLOBAL_REPLICATION
|
||||||
else if(strcmp(firstToken, "REPLICATION") == 0 ||
|
else if(strcmp(firstToken, "REPLICATION") == 0 ||
|
||||||
strcmp(firstToken, "REP") == 0) {
|
strcmp(firstToken, "REP") == 0) {
|
||||||
@ -982,6 +988,46 @@ print_nodes(ndb_mgm_cluster_state *state, ndb_mgm_configuration_iterator *it,
|
|||||||
ndbout << endl;
|
ndbout << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
CommandInterpreter::executePurge(char* parameters)
|
||||||
|
{
|
||||||
|
int command_ok= 0;
|
||||||
|
do {
|
||||||
|
if (emptyString(parameters))
|
||||||
|
break;
|
||||||
|
char* firstToken = strtok(parameters, " ");
|
||||||
|
char* nextToken = strtok(NULL, " \0");
|
||||||
|
if (strcmp(firstToken,"STALE") == 0 &&
|
||||||
|
nextToken &&
|
||||||
|
strcmp(nextToken, "SESSIONS") == 0) {
|
||||||
|
command_ok= 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} while(0);
|
||||||
|
|
||||||
|
if (!command_ok) {
|
||||||
|
ndbout_c("Unexpected command, expected: PURGE STALE SESSIONS");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int i;
|
||||||
|
char *str;
|
||||||
|
connect();
|
||||||
|
|
||||||
|
if (ndb_mgm_purge_stale_sessions(m_mgmsrv, &str)) {
|
||||||
|
ndbout_c("Command failed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (str) {
|
||||||
|
ndbout_c("Purged sessions with node id's: %s", str);
|
||||||
|
free(str);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ndbout_c("No sessions purged");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
CommandInterpreter::executeShow(char* parameters)
|
CommandInterpreter::executeShow(char* parameters)
|
||||||
{
|
{
|
||||||
|
@ -400,11 +400,13 @@ MgmtSrvr::getPort() const {
|
|||||||
|
|
||||||
/* Constructor */
|
/* Constructor */
|
||||||
MgmtSrvr::MgmtSrvr(NodeId nodeId,
|
MgmtSrvr::MgmtSrvr(NodeId nodeId,
|
||||||
|
SocketServer *socket_server,
|
||||||
const BaseString &configFilename,
|
const BaseString &configFilename,
|
||||||
LocalConfig &local_config,
|
LocalConfig &local_config,
|
||||||
Config * config):
|
Config * config):
|
||||||
_blockNumber(1), // Hard coded block number since it makes it easy to send
|
_blockNumber(1), // Hard coded block number since it makes it easy to send
|
||||||
// signals to other management servers.
|
// signals to other management servers.
|
||||||
|
m_socket_server(socket_server),
|
||||||
_ownReference(0),
|
_ownReference(0),
|
||||||
m_local_config(local_config),
|
m_local_config(local_config),
|
||||||
theSignalIdleList(NULL),
|
theSignalIdleList(NULL),
|
||||||
@ -2094,6 +2096,25 @@ MgmtSrvr::getNodeType(NodeId nodeId) const
|
|||||||
return nodeTypes[nodeId];
|
return nodeTypes[nodeId];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const
|
||||||
|
{
|
||||||
|
if (theFacade && theFacade->theClusterMgr)
|
||||||
|
{
|
||||||
|
for(Uint32 i = 0; i < MAX_NODES; i++)
|
||||||
|
{
|
||||||
|
if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB)
|
||||||
|
{
|
||||||
|
const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i);
|
||||||
|
if (node.connected)
|
||||||
|
{
|
||||||
|
connected_nodes.bitOR(node.m_state.m_connected_nodes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
MgmtSrvr::alloc_node_id(NodeId * nodeId,
|
MgmtSrvr::alloc_node_id(NodeId * nodeId,
|
||||||
enum ndb_mgm_node_type type,
|
enum ndb_mgm_node_type type,
|
||||||
@ -2106,7 +2127,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
|
|||||||
*nodeId, type, client_addr));
|
*nodeId, type, client_addr));
|
||||||
if (g_no_nodeid_checks) {
|
if (g_no_nodeid_checks) {
|
||||||
if (*nodeId == 0) {
|
if (*nodeId == 0) {
|
||||||
error_string.appfmt("no-nodeid-ckecks set in manegment server.\n"
|
error_string.appfmt("no-nodeid-checks set in management server.\n"
|
||||||
"node id must be set explicitly in connectstring");
|
"node id must be set explicitly in connectstring");
|
||||||
DBUG_RETURN(false);
|
DBUG_RETURN(false);
|
||||||
}
|
}
|
||||||
@ -2115,16 +2136,11 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
|
|||||||
Guard g(m_node_id_mutex);
|
Guard g(m_node_id_mutex);
|
||||||
int no_mgm= 0;
|
int no_mgm= 0;
|
||||||
NodeBitmask connected_nodes(m_reserved_nodes);
|
NodeBitmask connected_nodes(m_reserved_nodes);
|
||||||
for(Uint32 i = 0; i < MAX_NODES; i++)
|
get_connected_nodes(connected_nodes);
|
||||||
{
|
{
|
||||||
if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB &&
|
for(Uint32 i = 0; i < MAX_NODES; i++)
|
||||||
theFacade && theFacade->theClusterMgr) {
|
if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM)
|
||||||
const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i);
|
no_mgm++;
|
||||||
if (node.connected) {
|
|
||||||
connected_nodes.bitOR(node.m_state.m_connected_nodes);
|
|
||||||
}
|
|
||||||
} else if (getNodeType(i) == NDB_MGM_NODE_TYPE_MGM)
|
|
||||||
no_mgm++;
|
|
||||||
}
|
}
|
||||||
bool found_matching_id= false;
|
bool found_matching_id= false;
|
||||||
bool found_matching_type= false;
|
bool found_matching_type= false;
|
||||||
@ -2227,6 +2243,10 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
|
|||||||
m_connect_address[id_found].s_addr= 0;
|
m_connect_address[id_found].s_addr= 0;
|
||||||
}
|
}
|
||||||
m_reserved_nodes.set(id_found);
|
m_reserved_nodes.set(id_found);
|
||||||
|
char tmp_str[128];
|
||||||
|
m_reserved_nodes.getText(tmp_str);
|
||||||
|
g_EventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, m_reserved_nodes %s.",
|
||||||
|
id_found, get_connect_address(id_found), tmp_str);
|
||||||
DBUG_RETURN(true);
|
DBUG_RETURN(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2283,6 +2303,36 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
|
|||||||
error_string.appfmt("No node defined with id=%d in config file.",
|
error_string.appfmt("No node defined with id=%d in config file.",
|
||||||
*nodeId);
|
*nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
g_EventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s. "
|
||||||
|
"Returned error string \"%s\"",
|
||||||
|
*nodeId,
|
||||||
|
client_addr != 0 ? inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr) : "<none>",
|
||||||
|
error_string.c_str());
|
||||||
|
|
||||||
|
NodeBitmask connected_nodes2;
|
||||||
|
get_connected_nodes(connected_nodes2);
|
||||||
|
{
|
||||||
|
BaseString tmp_connected, tmp_not_connected;
|
||||||
|
for(Uint32 i = 0; i < MAX_NODES; i++)
|
||||||
|
{
|
||||||
|
if (connected_nodes2.get(i))
|
||||||
|
{
|
||||||
|
if (!m_reserved_nodes.get(i))
|
||||||
|
tmp_connected.appfmt(" %d", i);
|
||||||
|
}
|
||||||
|
else if (m_reserved_nodes.get(i))
|
||||||
|
{
|
||||||
|
tmp_not_connected.appfmt(" %d", i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (tmp_connected.length() > 0)
|
||||||
|
g_EventLogger.info("Mgmt server state: node id's %s connected but not reserved",
|
||||||
|
tmp_connected.c_str());
|
||||||
|
if (tmp_not_connected.length() > 0)
|
||||||
|
g_EventLogger.info("Mgmt server state: node id's %s not connected but reserved",
|
||||||
|
tmp_not_connected.c_str());
|
||||||
|
}
|
||||||
DBUG_RETURN(false);
|
DBUG_RETURN(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2531,10 +2581,15 @@ MgmtSrvr::Allocated_resources::~Allocated_resources()
|
|||||||
{
|
{
|
||||||
Guard g(m_mgmsrv.m_node_id_mutex);
|
Guard g(m_mgmsrv.m_node_id_mutex);
|
||||||
if (!m_reserved_nodes.isclear()) {
|
if (!m_reserved_nodes.isclear()) {
|
||||||
|
m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes);
|
||||||
// node has been reserved, force update signal to ndb nodes
|
// node has been reserved, force update signal to ndb nodes
|
||||||
global_flag_send_heartbeat_now= 1;
|
global_flag_send_heartbeat_now= 1;
|
||||||
|
|
||||||
|
char tmp_str[128];
|
||||||
|
m_mgmsrv.m_reserved_nodes.getText(tmp_str);
|
||||||
|
g_EventLogger.info("Mgmt server state: nodeid %d freed, m_reserved_nodes %s.",
|
||||||
|
get_nodeid(), tmp_str);
|
||||||
}
|
}
|
||||||
m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@ -2543,6 +2598,17 @@ MgmtSrvr::Allocated_resources::reserve_node(NodeId id)
|
|||||||
m_reserved_nodes.set(id);
|
m_reserved_nodes.set(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NodeId
|
||||||
|
MgmtSrvr::Allocated_resources::get_nodeid() const
|
||||||
|
{
|
||||||
|
for(Uint32 i = 0; i < MAX_NODES; i++)
|
||||||
|
{
|
||||||
|
if (m_reserved_nodes.get(i))
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
MgmtSrvr::setDbParameter(int node, int param, const char * value,
|
MgmtSrvr::setDbParameter(int node, int param, const char * value,
|
||||||
BaseString& msg){
|
BaseString& msg){
|
||||||
|
@ -96,7 +96,10 @@ public:
|
|||||||
// methods to reserve/allocate resources which
|
// methods to reserve/allocate resources which
|
||||||
// will be freed when running destructor
|
// will be freed when running destructor
|
||||||
void reserve_node(NodeId id);
|
void reserve_node(NodeId id);
|
||||||
bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId);}
|
bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); }
|
||||||
|
bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); }
|
||||||
|
bool isclear() { return m_reserved_nodes.isclear(); }
|
||||||
|
NodeId get_nodeid() const;
|
||||||
private:
|
private:
|
||||||
MgmtSrvr &m_mgmsrv;
|
MgmtSrvr &m_mgmsrv;
|
||||||
NodeBitmask m_reserved_nodes;
|
NodeBitmask m_reserved_nodes;
|
||||||
@ -173,6 +176,7 @@ public:
|
|||||||
/* Constructor */
|
/* Constructor */
|
||||||
|
|
||||||
MgmtSrvr(NodeId nodeId, /* Local nodeid */
|
MgmtSrvr(NodeId nodeId, /* Local nodeid */
|
||||||
|
SocketServer *socket_server,
|
||||||
const BaseString &config_filename, /* Where to save config */
|
const BaseString &config_filename, /* Where to save config */
|
||||||
LocalConfig &local_config, /* Ndb.cfg filename */
|
LocalConfig &local_config, /* Ndb.cfg filename */
|
||||||
Config * config);
|
Config * config);
|
||||||
@ -499,6 +503,9 @@ public:
|
|||||||
int setDbParameter(int node, int parameter, const char * value, BaseString&);
|
int setDbParameter(int node, int parameter, const char * value, BaseString&);
|
||||||
|
|
||||||
const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); }
|
const char *get_connect_address(Uint32 node_id) { return inet_ntoa(m_connect_address[node_id]); }
|
||||||
|
void get_connected_nodes(NodeBitmask &connected_nodes) const;
|
||||||
|
SocketServer *get_socket_server() { return m_socket_server; }
|
||||||
|
|
||||||
//**************************************************************************
|
//**************************************************************************
|
||||||
private:
|
private:
|
||||||
//**************************************************************************
|
//**************************************************************************
|
||||||
@ -525,6 +532,8 @@ private:
|
|||||||
|
|
||||||
int _blockNumber;
|
int _blockNumber;
|
||||||
NodeId _ownNodeId;
|
NodeId _ownNodeId;
|
||||||
|
SocketServer *m_socket_server;
|
||||||
|
|
||||||
BlockReference _ownReference;
|
BlockReference _ownReference;
|
||||||
NdbMutex *m_configMutex;
|
NdbMutex *m_configMutex;
|
||||||
const Config * _config;
|
const Config * _config;
|
||||||
|
@ -242,6 +242,8 @@ ParserRow<MgmApiSession> commands[] = {
|
|||||||
MGM_ARG("node", Int, Optional, "Node"),
|
MGM_ARG("node", Int, Optional, "Node"),
|
||||||
MGM_ARG("filter", String, Mandatory, "Event category"),
|
MGM_ARG("filter", String, Mandatory, "Event category"),
|
||||||
|
|
||||||
|
MGM_CMD("purge stale sessions", &MgmApiSession::purge_stale_sessions, ""),
|
||||||
|
|
||||||
MGM_END()
|
MGM_END()
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -1412,6 +1414,46 @@ done:
|
|||||||
m_output->println("");
|
m_output->println("");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct PurgeStruct
|
||||||
|
{
|
||||||
|
NodeBitmask free_nodes;/* free nodes as reported
|
||||||
|
* by ndbd in apiRegReqConf
|
||||||
|
*/
|
||||||
|
BaseString *str;
|
||||||
|
};
|
||||||
|
|
||||||
|
void
|
||||||
|
MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data)
|
||||||
|
{
|
||||||
|
MgmApiSession *s= (MgmApiSession *)_s;
|
||||||
|
struct PurgeStruct &ps= *(struct PurgeStruct *)data;
|
||||||
|
if (s->m_allocated_resources->is_reserved(ps.free_nodes))
|
||||||
|
{
|
||||||
|
ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid());
|
||||||
|
s->stopSession();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx,
|
||||||
|
const class Properties &args)
|
||||||
|
{
|
||||||
|
struct PurgeStruct ps;
|
||||||
|
BaseString str;
|
||||||
|
ps.str = &str;
|
||||||
|
|
||||||
|
m_mgmsrv.get_connected_nodes(ps.free_nodes);
|
||||||
|
ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes
|
||||||
|
|
||||||
|
m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps);
|
||||||
|
|
||||||
|
m_output->println("purge stale sessions reply");
|
||||||
|
if (str.length() > 0)
|
||||||
|
m_output->println("purged:%s",str.c_str());
|
||||||
|
m_output->println("result: Ok");
|
||||||
|
m_output->println("");
|
||||||
|
}
|
||||||
|
|
||||||
template class MutexVector<int>;
|
template class MutexVector<int>;
|
||||||
template class Vector<ParserRow<MgmApiSession> const*>;
|
template class Vector<ParserRow<MgmApiSession> const*>;
|
||||||
template class Vector<unsigned short>;
|
template class Vector<unsigned short>;
|
||||||
|
@ -28,7 +28,9 @@
|
|||||||
/** Undefine this to remove backwards compatibility for "GET CONFIG". */
|
/** Undefine this to remove backwards compatibility for "GET CONFIG". */
|
||||||
#define MGM_GET_CONFIG_BACKWARDS_COMPAT
|
#define MGM_GET_CONFIG_BACKWARDS_COMPAT
|
||||||
|
|
||||||
class MgmApiSession : public SocketServer::Session {
|
class MgmApiSession : public SocketServer::Session
|
||||||
|
{
|
||||||
|
static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
|
||||||
private:
|
private:
|
||||||
typedef Parser<MgmApiSession> Parser_t;
|
typedef Parser<MgmApiSession> Parser_t;
|
||||||
|
|
||||||
@ -84,6 +86,8 @@ public:
|
|||||||
|
|
||||||
void setParameter(Parser_t::Context &ctx, const class Properties &args);
|
void setParameter(Parser_t::Context &ctx, const class Properties &args);
|
||||||
void listen_event(Parser_t::Context &ctx, const class Properties &args);
|
void listen_event(Parser_t::Context &ctx, const class Properties &args);
|
||||||
|
|
||||||
|
void purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args);
|
||||||
|
|
||||||
void repCommand(Parser_t::Context &ctx, const class Properties &args);
|
void repCommand(Parser_t::Context &ctx, const class Properties &args);
|
||||||
};
|
};
|
||||||
|
@ -83,7 +83,6 @@ struct MgmGlobals {
|
|||||||
int g_no_nodeid_checks= 0;
|
int g_no_nodeid_checks= 0;
|
||||||
static MgmGlobals glob;
|
static MgmGlobals glob;
|
||||||
|
|
||||||
|
|
||||||
/******************************************************************************
|
/******************************************************************************
|
||||||
* Function prototypes
|
* Function prototypes
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
@ -226,7 +225,7 @@ int main(int argc, char** argv)
|
|||||||
if (!readGlobalConfig())
|
if (!readGlobalConfig())
|
||||||
goto error_end;
|
goto error_end;
|
||||||
|
|
||||||
glob.mgmObject = new MgmtSrvr(glob.localNodeId,
|
glob.mgmObject = new MgmtSrvr(glob.localNodeId, glob.socketServer,
|
||||||
BaseString(glob.config_filename),
|
BaseString(glob.config_filename),
|
||||||
local_config,
|
local_config,
|
||||||
glob.cluster_config);
|
glob.cluster_config);
|
||||||
|
Reference in New Issue
Block a user