diff --git a/ndb/include/mgmcommon/ConfigRetriever.hpp b/ndb/include/mgmcommon/ConfigRetriever.hpp index dd34eb0cfbb..00f5ef71160 100644 --- a/ndb/include/mgmcommon/ConfigRetriever.hpp +++ b/ndb/include/mgmcommon/ConfigRetriever.hpp @@ -37,7 +37,7 @@ public: */ int init(); - int do_connect(); + int do_connect(int exit_on_connect_failure= false); /** * Get configuration for current (nodeId given in local config file) node. diff --git a/ndb/include/mgmcommon/NdbConfig.h b/ndb/include/mgmcommon/NdbConfig.h index eb90f5e7c78..1bca825ab8d 100644 --- a/ndb/include/mgmcommon/NdbConfig.h +++ b/ndb/include/mgmcommon/NdbConfig.h @@ -21,6 +21,7 @@ extern "C" { #endif +const char* NdbConfig_get_path(int *len); void NdbConfig_SetPath(const char *path); char* NdbConfig_NdbCfgName(int with_ndb_home); char* NdbConfig_ErrorFileName(int node_id); diff --git a/ndb/include/ndbapi/ndb_cluster_connection.hpp b/ndb/include/ndbapi/ndb_cluster_connection.hpp index 5c3f53dd870..59d5a038844 100644 --- a/ndb/include/ndbapi/ndb_cluster_connection.hpp +++ b/ndb/include/ndbapi/ndb_cluster_connection.hpp @@ -20,16 +20,26 @@ class TransporterFacade; class ConfigRetriever; +class NdbThread; + +extern "C" { + void* run_ndb_cluster_connection_connect_thread(void*); +} class Ndb_cluster_connection { public: Ndb_cluster_connection(const char * connect_string = 0); ~Ndb_cluster_connection(); - int connect(); + int connect(int reconnect= 0); + int start_connect_thread(int (*connect_callback)(void)= 0); private: + friend void* run_ndb_cluster_connection_connect_thread(void*); + void connect_thread(); char *m_connect_string; TransporterFacade *m_facade; ConfigRetriever *m_config_retriever; + NdbThread *m_connect_thread; + int (*m_connect_callback)(void); }; #endif diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp index afee135d793..26bac2cf10b 100644 --- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp +++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp @@ -78,7 +78,7 @@ ConfigRetriever::init() { } int -ConfigRetriever::do_connect(){ +ConfigRetriever::do_connect(int exit_on_connect_failure){ if(!m_handle) m_handle= ndb_mgm_create_handle(); @@ -102,6 +102,8 @@ ConfigRetriever::do_connect(){ if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) { return 0; } + if (exit_on_connect_failure) + return 1; setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle)); case MgmId_File: break; diff --git a/ndb/src/common/mgmcommon/NdbConfig.c b/ndb/src/common/mgmcommon/NdbConfig.c index c3f4abf61a7..6c93f9fc7cf 100644 --- a/ndb/src/common/mgmcommon/NdbConfig.c +++ b/ndb/src/common/mgmcommon/NdbConfig.c @@ -21,27 +21,34 @@ static char *datadir_path= 0; +const char * +NdbConfig_get_path(int *_len) +{ + const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0); + int path_len= 0; + if (path) + path_len= strlen(path); + if (path_len == 0 && datadir_path) { + path= datadir_path; + path_len= strlen(path); + } + if (path_len == 0) { + path= "."; + path_len= strlen(path); + } + if (_len) + *_len= path_len; + return path; +} + static char* NdbConfig_AllocHomePath(int _len) { - const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0); - int len= _len; - int path_len= 0; - char *buf; - - if (path == 0) - path= datadir_path; - - if (path) - path_len= strlen(path); - - len+= path_len; - buf= NdbMem_Allocate(len); - if (path_len > 0) - snprintf(buf, len, "%s%s", path, DIR_SEPARATOR); - else - buf[0]= 0; - + int path_len; + const char *path= NdbConfig_get_path(&path_len); + int len= _len+path_len; + char *buf= NdbMem_Allocate(len); + snprintf(buf, len, "%s%s", path, DIR_SEPARATOR); return buf; } diff --git a/ndb/src/kernel/main.cpp b/ndb/src/kernel/main.cpp index 1f080b003bc..491733975a8 100644 --- a/ndb/src/kernel/main.cpp +++ b/ndb/src/kernel/main.cpp @@ -74,6 +74,8 @@ NDB_MAIN(ndb_kernel){ theConfig->fetch_configuration(); } + chdir(NdbConfig_get_path(0)); + if (theConfig->getDaemonMode()) { // Become a daemon char *lockfile= NdbConfig_PidFileName(globalData.ownId); diff --git a/ndb/src/kernel/vm/WatchDog.cpp b/ndb/src/kernel/vm/WatchDog.cpp index a90f63aff37..4e07dc1df90 100644 --- a/ndb/src/kernel/vm/WatchDog.cpp +++ b/ndb/src/kernel/vm/WatchDog.cpp @@ -15,6 +15,9 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include +#include + #include "WatchDog.hpp" #include "GlobalData.hpp" #include @@ -24,7 +27,9 @@ extern "C" void* runWatchDog(void* w){ + my_thread_init(); ((WatchDog*)w)->run(); + my_thread_end(); NdbThread_Exit(0); return NULL; } diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp index b9947fcf0e7..17a0e443b48 100644 --- a/ndb/src/ndbapi/ClusterMgr.cpp +++ b/ndb/src/ndbapi/ClusterMgr.cpp @@ -15,6 +15,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include +#include #include #include @@ -64,7 +65,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): { ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); - noOfConnectedNodes = 0; + noOfConnectedNodes= 0; + theClusterMgrThread= 0; } ClusterMgr::~ClusterMgr(){ @@ -137,20 +139,21 @@ ClusterMgr::startThread() { void ClusterMgr::doStop( ){ + DBUG_ENTER("ClusterMgr::doStop"); NdbMutex_Lock(clusterMgrThreadMutex); - if(theStop){ NdbMutex_Unlock(clusterMgrThreadMutex); - return; + DBUG_VOID_RETURN; } - void *status; theStop = 1; - - NdbThread_WaitFor(theClusterMgrThread, &status); - NdbThread_Destroy(&theClusterMgrThread); - + if (theClusterMgrThread) { + NdbThread_WaitFor(theClusterMgrThread, &status); + NdbThread_Destroy(&theClusterMgrThread); + theClusterMgrThread= 0; + } NdbMutex_Unlock(clusterMgrThreadMutex); + DBUG_VOID_RETURN; } void @@ -524,6 +527,7 @@ ArbitMgr::doChoose(const Uint32* theData) void ArbitMgr::doStop(const Uint32* theData) { + DBUG_ENTER("ArbitMgr::doStop"); ArbitSignal aSignal; NdbMutex_Lock(theThreadMutex); if (theThread != NULL) { @@ -540,6 +544,7 @@ ArbitMgr::doStop(const Uint32* theData) theState = StateInit; } NdbMutex_Unlock(theThreadMutex); + DBUG_VOID_RETURN; } // private methods @@ -548,7 +553,9 @@ extern "C" void* runArbitMgr_C(void* me) { + my_thread_init(); ((ArbitMgr*) me)->threadMain(); + my_thread_end(); NdbThread_Exit(0); return NULL; } diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp index 67c768e2594..556e37ba3a0 100644 --- a/ndb/src/ndbapi/TransporterFacade.cpp +++ b/ndb/src/ndbapi/TransporterFacade.cpp @@ -15,6 +15,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include +#include #include #include "TransporterFacade.hpp" #include "ClusterMgr.hpp" @@ -329,14 +330,6 @@ copy(Uint32 * & insertPtr, abort(); } -extern "C" -void -atexit_stop_instance(){ - DBUG_ENTER("atexit_stop_instance"); - TransporterFacade::stop_instance(); - DBUG_VOID_RETURN; -} - /** * Note that this function need no locking since its * only called from the constructor of Ndb (the NdbObject) @@ -352,11 +345,6 @@ TransporterFacade::start_instance(int nodeId, return -1; } - /** - * Install atexit handler - */ - atexit(atexit_stop_instance); - /** * Install signal handler for SIGPIPE * @@ -379,14 +367,8 @@ TransporterFacade::start_instance(int nodeId, void TransporterFacade::stop_instance(){ DBUG_ENTER("TransporterFacade::stop_instance"); - if(theFacadeInstance == NULL){ - /** - * We are called from atexit function - */ - DBUG_VOID_RETURN; - } - - theFacadeInstance->doStop(); + if(theFacadeInstance) + theFacadeInstance->doStop(); DBUG_VOID_RETURN; } @@ -405,10 +387,16 @@ TransporterFacade::doStop(){ */ void *status; theStopReceive = 1; - NdbThread_WaitFor(theReceiveThread, &status); - NdbThread_WaitFor(theSendThread, &status); - NdbThread_Destroy(&theReceiveThread); - NdbThread_Destroy(&theSendThread); + if (theReceiveThread) { + NdbThread_WaitFor(theReceiveThread, &status); + NdbThread_Destroy(&theReceiveThread); + theReceiveThread= 0; + } + if (theSendThread) { + NdbThread_WaitFor(theSendThread, &status); + NdbThread_Destroy(&theSendThread); + theSendThread= 0; + } DBUG_VOID_RETURN; } @@ -416,7 +404,9 @@ extern "C" void* runSendRequest_C(void * me) { + my_thread_init(); ((TransporterFacade*) me)->threadMainSend(); + my_thread_end(); NdbThread_Exit(0); return me; } @@ -459,7 +449,9 @@ extern "C" void* runReceiveResponse_C(void * me) { + my_thread_init(); ((TransporterFacade*) me)->threadMainReceive(); + my_thread_end(); NdbThread_Exit(0); return me; } diff --git a/ndb/src/ndbapi/ndb_cluster_connection.cpp b/ndb/src/ndbapi/ndb_cluster_connection.cpp index c85698780f5..9fb7a563807 100644 --- a/ndb/src/ndbapi/ndb_cluster_connection.cpp +++ b/ndb/src/ndbapi/ndb_cluster_connection.cpp @@ -15,16 +15,19 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include -#include +#include #include #include #include #include +#include #include #include #include +static int g_run_connect_thread= 0; + Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) { m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade(); @@ -33,26 +36,75 @@ Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) else m_connect_string= 0; m_config_retriever= 0; + m_connect_thread= 0; + m_connect_callback= 0; } -int Ndb_cluster_connection::connect() +extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me) +{ + my_thread_init(); + g_run_connect_thread= 1; + ((Ndb_cluster_connection*) me)->connect_thread(); + my_thread_end(); + NdbThread_Exit(0); + return me; +} + +void Ndb_cluster_connection::connect_thread() +{ + DBUG_ENTER("Ndb_cluster_connection::connect_thread"); + int r; + while (g_run_connect_thread) { + if ((r = connect(1)) == 0) + break; + if (r == -1) { + printf("Ndb_cluster_connection::connect_thread error\n"); + abort(); + } + } + if (m_connect_callback) + (*m_connect_callback)(); + DBUG_VOID_RETURN; +} + +int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void)) +{ + DBUG_ENTER("Ndb_cluster_connection::start_connect_thread"); + m_connect_callback= connect_callback; + m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread, + (void**)this, + 32768, + "ndb_cluster_connection", + NDB_THREAD_PRIO_LOW); + DBUG_RETURN(0); +} + +int Ndb_cluster_connection::connect(int reconnect) { DBUG_ENTER("Ndb_cluster_connection::connect"); - if (m_config_retriever != 0) { - DBUG_RETURN(0); - } - - m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API); - m_config_retriever->setConnectString(m_connect_string); - const char* error = 0; do { - if(m_config_retriever->init() == -1) - break; - - if(m_config_retriever->do_connect() == -1) - break; - + if (m_config_retriever == 0) + { + m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API); + m_config_retriever->setConnectString(m_connect_string); + if(m_config_retriever->init() == -1) + break; + } + else + if (reconnect == 0) + DBUG_RETURN(0); + if (reconnect) + { + int r= m_config_retriever->do_connect(1); + if (r == 1) + DBUG_RETURN(1); // mgmt server not up yet + if (r == -1) + break; + } + else + if(m_config_retriever->do_connect() == -1) + break; Uint32 nodeId = m_config_retriever->allocNodeId(); for(Uint32 i = 0; nodeId == 0 && i<5; i++){ NdbSleep_SecSleep(3); @@ -60,15 +112,12 @@ int Ndb_cluster_connection::connect() } if(nodeId == 0) break; - ndb_mgm_configuration * props = m_config_retriever->getConfig(); if(props == 0) break; m_facade->start_instance(nodeId, props); free(props); - m_facade->connected(); - DBUG_RETURN(0); } while(0); @@ -83,7 +132,16 @@ int Ndb_cluster_connection::connect() Ndb_cluster_connection::~Ndb_cluster_connection() { - if (m_facade != 0) { + if (m_connect_thread) + { + void *status; + g_run_connect_thread= 0; + NdbThread_WaitFor(m_connect_thread, &status); + NdbThread_Destroy(&m_connect_thread); + m_connect_thread= 0; + } + if (m_facade != 0) + { delete m_facade; if (m_facade != TransporterFacade::theFacadeInstance) abort();