mirror of
https://github.com/MariaDB/server.git
synced 2025-08-01 03:47:19 +03:00
added connect thread to Ndb_cluster_connection +
some other small fixes ndb/include/mgmcommon/ConfigRetriever.hpp: added options to do_connect to contol how connects failures should be treated ndb/include/mgmcommon/NdbConfig.h: method to retrieve datadir path (to user for chdir) ndb/include/ndbapi/ndb_cluster_connection.hpp: Added connect thread ndb/src/common/mgmcommon/ConfigRetriever.cpp: added options to do_connect to contol how connects failures should be treated ndb/src/common/mgmcommon/NdbConfig.c: method to retrieve datadir path (to user for chdir) ndb/src/kernel/main.cpp: ndbd to do chdir ndb/src/kernel/vm/WatchDog.cpp: added my_thread_init for debug ndb/src/ndbapi/ClusterMgr.cpp: added my_thread_init for debug ndb/src/ndbapi/TransporterFacade.cpp: removed call to atexit ndb/src/ndbapi/ndb_cluster_connection.cpp: added connect thread
This commit is contained in:
@ -37,7 +37,7 @@ public:
|
||||
*/
|
||||
int init();
|
||||
|
||||
int do_connect();
|
||||
int do_connect(int exit_on_connect_failure= false);
|
||||
|
||||
/**
|
||||
* Get configuration for current (nodeId given in local config file) node.
|
||||
|
@ -21,6 +21,7 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
const char* NdbConfig_get_path(int *len);
|
||||
void NdbConfig_SetPath(const char *path);
|
||||
char* NdbConfig_NdbCfgName(int with_ndb_home);
|
||||
char* NdbConfig_ErrorFileName(int node_id);
|
||||
|
@ -20,16 +20,26 @@
|
||||
|
||||
class TransporterFacade;
|
||||
class ConfigRetriever;
|
||||
class NdbThread;
|
||||
|
||||
extern "C" {
|
||||
void* run_ndb_cluster_connection_connect_thread(void*);
|
||||
}
|
||||
|
||||
class Ndb_cluster_connection {
|
||||
public:
|
||||
Ndb_cluster_connection(const char * connect_string = 0);
|
||||
~Ndb_cluster_connection();
|
||||
int connect();
|
||||
int connect(int reconnect= 0);
|
||||
int start_connect_thread(int (*connect_callback)(void)= 0);
|
||||
private:
|
||||
friend void* run_ndb_cluster_connection_connect_thread(void*);
|
||||
void connect_thread();
|
||||
char *m_connect_string;
|
||||
TransporterFacade *m_facade;
|
||||
ConfigRetriever *m_config_retriever;
|
||||
NdbThread *m_connect_thread;
|
||||
int (*m_connect_callback)(void);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -78,7 +78,7 @@ ConfigRetriever::init() {
|
||||
}
|
||||
|
||||
int
|
||||
ConfigRetriever::do_connect(){
|
||||
ConfigRetriever::do_connect(int exit_on_connect_failure){
|
||||
|
||||
if(!m_handle)
|
||||
m_handle= ndb_mgm_create_handle();
|
||||
@ -102,6 +102,8 @@ ConfigRetriever::do_connect(){
|
||||
if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (exit_on_connect_failure)
|
||||
return 1;
|
||||
setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle));
|
||||
case MgmId_File:
|
||||
break;
|
||||
|
@ -21,27 +21,34 @@
|
||||
|
||||
static char *datadir_path= 0;
|
||||
|
||||
const char *
|
||||
NdbConfig_get_path(int *_len)
|
||||
{
|
||||
const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0);
|
||||
int path_len= 0;
|
||||
if (path)
|
||||
path_len= strlen(path);
|
||||
if (path_len == 0 && datadir_path) {
|
||||
path= datadir_path;
|
||||
path_len= strlen(path);
|
||||
}
|
||||
if (path_len == 0) {
|
||||
path= ".";
|
||||
path_len= strlen(path);
|
||||
}
|
||||
if (_len)
|
||||
*_len= path_len;
|
||||
return path;
|
||||
}
|
||||
|
||||
static char*
|
||||
NdbConfig_AllocHomePath(int _len)
|
||||
{
|
||||
const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0);
|
||||
int len= _len;
|
||||
int path_len= 0;
|
||||
char *buf;
|
||||
|
||||
if (path == 0)
|
||||
path= datadir_path;
|
||||
|
||||
if (path)
|
||||
path_len= strlen(path);
|
||||
|
||||
len+= path_len;
|
||||
buf= NdbMem_Allocate(len);
|
||||
if (path_len > 0)
|
||||
int path_len;
|
||||
const char *path= NdbConfig_get_path(&path_len);
|
||||
int len= _len+path_len;
|
||||
char *buf= NdbMem_Allocate(len);
|
||||
snprintf(buf, len, "%s%s", path, DIR_SEPARATOR);
|
||||
else
|
||||
buf[0]= 0;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
@ -74,6 +74,8 @@ NDB_MAIN(ndb_kernel){
|
||||
theConfig->fetch_configuration();
|
||||
}
|
||||
|
||||
chdir(NdbConfig_get_path(0));
|
||||
|
||||
if (theConfig->getDaemonMode()) {
|
||||
// Become a daemon
|
||||
char *lockfile= NdbConfig_PidFileName(globalData.ownId);
|
||||
|
@ -15,6 +15,9 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_pthread.h>
|
||||
|
||||
#include "WatchDog.hpp"
|
||||
#include "GlobalData.hpp"
|
||||
#include <NdbOut.hpp>
|
||||
@ -24,7 +27,9 @@
|
||||
extern "C"
|
||||
void*
|
||||
runWatchDog(void* w){
|
||||
my_thread_init();
|
||||
((WatchDog*)w)->run();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_pthread.h>
|
||||
#include <ndb_limits.h>
|
||||
#include <ndb_version.h>
|
||||
|
||||
@ -64,7 +65,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
|
||||
{
|
||||
ndbSetOwnVersion();
|
||||
clusterMgrThreadMutex = NdbMutex_Create();
|
||||
noOfConnectedNodes = 0;
|
||||
noOfConnectedNodes= 0;
|
||||
theClusterMgrThread= 0;
|
||||
}
|
||||
|
||||
ClusterMgr::~ClusterMgr(){
|
||||
@ -137,20 +139,21 @@ ClusterMgr::startThread() {
|
||||
|
||||
void
|
||||
ClusterMgr::doStop( ){
|
||||
DBUG_ENTER("ClusterMgr::doStop");
|
||||
NdbMutex_Lock(clusterMgrThreadMutex);
|
||||
|
||||
if(theStop){
|
||||
NdbMutex_Unlock(clusterMgrThreadMutex);
|
||||
return;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void *status;
|
||||
theStop = 1;
|
||||
|
||||
if (theClusterMgrThread) {
|
||||
NdbThread_WaitFor(theClusterMgrThread, &status);
|
||||
NdbThread_Destroy(&theClusterMgrThread);
|
||||
|
||||
theClusterMgrThread= 0;
|
||||
}
|
||||
NdbMutex_Unlock(clusterMgrThreadMutex);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void
|
||||
@ -524,6 +527,7 @@ ArbitMgr::doChoose(const Uint32* theData)
|
||||
void
|
||||
ArbitMgr::doStop(const Uint32* theData)
|
||||
{
|
||||
DBUG_ENTER("ArbitMgr::doStop");
|
||||
ArbitSignal aSignal;
|
||||
NdbMutex_Lock(theThreadMutex);
|
||||
if (theThread != NULL) {
|
||||
@ -540,6 +544,7 @@ ArbitMgr::doStop(const Uint32* theData)
|
||||
theState = StateInit;
|
||||
}
|
||||
NdbMutex_Unlock(theThreadMutex);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
// private methods
|
||||
@ -548,7 +553,9 @@ extern "C"
|
||||
void*
|
||||
runArbitMgr_C(void* me)
|
||||
{
|
||||
my_thread_init();
|
||||
((ArbitMgr*) me)->threadMain();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_pthread.h>
|
||||
#include <ndb_limits.h>
|
||||
#include "TransporterFacade.hpp"
|
||||
#include "ClusterMgr.hpp"
|
||||
@ -329,14 +330,6 @@ copy(Uint32 * & insertPtr,
|
||||
abort();
|
||||
}
|
||||
|
||||
extern "C"
|
||||
void
|
||||
atexit_stop_instance(){
|
||||
DBUG_ENTER("atexit_stop_instance");
|
||||
TransporterFacade::stop_instance();
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
/**
|
||||
* Note that this function need no locking since its
|
||||
* only called from the constructor of Ndb (the NdbObject)
|
||||
@ -352,11 +345,6 @@ TransporterFacade::start_instance(int nodeId,
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Install atexit handler
|
||||
*/
|
||||
atexit(atexit_stop_instance);
|
||||
|
||||
/**
|
||||
* Install signal handler for SIGPIPE
|
||||
*
|
||||
@ -379,13 +367,7 @@ TransporterFacade::start_instance(int nodeId,
|
||||
void
|
||||
TransporterFacade::stop_instance(){
|
||||
DBUG_ENTER("TransporterFacade::stop_instance");
|
||||
if(theFacadeInstance == NULL){
|
||||
/**
|
||||
* We are called from atexit function
|
||||
*/
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
if(theFacadeInstance)
|
||||
theFacadeInstance->doStop();
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
@ -405,10 +387,16 @@ TransporterFacade::doStop(){
|
||||
*/
|
||||
void *status;
|
||||
theStopReceive = 1;
|
||||
if (theReceiveThread) {
|
||||
NdbThread_WaitFor(theReceiveThread, &status);
|
||||
NdbThread_WaitFor(theSendThread, &status);
|
||||
NdbThread_Destroy(&theReceiveThread);
|
||||
theReceiveThread= 0;
|
||||
}
|
||||
if (theSendThread) {
|
||||
NdbThread_WaitFor(theSendThread, &status);
|
||||
NdbThread_Destroy(&theSendThread);
|
||||
theSendThread= 0;
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
@ -416,7 +404,9 @@ extern "C"
|
||||
void*
|
||||
runSendRequest_C(void * me)
|
||||
{
|
||||
my_thread_init();
|
||||
((TransporterFacade*) me)->threadMainSend();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return me;
|
||||
}
|
||||
@ -459,7 +449,9 @@ extern "C"
|
||||
void*
|
||||
runReceiveResponse_C(void * me)
|
||||
{
|
||||
my_thread_init();
|
||||
((TransporterFacade*) me)->threadMainReceive();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return me;
|
||||
}
|
||||
|
@ -15,16 +15,19 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <pthread.h>
|
||||
#include <my_pthread.h>
|
||||
|
||||
#include <ndb_cluster_connection.hpp>
|
||||
#include <TransporterFacade.hpp>
|
||||
#include <NdbOut.hpp>
|
||||
#include <NdbSleep.h>
|
||||
#include <NdbThread.h>
|
||||
#include <ndb_limits.h>
|
||||
#include <ConfigRetriever.hpp>
|
||||
#include <ndb_version.h>
|
||||
|
||||
static int g_run_connect_thread= 0;
|
||||
|
||||
Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
|
||||
{
|
||||
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
|
||||
@ -33,26 +36,75 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
|
||||
else
|
||||
m_connect_string= 0;
|
||||
m_config_retriever= 0;
|
||||
m_connect_thread= 0;
|
||||
m_connect_callback= 0;
|
||||
}
|
||||
|
||||
int Ndb_cluster_connection::connect()
|
||||
extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
|
||||
{
|
||||
my_thread_init();
|
||||
g_run_connect_thread= 1;
|
||||
((Ndb_cluster_connection*) me)->connect_thread();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return me;
|
||||
}
|
||||
|
||||
void Ndb_cluster_connection::connect_thread()
|
||||
{
|
||||
DBUG_ENTER("Ndb_cluster_connection::connect_thread");
|
||||
int r;
|
||||
while (g_run_connect_thread) {
|
||||
if ((r = connect(1)) == 0)
|
||||
break;
|
||||
if (r == -1) {
|
||||
printf("Ndb_cluster_connection::connect_thread error\n");
|
||||
abort();
|
||||
}
|
||||
}
|
||||
if (m_connect_callback)
|
||||
(*m_connect_callback)();
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
|
||||
{
|
||||
DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
|
||||
m_connect_callback= connect_callback;
|
||||
m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread,
|
||||
(void**)this,
|
||||
32768,
|
||||
"ndb_cluster_connection",
|
||||
NDB_THREAD_PRIO_LOW);
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
int Ndb_cluster_connection::connect(int reconnect)
|
||||
{
|
||||
DBUG_ENTER("Ndb_cluster_connection::connect");
|
||||
if (m_config_retriever != 0) {
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
|
||||
m_config_retriever->setConnectString(m_connect_string);
|
||||
|
||||
const char* error = 0;
|
||||
do {
|
||||
if (m_config_retriever == 0)
|
||||
{
|
||||
m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
|
||||
m_config_retriever->setConnectString(m_connect_string);
|
||||
if(m_config_retriever->init() == -1)
|
||||
break;
|
||||
|
||||
}
|
||||
else
|
||||
if (reconnect == 0)
|
||||
DBUG_RETURN(0);
|
||||
if (reconnect)
|
||||
{
|
||||
int r= m_config_retriever->do_connect(1);
|
||||
if (r == 1)
|
||||
DBUG_RETURN(1); // mgmt server not up yet
|
||||
if (r == -1)
|
||||
break;
|
||||
}
|
||||
else
|
||||
if(m_config_retriever->do_connect() == -1)
|
||||
break;
|
||||
|
||||
Uint32 nodeId = m_config_retriever->allocNodeId();
|
||||
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
|
||||
NdbSleep_SecSleep(3);
|
||||
@ -60,15 +112,12 @@ int Ndb_cluster_connection::connect()
|
||||
}
|
||||
if(nodeId == 0)
|
||||
break;
|
||||
|
||||
ndb_mgm_configuration * props = m_config_retriever->getConfig();
|
||||
if(props == 0)
|
||||
break;
|
||||
m_facade->start_instance(nodeId, props);
|
||||
free(props);
|
||||
|
||||
m_facade->connected();
|
||||
|
||||
DBUG_RETURN(0);
|
||||
} while(0);
|
||||
|
||||
@ -83,7 +132,16 @@ int Ndb_cluster_connection::connect()
|
||||
|
||||
Ndb_cluster_connection::~Ndb_cluster_connection()
|
||||
{
|
||||
if (m_facade != 0) {
|
||||
if (m_connect_thread)
|
||||
{
|
||||
void *status;
|
||||
g_run_connect_thread= 0;
|
||||
NdbThread_WaitFor(m_connect_thread, &status);
|
||||
NdbThread_Destroy(&m_connect_thread);
|
||||
m_connect_thread= 0;
|
||||
}
|
||||
if (m_facade != 0)
|
||||
{
|
||||
delete m_facade;
|
||||
if (m_facade != TransporterFacade::theFacadeInstance)
|
||||
abort();
|
||||
|
Reference in New Issue
Block a user