diff --git a/ndb/include/Makefile.am b/ndb/include/Makefile.am index b1b7951f216..7b3f80b5560 100644 --- a/ndb/include/Makefile.am +++ b/ndb/include/Makefile.am @@ -16,6 +16,7 @@ ndbapi/NdbError.hpp \ ndbapi/NdbEventOperation.hpp \ ndbapi/NdbIndexOperation.hpp \ ndbapi/NdbOperation.hpp \ +ndbapi/ndb_cluster_connection.hpp \ ndbapi/NdbBlob.hpp \ ndbapi/NdbPool.hpp \ ndbapi/NdbRecAttr.hpp \ diff --git a/ndb/include/mgmcommon/ConfigRetriever.hpp b/ndb/include/mgmcommon/ConfigRetriever.hpp index dd34eb0cfbb..00f5ef71160 100644 --- a/ndb/include/mgmcommon/ConfigRetriever.hpp +++ b/ndb/include/mgmcommon/ConfigRetriever.hpp @@ -37,7 +37,7 @@ public: */ int init(); - int do_connect(); + int do_connect(int exit_on_connect_failure= false); /** * Get configuration for current (nodeId given in local config file) node. diff --git a/ndb/include/mgmcommon/NdbConfig.h b/ndb/include/mgmcommon/NdbConfig.h index eb90f5e7c78..1bca825ab8d 100644 --- a/ndb/include/mgmcommon/NdbConfig.h +++ b/ndb/include/mgmcommon/NdbConfig.h @@ -21,6 +21,7 @@ extern "C" { #endif +const char* NdbConfig_get_path(int *len); void NdbConfig_SetPath(const char *path); char* NdbConfig_NdbCfgName(int with_ndb_home); char* NdbConfig_ErrorFileName(int node_id); diff --git a/ndb/include/ndbapi/Ndb.hpp b/ndb/include/ndbapi/Ndb.hpp index 7904ecef305..387447f00f4 100644 --- a/ndb/include/ndbapi/Ndb.hpp +++ b/ndb/include/ndbapi/Ndb.hpp @@ -860,6 +860,7 @@ #include #include +#include #include #include @@ -992,6 +993,8 @@ public: * deprecated. */ Ndb(const char* aCatalogName = "", const char* aSchemaName = "def"); + Ndb(Ndb_cluster_connection *ndb_cluster_connection, + const char* aCatalogName = "", const char* aSchemaName = "def"); ~Ndb(); @@ -1081,7 +1084,10 @@ public: * @return 0: Ndb is ready and timeout has not occurred.
* -1: Timeout has expired */ + int waitUntilReady(int timeout = 60); + + void connected(Uint32 block_reference); /** @} *********************************************************************/ @@ -1447,6 +1453,9 @@ public: ****************************************************************************/ private: + void setup(Ndb_cluster_connection *ndb_cluster_connection, + const char* aCatalogName, const char* aSchemaName); + NdbConnection* startTransactionLocal(Uint32 aPrio, Uint32 aFragmentId); // Connect the connection object to the Database. @@ -1585,6 +1594,7 @@ private: * These are the private variables in this class. *****************************************************************************/ NdbObjectIdMap* theNdbObjectIdMap; + Ndb_cluster_connection *m_ndb_cluster_connection; NdbConnection** thePreparedTransactionsArray; NdbConnection** theSentTransactionsArray; @@ -1703,7 +1713,7 @@ private: static void executeMessage(void*, NdbApiSignal *, struct LinearSectionPtr ptr[3]); - static void statusMessage(void*, Uint16, bool, bool); + static void statusMessage(void*, Uint32, bool, bool); #ifdef VM_TRACE void printState(const char* fmt, ...); #endif diff --git a/ndb/include/ndbapi/ndb_cluster_connection.hpp b/ndb/include/ndbapi/ndb_cluster_connection.hpp new file mode 100644 index 00000000000..59d5a038844 --- /dev/null +++ b/ndb/include/ndbapi/ndb_cluster_connection.hpp @@ -0,0 +1,45 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + + +#ifndef CLUSTER_CONNECTION_HPP +#define CLUSTER_CONNECTION_HPP + +class TransporterFacade; +class ConfigRetriever; +class NdbThread; + +extern "C" { + void* run_ndb_cluster_connection_connect_thread(void*); +} + +class Ndb_cluster_connection { +public: + Ndb_cluster_connection(const char * connect_string = 0); + ~Ndb_cluster_connection(); + int connect(int reconnect= 0); + int start_connect_thread(int (*connect_callback)(void)= 0); +private: + friend void* run_ndb_cluster_connection_connect_thread(void*); + void connect_thread(); + char *m_connect_string; + TransporterFacade *m_facade; + ConfigRetriever *m_config_retriever; + NdbThread *m_connect_thread; + int (*m_connect_callback)(void); +}; + +#endif diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp index afee135d793..26bac2cf10b 100644 --- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp +++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp @@ -78,7 +78,7 @@ ConfigRetriever::init() { } int -ConfigRetriever::do_connect(){ +ConfigRetriever::do_connect(int exit_on_connect_failure){ if(!m_handle) m_handle= ndb_mgm_create_handle(); @@ -102,6 +102,8 @@ ConfigRetriever::do_connect(){ if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) { return 0; } + if (exit_on_connect_failure) + return 1; setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle)); case MgmId_File: break; diff --git a/ndb/src/common/mgmcommon/NdbConfig.c b/ndb/src/common/mgmcommon/NdbConfig.c index c3f4abf61a7..6c93f9fc7cf 100644 --- a/ndb/src/common/mgmcommon/NdbConfig.c +++ b/ndb/src/common/mgmcommon/NdbConfig.c @@ -21,27 +21,34 @@ static char *datadir_path= 0; +const char * +NdbConfig_get_path(int *_len) +{ + const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0); + int path_len= 0; + if (path) + path_len= strlen(path); + if (path_len == 0 && datadir_path) { + path= datadir_path; + path_len= strlen(path); + } + if (path_len == 0) { + path= "."; + path_len= strlen(path); + } + if (_len) + *_len= path_len; + return path; +} + static char* NdbConfig_AllocHomePath(int _len) { - const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0); - int len= _len; - int path_len= 0; - char *buf; - - if (path == 0) - path= datadir_path; - - if (path) - path_len= strlen(path); - - len+= path_len; - buf= NdbMem_Allocate(len); - if (path_len > 0) - snprintf(buf, len, "%s%s", path, DIR_SEPARATOR); - else - buf[0]= 0; - + int path_len; + const char *path= NdbConfig_get_path(&path_len); + int len= _len+path_len; + char *buf= NdbMem_Allocate(len); + snprintf(buf, len, "%s%s", path, DIR_SEPARATOR); return buf; } diff --git a/ndb/src/kernel/main.cpp b/ndb/src/kernel/main.cpp index 1f080b003bc..491733975a8 100644 --- a/ndb/src/kernel/main.cpp +++ b/ndb/src/kernel/main.cpp @@ -74,6 +74,8 @@ NDB_MAIN(ndb_kernel){ theConfig->fetch_configuration(); } + chdir(NdbConfig_get_path(0)); + if (theConfig->getDaemonMode()) { // Become a daemon char *lockfile= NdbConfig_PidFileName(globalData.ownId); diff --git a/ndb/src/kernel/vm/WatchDog.cpp b/ndb/src/kernel/vm/WatchDog.cpp index a90f63aff37..4e07dc1df90 100644 --- a/ndb/src/kernel/vm/WatchDog.cpp +++ b/ndb/src/kernel/vm/WatchDog.cpp @@ -15,6 +15,9 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include +#include + #include "WatchDog.hpp" #include "GlobalData.hpp" #include @@ -24,7 +27,9 @@ extern "C" void* runWatchDog(void* w){ + my_thread_init(); ((WatchDog*)w)->run(); + my_thread_end(); NdbThread_Exit(0); return NULL; } diff --git a/ndb/src/mgmsrv/Makefile.am b/ndb/src/mgmsrv/Makefile.am index 60b579d18e1..8fa9ec5f63e 100644 --- a/ndb/src/mgmsrv/Makefile.am +++ b/ndb/src/mgmsrv/Makefile.am @@ -1,7 +1,8 @@ MYSQLDATAdir = $(localstatedir) MYSQLSHAREdir = $(pkgdatadir) MYSQLBASEdir= $(prefix) -MYSQLCLUSTERdir= $(prefix)/mysql-cluster +#MYSQLCLUSTERdir= $(prefix)/mysql-cluster +MYSQLCLUSTERdir= . ndbbin_PROGRAMS = ndb_mgmd diff --git a/ndb/src/mgmsrv/MgmtSrvr.cpp b/ndb/src/mgmsrv/MgmtSrvr.cpp index d6fe8a11bc7..0f50fd6c596 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -629,13 +629,16 @@ MgmtSrvr::start() if (!check_start()) return false; } - theFacade = TransporterFacade::start_instance - (_ownNodeId,(ndb_mgm_configuration*)_config->m_configValues); - + theFacade= TransporterFacade::theFacadeInstance = new TransporterFacade(); if(theFacade == 0) { - DEBUG("MgmtSrvr.cpp: theFacade is NULL."); + DEBUG("MgmtSrvr.cpp: theFacade == 0."); return false; } + if ( theFacade->start_instance + (_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) { + DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0."); + return false; + } MGM_REQUIRE(_blockNumber == 1); @@ -2295,7 +2298,7 @@ MgmtSrvr::signalReceivedNotification(void* mgmtSrvr, //**************************************************************************** //**************************************************************************** void -MgmtSrvr::nodeStatusNotification(void* mgmSrv, NodeId nodeId, +MgmtSrvr::nodeStatusNotification(void* mgmSrv, Uint32 nodeId, bool alive, bool nfComplete) { if(!(!alive && nfComplete)) diff --git a/ndb/src/mgmsrv/MgmtSrvr.hpp b/ndb/src/mgmsrv/MgmtSrvr.hpp index b26eaeb4ab9..e910fb67449 100644 --- a/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -699,7 +699,7 @@ private: * shall receive the notification. * @param processId: Id of the dead process. */ - static void nodeStatusNotification(void* mgmSrv, NodeId nodeId, + static void nodeStatusNotification(void* mgmSrv, Uint32 nodeId, bool alive, bool nfCompleted); /** diff --git a/ndb/src/ndbapi/ClusterMgr.cpp b/ndb/src/ndbapi/ClusterMgr.cpp index b9947fcf0e7..17a0e443b48 100644 --- a/ndb/src/ndbapi/ClusterMgr.cpp +++ b/ndb/src/ndbapi/ClusterMgr.cpp @@ -15,6 +15,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include +#include #include #include @@ -64,7 +65,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): { ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); - noOfConnectedNodes = 0; + noOfConnectedNodes= 0; + theClusterMgrThread= 0; } ClusterMgr::~ClusterMgr(){ @@ -137,20 +139,21 @@ ClusterMgr::startThread() { void ClusterMgr::doStop( ){ + DBUG_ENTER("ClusterMgr::doStop"); NdbMutex_Lock(clusterMgrThreadMutex); - if(theStop){ NdbMutex_Unlock(clusterMgrThreadMutex); - return; + DBUG_VOID_RETURN; } - void *status; theStop = 1; - - NdbThread_WaitFor(theClusterMgrThread, &status); - NdbThread_Destroy(&theClusterMgrThread); - + if (theClusterMgrThread) { + NdbThread_WaitFor(theClusterMgrThread, &status); + NdbThread_Destroy(&theClusterMgrThread); + theClusterMgrThread= 0; + } NdbMutex_Unlock(clusterMgrThreadMutex); + DBUG_VOID_RETURN; } void @@ -524,6 +527,7 @@ ArbitMgr::doChoose(const Uint32* theData) void ArbitMgr::doStop(const Uint32* theData) { + DBUG_ENTER("ArbitMgr::doStop"); ArbitSignal aSignal; NdbMutex_Lock(theThreadMutex); if (theThread != NULL) { @@ -540,6 +544,7 @@ ArbitMgr::doStop(const Uint32* theData) theState = StateInit; } NdbMutex_Unlock(theThreadMutex); + DBUG_VOID_RETURN; } // private methods @@ -548,7 +553,9 @@ extern "C" void* runArbitMgr_C(void* me) { + my_thread_init(); ((ArbitMgr*) me)->threadMain(); + my_thread_end(); NdbThread_Exit(0); return NULL; } diff --git a/ndb/src/ndbapi/Makefile.am b/ndb/src/ndbapi/Makefile.am index 14badb0c62f..06128e047b6 100644 --- a/ndb/src/ndbapi/Makefile.am +++ b/ndb/src/ndbapi/Makefile.am @@ -34,6 +34,7 @@ libndbapi_la_SOURCES = \ NdbDictionary.cpp \ NdbDictionaryImpl.cpp \ DictCache.cpp \ + ndb_cluster_connection.cpp \ NdbBlob.cpp INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi diff --git a/ndb/src/ndbapi/Ndb.cpp b/ndb/src/ndbapi/Ndb.cpp index 0c91fcb178b..2c952425b1e 100644 --- a/ndb/src/ndbapi/Ndb.cpp +++ b/ndb/src/ndbapi/Ndb.cpp @@ -207,9 +207,11 @@ Remark: Disconnect all connections to the database. void Ndb::doDisconnect() { + DBUG_ENTER("Ndb::doDisconnect"); NdbConnection* tNdbCon; CHECK_STATUS_MACRO_VOID; + DBUG_PRINT("info", ("theNoOfDBnodes=%d", theNoOfDBnodes)); Uint32 tNoOfDbNodes = theNoOfDBnodes; UintR i; for (i = 0; i < tNoOfDbNodes; i++) { @@ -227,6 +229,7 @@ Ndb::doDisconnect() tNdbCon = tNdbCon->theNext; releaseConnectToNdb(tmpNdbCon); }//while + DBUG_VOID_RETURN; }//Ndb::disconnect() /***************************************************************************** @@ -239,6 +242,7 @@ Remark: Waits until a node has status != 0 int Ndb::waitUntilReady(int timeout) { + DBUG_ENTER("Ndb::waitUntilReady"); int secondsCounter = 0; int milliCounter = 0; int noChecksSinceFirstAliveFound = 0; @@ -246,7 +250,7 @@ Ndb::waitUntilReady(int timeout) if (theInitState != Initialised) { // Ndb::init is not called theError.code = 4256; - return -1; + DBUG_RETURN(-1); } do { @@ -265,13 +269,13 @@ Ndb::waitUntilReady(int timeout) tp->unlock_mutex(); if (foundAliveNode == theNoOfDBnodes) { - return 0; + DBUG_RETURN(0); }//if if (foundAliveNode > 0) { noChecksSinceFirstAliveFound++; }//if if (noChecksSinceFirstAliveFound > 30) { - return 0; + DBUG_RETURN(0); }//if NdbSleep_MilliSleep(100); milliCounter += 100; @@ -281,9 +285,9 @@ Ndb::waitUntilReady(int timeout) }//if } while ( secondsCounter < timeout ); if (noChecksSinceFirstAliveFound > 0) { - return 0; + DBUG_RETURN(0); }//if - return -1; + DBUG_RETURN(-1); } /***************************************************************************** @@ -1060,6 +1064,9 @@ Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes, * This algorithm should be implemented in Dbdih */ { + if (fragment2PrimaryNodeMap != 0) + abort(); + fragment2PrimaryNodeMap = new Uint32[noOfFragments]; Uint32 i; for(i = 0; i #include #include #include @@ -104,7 +92,9 @@ Remark: Deletes the connection object. *****************************************************************************/ NdbConnection::~NdbConnection() { + DBUG_ENTER("NdbConnection::~NdbConnection"); theNdb->theNdbObjectIdMap->unmap(theId, this); + DBUG_VOID_RETURN; }//NdbConnection::~NdbConnection() /***************************************************************************** diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/ndb/src/ndbapi/NdbDictionaryImpl.cpp index 6a776dfc6d2..b0c007d7790 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.cpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.cpp @@ -660,6 +660,7 @@ NdbDictionaryImpl::getIndexTable(NdbIndexImpl * index, return getTable(m_ndb.externalizeTableName(internalName)); } +#if 0 bool NdbDictInterface::setTransporter(class TransporterFacade * tf) { @@ -683,11 +684,11 @@ NdbDictInterface::setTransporter(class TransporterFacade * tf) return true; } +#endif bool NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf) { - m_blockNumber = -1; m_reference = ndb->getReference(); m_transporter = tf; m_waiter.m_mutex = tf->theMutexPtr; @@ -697,10 +698,6 @@ NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf) NdbDictInterface::~NdbDictInterface() { - if (m_transporter != NULL){ - if (m_blockNumber != -1) - m_transporter->close(m_blockNumber, 0); - } } void @@ -787,7 +784,7 @@ NdbDictInterface::execSignal(void* dictImpl, } void -NdbDictInterface::execNodeStatus(void* dictImpl, NodeId aNode, +NdbDictInterface::execNodeStatus(void* dictImpl, Uint32 aNode, bool alive, bool nfCompleted) { NdbDictInterface * tmp = (NdbDictInterface*)dictImpl; diff --git a/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/ndb/src/ndbapi/NdbDictionaryImpl.hpp index aeb2682483c..d05c34520b8 100644 --- a/ndb/src/ndbapi/NdbDictionaryImpl.hpp +++ b/ndb/src/ndbapi/NdbDictionaryImpl.hpp @@ -241,7 +241,6 @@ public: NdbDictInterface(NdbError& err) : m_error(err) { m_reference = 0; m_masterNodeId = 0; - m_blockNumber = -1; m_transporter= NULL; } ~NdbDictInterface(); @@ -309,7 +308,6 @@ public: private: Uint32 m_reference; Uint32 m_masterNodeId; - int m_blockNumber; NdbWaiter m_waiter; class TransporterFacade * m_transporter; @@ -319,7 +317,7 @@ private: class NdbApiSignal* signal, class LinearSectionPtr ptr[3]); - static void execNodeStatus(void* dictImpl, NodeId, + static void execNodeStatus(void* dictImpl, Uint32, bool alive, bool nfCompleted); void execGET_TABINFO_REF(NdbApiSignal *, LinearSectionPtr ptr[3]); diff --git a/ndb/src/ndbapi/NdbReceiver.cpp b/ndb/src/ndbapi/NdbReceiver.cpp index 8cc17a9d1d7..caeeac80093 100644 --- a/ndb/src/ndbapi/NdbReceiver.cpp +++ b/ndb/src/ndbapi/NdbReceiver.cpp @@ -14,6 +14,7 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include #include "NdbImpl.hpp" #include #include "NdbDictionaryImpl.hpp" @@ -36,10 +37,12 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) : NdbReceiver::~NdbReceiver() { + DBUG_ENTER("NdbReceiver::~NdbReceiver"); if (m_id != NdbObjectIdMap::InvalidId) { m_ndb->theNdbObjectIdMap->unmap(m_id, this); } delete[] m_rows; + DBUG_VOID_RETURN; } void diff --git a/ndb/src/ndbapi/Ndbif.cpp b/ndb/src/ndbapi/Ndbif.cpp index 7ad51efdb62..3e98640b9fb 100644 --- a/ndb/src/ndbapi/Ndbif.cpp +++ b/ndb/src/ndbapi/Ndbif.cpp @@ -15,6 +15,8 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include + #include "NdbApiSignal.hpp" #include "NdbImpl.hpp" #include "NdbOperation.hpp" @@ -53,6 +55,8 @@ int Ndb::init(int aMaxNoOfTransactions) { + DBUG_ENTER("Ndb::init"); + int i; int aNrOfCon; int aNrOfOp; @@ -67,7 +71,7 @@ Ndb::init(int aMaxNoOfTransactions) theError.code = 4104; break; } - return -1; + DBUG_RETURN(-1); }//if theInitState = StartingInit; TransporterFacade * theFacade = TransporterFacade::instance(); @@ -75,37 +79,17 @@ Ndb::init(int aMaxNoOfTransactions) const int tBlockNo = theFacade->open(this, executeMessage, - statusMessage); - - + statusMessage); if ( tBlockNo == -1 ) { theError.code = 4105; theFacade->unlock_mutex(); - return -1; // no more free blocknumbers + DBUG_RETURN(-1); // no more free blocknumbers }//if theNdbBlockNumber = tBlockNo; - theNode = theFacade->ownId(); - theMyRef = numberToRef(theNdbBlockNumber, theNode); - - for (i = 1; i < MAX_NDB_NODES; i++){ - if (theFacade->getIsDbNode(i)){ - theDBnodes[theNoOfDBnodes] = i; - theNoOfDBnodes++; - } - } - - theFirstTransId = ((Uint64)theNdbBlockNumber << 52)+((Uint64)theNode << 40); - theFirstTransId += theFacade->m_max_trans_id; theFacade->unlock_mutex(); - - theDictionary = new NdbDictionaryImpl(*this); - if (theDictionary == NULL) { - theError.code = 4000; - return -1; - } theDictionary->setTransporter(this, theFacade); aNrOfCon = theNoOfDBnodes; @@ -144,9 +128,6 @@ Ndb::init(int aMaxNoOfTransactions) theSentTransactionsArray[i] = NULL; theCompletedTransactionsArray[i] = NULL; }//for - - startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes); - for (i = 0; i < 16; i++){ tSignal[i] = getSignal(); if(tSignal[i] == NULL) { @@ -156,11 +137,8 @@ Ndb::init(int aMaxNoOfTransactions) } for (i = 0; i < 16; i++) releaseSignal(tSignal[i]); - theInitState = Initialised; - - theCommitAckSignal = new NdbApiSignal(theMyRef); - return 0; + DBUG_RETURN(0); error_handler: ndbout << "error_handler" << endl; @@ -176,12 +154,13 @@ error_handler: delete theDictionary; TransporterFacade::instance()->close(theNdbBlockNumber, 0); - return -1; + DBUG_RETURN(-1); } void Ndb::releaseTransactionArrays() { + DBUG_ENTER("Ndb::releaseTransactionArrays"); if (thePreparedTransactionsArray != NULL) { delete [] thePreparedTransactionsArray; }//if @@ -191,6 +170,7 @@ Ndb::releaseTransactionArrays() if (theCompletedTransactionsArray != NULL) { delete [] theCompletedTransactionsArray; }//if + DBUG_VOID_RETURN; }//Ndb::releaseTransactionArrays() void @@ -202,13 +182,46 @@ Ndb::executeMessage(void* NdbObject, tNdb->handleReceivedSignal(aSignal, ptr); } -void -Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete) +void Ndb::connected(Uint32 ref) { + theMyRef= ref; + theNode= refToNode(theMyRef); + if (theNdbBlockNumber >= 0) + assert(theMyRef == numberToRef(theNdbBlockNumber, theNode)); + + TransporterFacade * theFacade = TransporterFacade::instance(); + int i; + theNoOfDBnodes= 0; + for (i = 1; i < MAX_NDB_NODES; i++){ + if (theFacade->getIsDbNode(i)){ + theDBnodes[theNoOfDBnodes] = i; + theNoOfDBnodes++; + } + } + theFirstTransId = ((Uint64)theNdbBlockNumber << 52)+ + ((Uint64)theNode << 40); + theFirstTransId += theFacade->m_max_trans_id; + // assert(0); + DBUG_PRINT("info",("connected with ref=%x, id=%d, no_db_nodes=%d, first_trans_id=%d", + theMyRef, + theNode, + theNoOfDBnodes, + theFirstTransId)); + startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes); + theCommitAckSignal = new NdbApiSignal(theMyRef); + + theDictionary->m_receiver.m_reference= theMyRef; +} + +void +Ndb::statusMessage(void* NdbObject, Uint32 a_node, bool alive, bool nfComplete) +{ + DBUG_ENTER("Ndb::statusMessage"); Ndb* tNdb = (Ndb*)NdbObject; if (alive) { if (nfComplete) { - assert(0); + tNdb->connected(a_node); + DBUG_VOID_RETURN; }//if } else { if (nfComplete) { @@ -219,6 +232,7 @@ Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete) }//if NdbDictInterface::execNodeStatus(&tNdb->theDictionary->m_receiver, a_node, alive, nfComplete); + DBUG_VOID_RETURN; } void diff --git a/ndb/src/ndbapi/Ndbinit.cpp b/ndb/src/ndbapi/Ndbinit.cpp index b09696d5262..718116819da 100644 --- a/ndb/src/ndbapi/Ndbinit.cpp +++ b/ndb/src/ndbapi/Ndbinit.cpp @@ -42,6 +42,7 @@ void NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *); static int theNoOfNdbObjects = 0; static char *ndbConnectString = 0; +static Ndb_cluster_connection *global_ndb_cluster_connection= 0; #if defined NDB_WIN32 || defined SCO static NdbMutex & createNdbMutex = * NdbMutex_Create(); @@ -56,45 +57,74 @@ Ndb(const char* aDataBase); Parameters: aDataBase : Name of the database. Remark: Connect to the database. ***************************************************************************/ -Ndb::Ndb( const char* aDataBase , const char* aSchema) : - theNdbObjectIdMap(0), - thePreparedTransactionsArray(NULL), - theSentTransactionsArray(NULL), - theCompletedTransactionsArray(NULL), - theNoOfPreparedTransactions(0), - theNoOfSentTransactions(0), - theNoOfCompletedTransactions(0), - theNoOfAllocatedTransactions(0), - theMaxNoOfTransactions(0), - theMinNoOfEventsToWakeUp(0), - prefixEnd(NULL), - theImpl(NULL), - theDictionary(NULL), - theConIdleList(NULL), - theOpIdleList(NULL), - theScanOpIdleList(NULL), - theIndexOpIdleList(NULL), -// theSchemaConIdleList(NULL), -// theSchemaConToNdbList(NULL), - theTransactionList(NULL), - theConnectionArray(NULL), - theRecAttrIdleList(NULL), - theSignalIdleList(NULL), - theLabelList(NULL), - theBranchList(NULL), - theSubroutineList(NULL), - theCallList(NULL), - theScanList(NULL), - theNdbBlobIdleList(NULL), - theNoOfDBnodes(0), - theDBnodes(NULL), - the_release_ind(NULL), - the_last_check_time(0), - theFirstTransId(0), - theRestartGCI(0), - theNdbBlockNumber(-1), - theInitState(NotConstructed) +Ndb::Ndb( const char* aDataBase , const char* aSchema) { + if (global_ndb_cluster_connection == 0) { + if (theNoOfNdbObjects > 0) + abort(); // old and new Ndb constructor used mixed + global_ndb_cluster_connection= new Ndb_cluster_connection(ndbConnectString); + global_ndb_cluster_connection->connect(); + } + setup(global_ndb_cluster_connection, aDataBase, aSchema); +} + +Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection, + const char* aDataBase , const char* aSchema) { + if (global_ndb_cluster_connection != 0 && + global_ndb_cluster_connection != ndb_cluster_connection) + abort(); // old and new Ndb constructor used mixed + setup(ndb_cluster_connection, aDataBase, aSchema); +} + +void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection, + const char* aDataBase , const char* aSchema) +{ + DBUG_ENTER("Ndb::setup"); + + theNdbObjectIdMap= 0; + m_ndb_cluster_connection= ndb_cluster_connection; + thePreparedTransactionsArray= NULL; + theSentTransactionsArray= NULL; + theCompletedTransactionsArray= NULL; + theNoOfPreparedTransactions= 0; + theNoOfSentTransactions= 0; + theNoOfCompletedTransactions= 0; + theNoOfAllocatedTransactions= 0; + theMaxNoOfTransactions= 0; + theMinNoOfEventsToWakeUp= 0; + prefixEnd= NULL; + theImpl= NULL; + theDictionary= NULL; + theConIdleList= NULL; + theOpIdleList= NULL; + theScanOpIdleList= NULL; + theIndexOpIdleList= NULL; +// theSchemaConIdleList= NULL; +// theSchemaConToNdbList= NULL; + theTransactionList= NULL; + theConnectionArray= NULL; + theRecAttrIdleList= NULL; + theSignalIdleList= NULL; + theLabelList= NULL; + theBranchList= NULL; + theSubroutineList= NULL; + theCallList= NULL; + theScanList= NULL; + theNdbBlobIdleList= NULL; + theNoOfDBnodes= 0; + theDBnodes= NULL; + the_release_ind= NULL; + the_last_check_time= 0; + theFirstTransId= 0; + theRestartGCI= 0; + theNdbBlockNumber= -1; + theInitState= NotConstructed; + + theNode= 0; + theFirstTransId= 0; + theMyRef= 0; + theNoOfDBnodes= 0; + fullyQualifiedNames = true; cgetSignals =0; @@ -135,18 +165,8 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) : NdbMutex_Lock(&createNdbMutex); - TransporterFacade * m_facade = 0; - if(theNoOfNdbObjects == 0){ - if ((m_facade = TransporterFacade::start_instance(ndbConnectString)) == 0) - theInitState = InitConfigError; - } else { - m_facade = TransporterFacade::instance(); - } - - if(m_facade != 0){ - theWaiter.m_mutex = m_facade->theMutexPtr; - } - + theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr; + // For keeping track of how many Ndb objects that exists. theNoOfNdbObjects += 1; @@ -167,6 +187,13 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) : } NdbMutex_Unlock(&createNdbMutex); + + theDictionary = new NdbDictionaryImpl(*this); + if (theDictionary == NULL) { + ndbout_c("Ndb cailed to allocate dictionary"); + exit(-1); + } + DBUG_VOID_RETURN; } @@ -187,6 +214,7 @@ void Ndb::setConnectString(const char * connectString) *****************************************************************************/ Ndb::~Ndb() { + DBUG_ENTER("Ndb::~Ndb()"); doDisconnect(); delete theDictionary; @@ -203,6 +231,10 @@ Ndb::~Ndb() theNoOfNdbObjects -= 1; if(theNoOfNdbObjects == 0){ TransporterFacade::stop_instance(); + if (global_ndb_cluster_connection != 0) { + delete global_ndb_cluster_connection; + global_ndb_cluster_connection= 0; + } }//if NdbMutex_Unlock(&createNdbMutex); @@ -271,6 +303,7 @@ Ndb::~Ndb() assert(cnewSignals == cfreeSignals); assert(cgetSignals == creleaseSignals); #endif + DBUG_VOID_RETURN; } NdbWaiter::NdbWaiter(){ diff --git a/ndb/src/ndbapi/Ndblist.cpp b/ndb/src/ndbapi/Ndblist.cpp index 1e1cb5e4b40..25239ef6738 100644 --- a/ndb/src/ndbapi/Ndblist.cpp +++ b/ndb/src/ndbapi/Ndblist.cpp @@ -783,6 +783,7 @@ Remark: Release and disconnect from DBTC a connection and seize it to th void Ndb::releaseConnectToNdb(NdbConnection* a_con) { + DBUG_ENTER("Ndb::releaseConnectToNdb"); NdbApiSignal tSignal(theMyRef); int tConPtr; @@ -790,7 +791,7 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con) // manage to reach NDB or not. if (a_con == NULL) - return; + DBUG_VOID_RETURN; Uint32 node_id = a_con->getConnectedNodeId(); Uint32 conn_seq = a_con->theNodeSequence; @@ -821,6 +822,6 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con) abort(); }//if releaseNdbCon(a_con); - return; + DBUG_VOID_RETURN; } diff --git a/ndb/src/ndbapi/TransporterFacade.cpp b/ndb/src/ndbapi/TransporterFacade.cpp index 852996e92c0..4a7ad8a6872 100644 --- a/ndb/src/ndbapi/TransporterFacade.cpp +++ b/ndb/src/ndbapi/TransporterFacade.cpp @@ -15,6 +15,7 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include +#include #include #include "TransporterFacade.hpp" #include "ClusterMgr.hpp" @@ -37,6 +38,16 @@ //#define REPORT_TRANSPORTER //#define API_TRACE; +static int numberToIndex(int number) +{ + return number - MIN_API_BLOCK_NO; +} + +static int indexToNumber(int index) +{ + return index + MIN_API_BLOCK_NO; +} + #if defined DEBUG_TRANSPORTER #define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl; #else @@ -44,8 +55,6 @@ #endif TransporterFacade* TransporterFacade::theFacadeInstance = NULL; -ConfigRetriever *TransporterFacade::s_config_retriever= 0; - /***************************************************************************** * Call back functions @@ -321,12 +330,6 @@ copy(Uint32 * & insertPtr, abort(); } -extern "C" -void -atexit_stop_instance(){ - TransporterFacade::stop_instance(); -} - /** * Note that this function need no locking since its * only called from the constructor of Ndb (the NdbObject) @@ -334,64 +337,14 @@ atexit_stop_instance(){ * Which is protected by a mutex */ - -TransporterFacade* -TransporterFacade::start_instance(const char * connectString){ - - // TransporterFacade used from API get config from mgmt srvr - s_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API); - - s_config_retriever->setConnectString(connectString); - const char* error = 0; - do { - if(s_config_retriever->init() == -1) - break; - - if(s_config_retriever->do_connect() == -1) - break; - - Uint32 nodeId = s_config_retriever->allocNodeId(); - for(Uint32 i = 0; nodeId == 0 && i<5; i++){ - NdbSleep_SecSleep(3); - nodeId = s_config_retriever->allocNodeId(); - } - if(nodeId == 0) - break; - - ndb_mgm_configuration * props = s_config_retriever->getConfig(); - if(props == 0) - break; - - TransporterFacade * tf = start_instance(nodeId, props); - - free(props); - return tf; - } while(0); - - ndbout << "Configuration error: "; - const char* erString = s_config_retriever->getErrorString(); - if (erString == 0) { - erString = "No error specified!"; - } - ndbout << erString << endl; - return 0; -} - -TransporterFacade* +int TransporterFacade::start_instance(int nodeId, const ndb_mgm_configuration* props) { - TransporterFacade* tf = new TransporterFacade(); - if (! tf->init(nodeId, props)) { - delete tf; - return NULL; + if (! theFacadeInstance->init(nodeId, props)) { + return -1; } - /** - * Install atexit handler - */ - atexit(atexit_stop_instance); - /** * Install signal handler for SIGPIPE * @@ -402,19 +355,7 @@ TransporterFacade::start_instance(int nodeId, signal(SIGPIPE, SIG_IGN); #endif - if(theFacadeInstance == NULL){ - theFacadeInstance = tf; - } - - return tf; -} - -void -TransporterFacade::close_configuration(){ - if (s_config_retriever) { - delete s_config_retriever; - s_config_retriever= 0; - } + return 0; } /** @@ -425,23 +366,15 @@ TransporterFacade::close_configuration(){ */ void TransporterFacade::stop_instance(){ - - close_configuration(); - - if(theFacadeInstance == NULL){ - /** - * We are called from atexit function - */ - return; - } - - theFacadeInstance->doStop(); - - delete theFacadeInstance; theFacadeInstance = NULL; + DBUG_ENTER("TransporterFacade::stop_instance"); + if(theFacadeInstance) + theFacadeInstance->doStop(); + DBUG_VOID_RETURN; } void TransporterFacade::doStop(){ + DBUG_ENTER("TransporterFacade::doStop"); /** * First stop the ClusterMgr because it needs to send one more signal * and also uses theFacadeInstance to lock/unlock theMutexPtr @@ -454,17 +387,26 @@ TransporterFacade::doStop(){ */ void *status; theStopReceive = 1; - NdbThread_WaitFor(theReceiveThread, &status); - NdbThread_WaitFor(theSendThread, &status); - NdbThread_Destroy(&theReceiveThread); - NdbThread_Destroy(&theSendThread); + if (theReceiveThread) { + NdbThread_WaitFor(theReceiveThread, &status); + NdbThread_Destroy(&theReceiveThread); + theReceiveThread= 0; + } + if (theSendThread) { + NdbThread_WaitFor(theSendThread, &status); + NdbThread_Destroy(&theSendThread); + theSendThread= 0; + } + DBUG_VOID_RETURN; } extern "C" void* runSendRequest_C(void * me) { + my_thread_init(); ((TransporterFacade*) me)->threadMainSend(); + my_thread_end(); NdbThread_Exit(0); return me; } @@ -507,7 +449,9 @@ extern "C" void* runReceiveResponse_C(void * me) { + my_thread_init(); ((TransporterFacade*) me)->threadMainReceive(); + my_thread_end(); NdbThread_Exit(0); return me; } @@ -540,6 +484,8 @@ TransporterFacade::TransporterFacade() : theSendThread(NULL), theReceiveThread(NULL) { + theOwnId = 0; + theMutexPtr = NdbMutex_Create(); sendPerformedLastInterval = 0; @@ -552,6 +498,8 @@ TransporterFacade::TransporterFacade() : m_batch_byte_size= SCAN_BATCH_SIZE; m_batch_size= DEF_BATCH_SIZE; m_max_trans_id = 0; + + theClusterMgr = new ClusterMgr(* this); } bool @@ -570,7 +518,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE); iter.first(); - theClusterMgr = new ClusterMgr(* this); theClusterMgr->init(iter); /** @@ -622,7 +569,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) 32768, "ndb_send", NDB_THREAD_PRIO_LOW); - theClusterMgr->startThread(); #ifdef API_TRACE @@ -633,6 +579,21 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) } +void +TransporterFacade::connected() +{ + DBUG_ENTER("TransporterFacade::connected"); + Uint32 sz = m_threads.m_statusNext.size(); + for (Uint32 i = 0; i < sz ; i ++) { + if (m_threads.getInUse(i)){ + void * obj = m_threads.m_objectExecute[i].m_object; + NodeStatusFunction RegPC = m_threads.m_statusFunction[i]; + (*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true); + } + } + DBUG_VOID_RETURN; +} + void TransporterFacade::ReportNodeDead(NodeId tNodeId) { @@ -719,7 +680,16 @@ TransporterFacade::open(void* objRef, ExecuteFunction fun, NodeStatusFunction statusFun) { - return m_threads.open(objRef, fun, statusFun); + DBUG_ENTER("TransporterFacade::open"); + int r= m_threads.open(objRef, fun, statusFun); + if (r < 0) + DBUG_RETURN(r); +#if 1 + if (theOwnId > 0) { + (*statusFun)(objRef, numberToRef(r, theOwnId), true, true); + } +#endif + DBUG_RETURN(r); } TransporterFacade::~TransporterFacade(){ @@ -762,7 +732,7 @@ TransporterFacade::calculateSendLimit() //------------------------------------------------- void TransporterFacade::forceSend(Uint32 block_number) { checkCounter--; - m_threads.m_statusNext[block_number - MIN_API_BLOCK_NO] = ThreadData::ACTIVE; + m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE; sendPerformedLastInterval = 1; if (checkCounter < 0) { calculateSendLimit(); @@ -775,7 +745,7 @@ void TransporterFacade::forceSend(Uint32 block_number) { //------------------------------------------------- void TransporterFacade::checkForceSend(Uint32 block_number) { - m_threads.m_statusNext[block_number - MIN_API_BLOCK_NO] = ThreadData::ACTIVE; + m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE; //------------------------------------------------- // This code is an adaptive algorithm to discover when // the API should actually send its buffers. The reason @@ -1016,11 +986,12 @@ TransporterFacade::ThreadData::expand(Uint32 size){ m_firstFree = m_statusNext.size() - size; } + int TransporterFacade::ThreadData::open(void* objRef, - ExecuteFunction fun, - NodeStatusFunction fun2){ - + ExecuteFunction fun, + NodeStatusFunction fun2) +{ Uint32 nextFree = m_firstFree; if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){ @@ -1040,12 +1011,12 @@ TransporterFacade::ThreadData::open(void* objRef, m_objectExecute[nextFree] = oe; m_statusFunction[nextFree] = fun2; - return nextFree + MIN_API_BLOCK_NO; + return indexToNumber(nextFree); } int TransporterFacade::ThreadData::close(int number){ - number -= MIN_API_BLOCK_NO; + number= numberToIndex(number); assert(getInUse(number)); m_statusNext[number] = m_firstFree; m_firstFree = number; diff --git a/ndb/src/ndbapi/TransporterFacade.hpp b/ndb/src/ndbapi/TransporterFacade.hpp index 1ecdfab1e31..130a24345b7 100644 --- a/ndb/src/ndbapi/TransporterFacade.hpp +++ b/ndb/src/ndbapi/TransporterFacade.hpp @@ -35,7 +35,7 @@ class Ndb; class NdbApiSignal; typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]); -typedef void (* NodeStatusFunction)(void *, NodeId, bool nodeAlive, bool nfComplete); +typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete); extern "C" { void* runSendRequest_C(void*); @@ -55,9 +55,7 @@ public: bool init(Uint32, const ndb_mgm_configuration *); static TransporterFacade* instance(); - static TransporterFacade* start_instance(int, const ndb_mgm_configuration*); - static TransporterFacade* start_instance(const char *connectString); - static void close_configuration(); + int start_instance(int, const ndb_mgm_configuration*); static void stop_instance(); /** @@ -93,6 +91,8 @@ public: // My own processor id NodeId ownId() const; + void connected(); + void doConnect(int NodeId); void reportConnected(int NodeId); void doDisconnect(int NodeId); @@ -130,6 +130,7 @@ private: friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond friend class GrepSS; friend class Ndb; + friend class Ndb_cluster_connection; int sendSignalUnCond(NdbApiSignal *, NodeId nodeId); diff --git a/ndb/src/ndbapi/ndb_cluster_connection.cpp b/ndb/src/ndbapi/ndb_cluster_connection.cpp new file mode 100644 index 00000000000..9fb7a563807 --- /dev/null +++ b/ndb/src/ndbapi/ndb_cluster_connection.cpp @@ -0,0 +1,156 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +static int g_run_connect_thread= 0; + +Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string) +{ + m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade(); + if (connect_string) + m_connect_string= strdup(connect_string); + else + m_connect_string= 0; + m_config_retriever= 0; + m_connect_thread= 0; + m_connect_callback= 0; +} + +extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me) +{ + my_thread_init(); + g_run_connect_thread= 1; + ((Ndb_cluster_connection*) me)->connect_thread(); + my_thread_end(); + NdbThread_Exit(0); + return me; +} + +void Ndb_cluster_connection::connect_thread() +{ + DBUG_ENTER("Ndb_cluster_connection::connect_thread"); + int r; + while (g_run_connect_thread) { + if ((r = connect(1)) == 0) + break; + if (r == -1) { + printf("Ndb_cluster_connection::connect_thread error\n"); + abort(); + } + } + if (m_connect_callback) + (*m_connect_callback)(); + DBUG_VOID_RETURN; +} + +int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void)) +{ + DBUG_ENTER("Ndb_cluster_connection::start_connect_thread"); + m_connect_callback= connect_callback; + m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread, + (void**)this, + 32768, + "ndb_cluster_connection", + NDB_THREAD_PRIO_LOW); + DBUG_RETURN(0); +} + +int Ndb_cluster_connection::connect(int reconnect) +{ + DBUG_ENTER("Ndb_cluster_connection::connect"); + const char* error = 0; + do { + if (m_config_retriever == 0) + { + m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API); + m_config_retriever->setConnectString(m_connect_string); + if(m_config_retriever->init() == -1) + break; + } + else + if (reconnect == 0) + DBUG_RETURN(0); + if (reconnect) + { + int r= m_config_retriever->do_connect(1); + if (r == 1) + DBUG_RETURN(1); // mgmt server not up yet + if (r == -1) + break; + } + else + if(m_config_retriever->do_connect() == -1) + break; + Uint32 nodeId = m_config_retriever->allocNodeId(); + for(Uint32 i = 0; nodeId == 0 && i<5; i++){ + NdbSleep_SecSleep(3); + nodeId = m_config_retriever->allocNodeId(); + } + if(nodeId == 0) + break; + ndb_mgm_configuration * props = m_config_retriever->getConfig(); + if(props == 0) + break; + m_facade->start_instance(nodeId, props); + free(props); + m_facade->connected(); + DBUG_RETURN(0); + } while(0); + + ndbout << "Configuration error: "; + const char* erString = m_config_retriever->getErrorString(); + if (erString == 0) { + erString = "No error specified!"; + } + ndbout << erString << endl; + DBUG_RETURN(-1); +} + +Ndb_cluster_connection::~Ndb_cluster_connection() +{ + if (m_connect_thread) + { + void *status; + g_run_connect_thread= 0; + NdbThread_WaitFor(m_connect_thread, &status); + NdbThread_Destroy(&m_connect_thread); + m_connect_thread= 0; + } + if (m_facade != 0) + { + delete m_facade; + if (m_facade != TransporterFacade::theFacadeInstance) + abort(); + TransporterFacade::theFacadeInstance= 0; + } + if (m_connect_string) + free(m_connect_string); + if (m_config_retriever) + delete m_config_retriever; +} + + diff --git a/ndb/tools/listTables.cpp b/ndb/tools/listTables.cpp index bddf61848e8..b9e050ab6a4 100644 --- a/ndb/tools/listTables.cpp +++ b/ndb/tools/listTables.cpp @@ -22,12 +22,13 @@ */ #include +#include #include #include #include - +static Ndb_cluster_connection *ndb_cluster_connection= 0; static Ndb* ndb = 0; static NdbDictionary::Dictionary* dic = 0; static int _unqualified = 0; @@ -48,6 +49,22 @@ fatal(char const* fmt, ...) exit(1); } +static void +fatal_dict(char const* fmt, ...) +{ + va_list ap; + char buf[500]; + va_start(ap, fmt); + vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + ndbout << buf; + if (dic) + ndbout << " - " << dic->getNdbError(); + ndbout << endl; + NDBT_ProgramExit(NDBT_FAILED); + exit(1); +} + static void list(const char * tabname, NdbDictionary::Object::Type type) @@ -55,10 +72,10 @@ list(const char * tabname, NdbDictionary::Dictionary::List list; if (tabname == 0) { if (dic->listObjects(list, type) == -1) - fatal("listObjects"); + fatal_dict("listObjects"); } else { if (dic->listIndexes(list, tabname) == -1) - fatal("listIndexes"); + fatal_dict("listIndexes"); } if (ndb->usingFullyQualifiedNames()) ndbout_c("%-5s %-20s %-8s %-7s %-12s %-8s %s", "id", "type", "state", "logging", "database", "schema", "name"); @@ -145,12 +162,17 @@ list(const char * tabname, } } +#ifndef DBUG_OFF +const char *debug_option= 0; +#endif + int main(int argc, const char** argv){ int _loops = 1; const char* _tabname = NULL; const char* _dbname = "TEST_DB"; int _type = 0; int _help = 0; + const char* _connect_str = NULL; struct getargs args[] = { { "loops", 'l', arg_integer, &_loops, "loops", @@ -161,6 +183,13 @@ int main(int argc, const char** argv){ "Name of database table is in"}, { "type", 't', arg_integer, &_type, "type", "Type of objects to show, see NdbDictionary.hpp for numbers(default = 0)" }, + { "connect-string", 'c', arg_string, &_connect_str, + "Set connect string for connecting to ndb_mgmd. =\"host=[;nodeid=]\". Overides specifying entries in NDB_CONNECTSTRING and config file", + "" }, +#ifndef DBUG_OFF + { "debug", 0, arg_string, &debug_option, + "Specify debug options e.g. d:t:i:o,out.trace", "options" }, +#endif { "usage", '?', arg_flag, &_help, "Print help", "" } }; int num_args = sizeof(args) / sizeof(args[0]); @@ -179,10 +208,18 @@ int main(int argc, const char** argv){ } _tabname = argv[optind]; - ndb = new Ndb(_dbname); +#ifndef DBUG_OFF + my_init(); + if (debug_option) + DBUG_PUSH(debug_option); +#endif + + ndb_cluster_connection = new Ndb_cluster_connection(_connect_str); + ndb = new Ndb(ndb_cluster_connection, _dbname); ndb->useFullyQualifiedNames(!_unqualified); if (ndb->init() != 0) fatal("init"); + ndb_cluster_connection->connect(); if (ndb->waitUntilReady(30) < 0) fatal("waitUntilReady"); dic = ndb->getDictionary(); diff --git a/ndb/tools/select_all.cpp b/ndb/tools/select_all.cpp index 329ed87bc48..eb95947fc0f 100644 --- a/ndb/tools/select_all.cpp +++ b/ndb/tools/select_all.cpp @@ -16,6 +16,7 @@ #include +#include #include @@ -26,6 +27,9 @@ #include #include +#ifndef DBUG_OFF +const char *debug_option= 0; +#endif int scanReadRecords(Ndb*, const NdbDictionary::Table*, @@ -58,6 +62,10 @@ int main(int argc, const char** argv){ "Output numbers in hexadecimal format", "useHexFormat" }, { "delimiter", 'd', arg_string, &_delimiter, "Column delimiter", "delimiter" }, +#ifndef DBUG_OFF + { "debug", 0, arg_string, &debug_option, + "Specify debug options e.g. d:t:i:o,out.trace", "options" }, +#endif { "usage", '?', arg_flag, &_help, "Print help", "" }, { "lock", 'l', arg_integer, &_lock, "Read(0), Read-hold(1), Exclusive(2)", "lock"}, @@ -80,6 +88,12 @@ int main(int argc, const char** argv){ } _tabname = argv[optind]; +#ifndef DBUG_OFF + my_init(); + if (debug_option) + DBUG_PUSH(debug_option); +#endif + // Connect to Ndb Ndb MyNdb(_dbname);