mirror of
https://github.com/MariaDB/server.git
synced 2025-09-02 09:41:40 +03:00
Merge
ndb/src/ndbapi/NdbDictionaryImpl.hpp: Auto merged ndb/src/ndbapi/NdbReceiver.cpp: Auto merged ndb/src/ndbapi/TransporterFacade.cpp: Auto merged ndb/src/ndbapi/TransporterFacade.hpp: Auto merged ndb/src/ndbapi/NdbDictionaryImpl.cpp: SCCS merged
This commit is contained in:
@@ -16,6 +16,7 @@ ndbapi/NdbError.hpp \
|
||||
ndbapi/NdbEventOperation.hpp \
|
||||
ndbapi/NdbIndexOperation.hpp \
|
||||
ndbapi/NdbOperation.hpp \
|
||||
ndbapi/ndb_cluster_connection.hpp \
|
||||
ndbapi/NdbBlob.hpp \
|
||||
ndbapi/NdbPool.hpp \
|
||||
ndbapi/NdbRecAttr.hpp \
|
||||
|
@@ -37,7 +37,7 @@ public:
|
||||
*/
|
||||
int init();
|
||||
|
||||
int do_connect();
|
||||
int do_connect(int exit_on_connect_failure= false);
|
||||
|
||||
/**
|
||||
* Get configuration for current (nodeId given in local config file) node.
|
||||
|
@@ -21,6 +21,7 @@
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
const char* NdbConfig_get_path(int *len);
|
||||
void NdbConfig_SetPath(const char *path);
|
||||
char* NdbConfig_NdbCfgName(int with_ndb_home);
|
||||
char* NdbConfig_ErrorFileName(int node_id);
|
||||
|
@@ -860,6 +860,7 @@
|
||||
|
||||
#include <ndb_types.h>
|
||||
#include <ndbapi_limits.h>
|
||||
#include <ndb_cluster_connection.hpp>
|
||||
#include <NdbError.hpp>
|
||||
#include <NdbDictionary.hpp>
|
||||
|
||||
@@ -992,6 +993,8 @@ public:
|
||||
* deprecated.
|
||||
*/
|
||||
Ndb(const char* aCatalogName = "", const char* aSchemaName = "def");
|
||||
Ndb(Ndb_cluster_connection *ndb_cluster_connection,
|
||||
const char* aCatalogName = "", const char* aSchemaName = "def");
|
||||
|
||||
~Ndb();
|
||||
|
||||
@@ -1081,8 +1084,11 @@ public:
|
||||
* @return 0: Ndb is ready and timeout has not occurred.<br>
|
||||
* -1: Timeout has expired
|
||||
*/
|
||||
|
||||
int waitUntilReady(int timeout = 60);
|
||||
|
||||
void connected(Uint32 block_reference);
|
||||
|
||||
/** @} *********************************************************************/
|
||||
|
||||
/**
|
||||
@@ -1447,6 +1453,9 @@ public:
|
||||
****************************************************************************/
|
||||
private:
|
||||
|
||||
void setup(Ndb_cluster_connection *ndb_cluster_connection,
|
||||
const char* aCatalogName, const char* aSchemaName);
|
||||
|
||||
NdbConnection* startTransactionLocal(Uint32 aPrio, Uint32 aFragmentId);
|
||||
|
||||
// Connect the connection object to the Database.
|
||||
@@ -1585,6 +1594,7 @@ private:
|
||||
* These are the private variables in this class.
|
||||
*****************************************************************************/
|
||||
NdbObjectIdMap* theNdbObjectIdMap;
|
||||
Ndb_cluster_connection *m_ndb_cluster_connection;
|
||||
|
||||
NdbConnection** thePreparedTransactionsArray;
|
||||
NdbConnection** theSentTransactionsArray;
|
||||
@@ -1703,7 +1713,7 @@ private:
|
||||
|
||||
static void executeMessage(void*, NdbApiSignal *,
|
||||
struct LinearSectionPtr ptr[3]);
|
||||
static void statusMessage(void*, Uint16, bool, bool);
|
||||
static void statusMessage(void*, Uint32, bool, bool);
|
||||
#ifdef VM_TRACE
|
||||
void printState(const char* fmt, ...);
|
||||
#endif
|
||||
|
45
ndb/include/ndbapi/ndb_cluster_connection.hpp
Normal file
45
ndb/include/ndbapi/ndb_cluster_connection.hpp
Normal file
@@ -0,0 +1,45 @@
|
||||
/* Copyright (C) 2003 MySQL AB
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
|
||||
#ifndef CLUSTER_CONNECTION_HPP
|
||||
#define CLUSTER_CONNECTION_HPP
|
||||
|
||||
class TransporterFacade;
|
||||
class ConfigRetriever;
|
||||
class NdbThread;
|
||||
|
||||
extern "C" {
|
||||
void* run_ndb_cluster_connection_connect_thread(void*);
|
||||
}
|
||||
|
||||
class Ndb_cluster_connection {
|
||||
public:
|
||||
Ndb_cluster_connection(const char * connect_string = 0);
|
||||
~Ndb_cluster_connection();
|
||||
int connect(int reconnect= 0);
|
||||
int start_connect_thread(int (*connect_callback)(void)= 0);
|
||||
private:
|
||||
friend void* run_ndb_cluster_connection_connect_thread(void*);
|
||||
void connect_thread();
|
||||
char *m_connect_string;
|
||||
TransporterFacade *m_facade;
|
||||
ConfigRetriever *m_config_retriever;
|
||||
NdbThread *m_connect_thread;
|
||||
int (*m_connect_callback)(void);
|
||||
};
|
||||
|
||||
#endif
|
@@ -78,7 +78,7 @@ ConfigRetriever::init() {
|
||||
}
|
||||
|
||||
int
|
||||
ConfigRetriever::do_connect(){
|
||||
ConfigRetriever::do_connect(int exit_on_connect_failure){
|
||||
|
||||
if(!m_handle)
|
||||
m_handle= ndb_mgm_create_handle();
|
||||
@@ -102,6 +102,8 @@ ConfigRetriever::do_connect(){
|
||||
if (ndb_mgm_connect(m_handle, tmp.c_str()) == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (exit_on_connect_failure)
|
||||
return 1;
|
||||
setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle));
|
||||
case MgmId_File:
|
||||
break;
|
||||
|
@@ -21,27 +21,34 @@
|
||||
|
||||
static char *datadir_path= 0;
|
||||
|
||||
const char *
|
||||
NdbConfig_get_path(int *_len)
|
||||
{
|
||||
const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0);
|
||||
int path_len= 0;
|
||||
if (path)
|
||||
path_len= strlen(path);
|
||||
if (path_len == 0 && datadir_path) {
|
||||
path= datadir_path;
|
||||
path_len= strlen(path);
|
||||
}
|
||||
if (path_len == 0) {
|
||||
path= ".";
|
||||
path_len= strlen(path);
|
||||
}
|
||||
if (_len)
|
||||
*_len= path_len;
|
||||
return path;
|
||||
}
|
||||
|
||||
static char*
|
||||
NdbConfig_AllocHomePath(int _len)
|
||||
{
|
||||
const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0);
|
||||
int len= _len;
|
||||
int path_len= 0;
|
||||
char *buf;
|
||||
|
||||
if (path == 0)
|
||||
path= datadir_path;
|
||||
|
||||
if (path)
|
||||
path_len= strlen(path);
|
||||
|
||||
len+= path_len;
|
||||
buf= NdbMem_Allocate(len);
|
||||
if (path_len > 0)
|
||||
int path_len;
|
||||
const char *path= NdbConfig_get_path(&path_len);
|
||||
int len= _len+path_len;
|
||||
char *buf= NdbMem_Allocate(len);
|
||||
snprintf(buf, len, "%s%s", path, DIR_SEPARATOR);
|
||||
else
|
||||
buf[0]= 0;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
@@ -74,6 +74,8 @@ NDB_MAIN(ndb_kernel){
|
||||
theConfig->fetch_configuration();
|
||||
}
|
||||
|
||||
chdir(NdbConfig_get_path(0));
|
||||
|
||||
if (theConfig->getDaemonMode()) {
|
||||
// Become a daemon
|
||||
char *lockfile= NdbConfig_PidFileName(globalData.ownId);
|
||||
|
@@ -15,6 +15,9 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_pthread.h>
|
||||
|
||||
#include "WatchDog.hpp"
|
||||
#include "GlobalData.hpp"
|
||||
#include <NdbOut.hpp>
|
||||
@@ -24,7 +27,9 @@
|
||||
extern "C"
|
||||
void*
|
||||
runWatchDog(void* w){
|
||||
my_thread_init();
|
||||
((WatchDog*)w)->run();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return NULL;
|
||||
}
|
||||
|
@@ -1,7 +1,8 @@
|
||||
MYSQLDATAdir = $(localstatedir)
|
||||
MYSQLSHAREdir = $(pkgdatadir)
|
||||
MYSQLBASEdir= $(prefix)
|
||||
MYSQLCLUSTERdir= $(prefix)/mysql-cluster
|
||||
#MYSQLCLUSTERdir= $(prefix)/mysql-cluster
|
||||
MYSQLCLUSTERdir= .
|
||||
|
||||
ndbbin_PROGRAMS = ndb_mgmd
|
||||
|
||||
|
@@ -629,11 +629,14 @@ MgmtSrvr::start()
|
||||
if (!check_start())
|
||||
return false;
|
||||
}
|
||||
theFacade = TransporterFacade::start_instance
|
||||
(_ownNodeId,(ndb_mgm_configuration*)_config->m_configValues);
|
||||
|
||||
theFacade= TransporterFacade::theFacadeInstance = new TransporterFacade();
|
||||
if(theFacade == 0) {
|
||||
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
|
||||
DEBUG("MgmtSrvr.cpp: theFacade == 0.");
|
||||
return false;
|
||||
}
|
||||
if ( theFacade->start_instance
|
||||
(_ownNodeId, (ndb_mgm_configuration*)_config->m_configValues) < 0) {
|
||||
DEBUG("MgmtSrvr.cpp: TransporterFacade::start_instance < 0.");
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -2295,7 +2298,7 @@ MgmtSrvr::signalReceivedNotification(void* mgmtSrvr,
|
||||
//****************************************************************************
|
||||
//****************************************************************************
|
||||
void
|
||||
MgmtSrvr::nodeStatusNotification(void* mgmSrv, NodeId nodeId,
|
||||
MgmtSrvr::nodeStatusNotification(void* mgmSrv, Uint32 nodeId,
|
||||
bool alive, bool nfComplete)
|
||||
{
|
||||
if(!(!alive && nfComplete))
|
||||
|
@@ -699,7 +699,7 @@ private:
|
||||
* shall receive the notification.
|
||||
* @param processId: Id of the dead process.
|
||||
*/
|
||||
static void nodeStatusNotification(void* mgmSrv, NodeId nodeId,
|
||||
static void nodeStatusNotification(void* mgmSrv, Uint32 nodeId,
|
||||
bool alive, bool nfCompleted);
|
||||
|
||||
/**
|
||||
|
@@ -15,6 +15,7 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_pthread.h>
|
||||
#include <ndb_limits.h>
|
||||
#include <ndb_version.h>
|
||||
|
||||
@@ -64,7 +65,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
|
||||
{
|
||||
ndbSetOwnVersion();
|
||||
clusterMgrThreadMutex = NdbMutex_Create();
|
||||
noOfConnectedNodes = 0;
|
||||
noOfConnectedNodes= 0;
|
||||
theClusterMgrThread= 0;
|
||||
}
|
||||
|
||||
ClusterMgr::~ClusterMgr(){
|
||||
@@ -137,20 +139,21 @@ ClusterMgr::startThread() {
|
||||
|
||||
void
|
||||
ClusterMgr::doStop( ){
|
||||
DBUG_ENTER("ClusterMgr::doStop");
|
||||
NdbMutex_Lock(clusterMgrThreadMutex);
|
||||
|
||||
if(theStop){
|
||||
NdbMutex_Unlock(clusterMgrThreadMutex);
|
||||
return;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void *status;
|
||||
theStop = 1;
|
||||
|
||||
if (theClusterMgrThread) {
|
||||
NdbThread_WaitFor(theClusterMgrThread, &status);
|
||||
NdbThread_Destroy(&theClusterMgrThread);
|
||||
|
||||
theClusterMgrThread= 0;
|
||||
}
|
||||
NdbMutex_Unlock(clusterMgrThreadMutex);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void
|
||||
@@ -524,6 +527,7 @@ ArbitMgr::doChoose(const Uint32* theData)
|
||||
void
|
||||
ArbitMgr::doStop(const Uint32* theData)
|
||||
{
|
||||
DBUG_ENTER("ArbitMgr::doStop");
|
||||
ArbitSignal aSignal;
|
||||
NdbMutex_Lock(theThreadMutex);
|
||||
if (theThread != NULL) {
|
||||
@@ -540,6 +544,7 @@ ArbitMgr::doStop(const Uint32* theData)
|
||||
theState = StateInit;
|
||||
}
|
||||
NdbMutex_Unlock(theThreadMutex);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
// private methods
|
||||
@@ -548,7 +553,9 @@ extern "C"
|
||||
void*
|
||||
runArbitMgr_C(void* me)
|
||||
{
|
||||
my_thread_init();
|
||||
((ArbitMgr*) me)->threadMain();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return NULL;
|
||||
}
|
||||
|
@@ -34,6 +34,7 @@ libndbapi_la_SOURCES = \
|
||||
NdbDictionary.cpp \
|
||||
NdbDictionaryImpl.cpp \
|
||||
DictCache.cpp \
|
||||
ndb_cluster_connection.cpp \
|
||||
NdbBlob.cpp
|
||||
|
||||
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi
|
||||
|
@@ -207,9 +207,11 @@ Remark: Disconnect all connections to the database.
|
||||
void
|
||||
Ndb::doDisconnect()
|
||||
{
|
||||
DBUG_ENTER("Ndb::doDisconnect");
|
||||
NdbConnection* tNdbCon;
|
||||
CHECK_STATUS_MACRO_VOID;
|
||||
|
||||
DBUG_PRINT("info", ("theNoOfDBnodes=%d", theNoOfDBnodes));
|
||||
Uint32 tNoOfDbNodes = theNoOfDBnodes;
|
||||
UintR i;
|
||||
for (i = 0; i < tNoOfDbNodes; i++) {
|
||||
@@ -227,6 +229,7 @@ Ndb::doDisconnect()
|
||||
tNdbCon = tNdbCon->theNext;
|
||||
releaseConnectToNdb(tmpNdbCon);
|
||||
}//while
|
||||
DBUG_VOID_RETURN;
|
||||
}//Ndb::disconnect()
|
||||
|
||||
/*****************************************************************************
|
||||
@@ -239,6 +242,7 @@ Remark: Waits until a node has status != 0
|
||||
int
|
||||
Ndb::waitUntilReady(int timeout)
|
||||
{
|
||||
DBUG_ENTER("Ndb::waitUntilReady");
|
||||
int secondsCounter = 0;
|
||||
int milliCounter = 0;
|
||||
int noChecksSinceFirstAliveFound = 0;
|
||||
@@ -246,7 +250,7 @@ Ndb::waitUntilReady(int timeout)
|
||||
if (theInitState != Initialised) {
|
||||
// Ndb::init is not called
|
||||
theError.code = 4256;
|
||||
return -1;
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
|
||||
do {
|
||||
@@ -265,13 +269,13 @@ Ndb::waitUntilReady(int timeout)
|
||||
|
||||
tp->unlock_mutex();
|
||||
if (foundAliveNode == theNoOfDBnodes) {
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
}//if
|
||||
if (foundAliveNode > 0) {
|
||||
noChecksSinceFirstAliveFound++;
|
||||
}//if
|
||||
if (noChecksSinceFirstAliveFound > 30) {
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
}//if
|
||||
NdbSleep_MilliSleep(100);
|
||||
milliCounter += 100;
|
||||
@@ -281,9 +285,9 @@ Ndb::waitUntilReady(int timeout)
|
||||
}//if
|
||||
} while ( secondsCounter < timeout );
|
||||
if (noChecksSinceFirstAliveFound > 0) {
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
}//if
|
||||
return -1;
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
|
||||
/*****************************************************************************
|
||||
@@ -1060,6 +1064,9 @@ Ndb::StartTransactionNodeSelectionData::init(Uint32 noOfNodes,
|
||||
* This algorithm should be implemented in Dbdih
|
||||
*/
|
||||
{
|
||||
if (fragment2PrimaryNodeMap != 0)
|
||||
abort();
|
||||
|
||||
fragment2PrimaryNodeMap = new Uint32[noOfFragments];
|
||||
Uint32 i;
|
||||
for(i = 0; i<noOfNodes; i++){
|
||||
|
@@ -14,19 +14,7 @@
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
|
||||
|
||||
/*****************************************************************************
|
||||
Name: NdbConnection.C
|
||||
Include:
|
||||
Link:
|
||||
Author: UABMNST Mona Natterkvist UAB/B/UL
|
||||
Date: 970829
|
||||
Version: 0.1
|
||||
Description: Interface between TIS and NDB
|
||||
Documentation:
|
||||
Adjust: 971022 UABMNST First version.
|
||||
*****************************************************************************/
|
||||
#include <ndb_global.h>
|
||||
#include <NdbOut.hpp>
|
||||
#include <NdbConnection.hpp>
|
||||
#include <NdbOperation.hpp>
|
||||
@@ -104,7 +92,9 @@ Remark: Deletes the connection object.
|
||||
*****************************************************************************/
|
||||
NdbConnection::~NdbConnection()
|
||||
{
|
||||
DBUG_ENTER("NdbConnection::~NdbConnection");
|
||||
theNdb->theNdbObjectIdMap->unmap(theId, this);
|
||||
DBUG_VOID_RETURN;
|
||||
}//NdbConnection::~NdbConnection()
|
||||
|
||||
/*****************************************************************************
|
||||
|
@@ -660,6 +660,7 @@ NdbDictionaryImpl::getIndexTable(NdbIndexImpl * index,
|
||||
return getTable(m_ndb.externalizeTableName(internalName));
|
||||
}
|
||||
|
||||
#if 0
|
||||
bool
|
||||
NdbDictInterface::setTransporter(class TransporterFacade * tf)
|
||||
{
|
||||
@@ -683,11 +684,11 @@ NdbDictInterface::setTransporter(class TransporterFacade * tf)
|
||||
|
||||
return true;
|
||||
}
|
||||
#endif
|
||||
|
||||
bool
|
||||
NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf)
|
||||
{
|
||||
m_blockNumber = -1;
|
||||
m_reference = ndb->getReference();
|
||||
m_transporter = tf;
|
||||
m_waiter.m_mutex = tf->theMutexPtr;
|
||||
@@ -697,10 +698,6 @@ NdbDictInterface::setTransporter(class Ndb* ndb, class TransporterFacade * tf)
|
||||
|
||||
NdbDictInterface::~NdbDictInterface()
|
||||
{
|
||||
if (m_transporter != NULL){
|
||||
if (m_blockNumber != -1)
|
||||
m_transporter->close(m_blockNumber, 0);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@@ -787,7 +784,7 @@ NdbDictInterface::execSignal(void* dictImpl,
|
||||
}
|
||||
|
||||
void
|
||||
NdbDictInterface::execNodeStatus(void* dictImpl, NodeId aNode,
|
||||
NdbDictInterface::execNodeStatus(void* dictImpl, Uint32 aNode,
|
||||
bool alive, bool nfCompleted)
|
||||
{
|
||||
NdbDictInterface * tmp = (NdbDictInterface*)dictImpl;
|
||||
|
@@ -241,7 +241,6 @@ public:
|
||||
NdbDictInterface(NdbError& err) : m_error(err) {
|
||||
m_reference = 0;
|
||||
m_masterNodeId = 0;
|
||||
m_blockNumber = -1;
|
||||
m_transporter= NULL;
|
||||
}
|
||||
~NdbDictInterface();
|
||||
@@ -309,7 +308,6 @@ public:
|
||||
private:
|
||||
Uint32 m_reference;
|
||||
Uint32 m_masterNodeId;
|
||||
int m_blockNumber;
|
||||
|
||||
NdbWaiter m_waiter;
|
||||
class TransporterFacade * m_transporter;
|
||||
@@ -319,7 +317,7 @@ private:
|
||||
class NdbApiSignal* signal,
|
||||
class LinearSectionPtr ptr[3]);
|
||||
|
||||
static void execNodeStatus(void* dictImpl, NodeId,
|
||||
static void execNodeStatus(void* dictImpl, Uint32,
|
||||
bool alive, bool nfCompleted);
|
||||
|
||||
void execGET_TABINFO_REF(NdbApiSignal *, LinearSectionPtr ptr[3]);
|
||||
|
@@ -14,6 +14,7 @@
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include "NdbImpl.hpp"
|
||||
#include <NdbReceiver.hpp>
|
||||
#include "NdbDictionaryImpl.hpp"
|
||||
@@ -36,10 +37,12 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
|
||||
|
||||
NdbReceiver::~NdbReceiver()
|
||||
{
|
||||
DBUG_ENTER("NdbReceiver::~NdbReceiver");
|
||||
if (m_id != NdbObjectIdMap::InvalidId) {
|
||||
m_ndb->theNdbObjectIdMap->unmap(m_id, this);
|
||||
}
|
||||
delete[] m_rows;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void
|
||||
|
@@ -15,6 +15,8 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
|
||||
#include <ndb_global.h>
|
||||
|
||||
#include "NdbApiSignal.hpp"
|
||||
#include "NdbImpl.hpp"
|
||||
#include "NdbOperation.hpp"
|
||||
@@ -53,6 +55,8 @@
|
||||
int
|
||||
Ndb::init(int aMaxNoOfTransactions)
|
||||
{
|
||||
DBUG_ENTER("Ndb::init");
|
||||
|
||||
int i;
|
||||
int aNrOfCon;
|
||||
int aNrOfOp;
|
||||
@@ -67,7 +71,7 @@ Ndb::init(int aMaxNoOfTransactions)
|
||||
theError.code = 4104;
|
||||
break;
|
||||
}
|
||||
return -1;
|
||||
DBUG_RETURN(-1);
|
||||
}//if
|
||||
theInitState = StartingInit;
|
||||
TransporterFacade * theFacade = TransporterFacade::instance();
|
||||
@@ -76,36 +80,16 @@ Ndb::init(int aMaxNoOfTransactions)
|
||||
const int tBlockNo = theFacade->open(this,
|
||||
executeMessage,
|
||||
statusMessage);
|
||||
|
||||
|
||||
if ( tBlockNo == -1 ) {
|
||||
theError.code = 4105;
|
||||
theFacade->unlock_mutex();
|
||||
return -1; // no more free blocknumbers
|
||||
DBUG_RETURN(-1); // no more free blocknumbers
|
||||
}//if
|
||||
|
||||
theNdbBlockNumber = tBlockNo;
|
||||
|
||||
theNode = theFacade->ownId();
|
||||
theMyRef = numberToRef(theNdbBlockNumber, theNode);
|
||||
|
||||
for (i = 1; i < MAX_NDB_NODES; i++){
|
||||
if (theFacade->getIsDbNode(i)){
|
||||
theDBnodes[theNoOfDBnodes] = i;
|
||||
theNoOfDBnodes++;
|
||||
}
|
||||
}
|
||||
|
||||
theFirstTransId = ((Uint64)theNdbBlockNumber << 52)+((Uint64)theNode << 40);
|
||||
theFirstTransId += theFacade->m_max_trans_id;
|
||||
theFacade->unlock_mutex();
|
||||
|
||||
|
||||
theDictionary = new NdbDictionaryImpl(*this);
|
||||
if (theDictionary == NULL) {
|
||||
theError.code = 4000;
|
||||
return -1;
|
||||
}
|
||||
theDictionary->setTransporter(this, theFacade);
|
||||
|
||||
aNrOfCon = theNoOfDBnodes;
|
||||
@@ -144,9 +128,6 @@ Ndb::init(int aMaxNoOfTransactions)
|
||||
theSentTransactionsArray[i] = NULL;
|
||||
theCompletedTransactionsArray[i] = NULL;
|
||||
}//for
|
||||
|
||||
startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes);
|
||||
|
||||
for (i = 0; i < 16; i++){
|
||||
tSignal[i] = getSignal();
|
||||
if(tSignal[i] == NULL) {
|
||||
@@ -156,11 +137,8 @@ Ndb::init(int aMaxNoOfTransactions)
|
||||
}
|
||||
for (i = 0; i < 16; i++)
|
||||
releaseSignal(tSignal[i]);
|
||||
|
||||
theInitState = Initialised;
|
||||
|
||||
theCommitAckSignal = new NdbApiSignal(theMyRef);
|
||||
return 0;
|
||||
DBUG_RETURN(0);
|
||||
|
||||
error_handler:
|
||||
ndbout << "error_handler" << endl;
|
||||
@@ -176,12 +154,13 @@ error_handler:
|
||||
|
||||
delete theDictionary;
|
||||
TransporterFacade::instance()->close(theNdbBlockNumber, 0);
|
||||
return -1;
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
|
||||
void
|
||||
Ndb::releaseTransactionArrays()
|
||||
{
|
||||
DBUG_ENTER("Ndb::releaseTransactionArrays");
|
||||
if (thePreparedTransactionsArray != NULL) {
|
||||
delete [] thePreparedTransactionsArray;
|
||||
}//if
|
||||
@@ -191,6 +170,7 @@ Ndb::releaseTransactionArrays()
|
||||
if (theCompletedTransactionsArray != NULL) {
|
||||
delete [] theCompletedTransactionsArray;
|
||||
}//if
|
||||
DBUG_VOID_RETURN;
|
||||
}//Ndb::releaseTransactionArrays()
|
||||
|
||||
void
|
||||
@@ -202,13 +182,46 @@ Ndb::executeMessage(void* NdbObject,
|
||||
tNdb->handleReceivedSignal(aSignal, ptr);
|
||||
}
|
||||
|
||||
void
|
||||
Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete)
|
||||
void Ndb::connected(Uint32 ref)
|
||||
{
|
||||
theMyRef= ref;
|
||||
theNode= refToNode(theMyRef);
|
||||
if (theNdbBlockNumber >= 0)
|
||||
assert(theMyRef == numberToRef(theNdbBlockNumber, theNode));
|
||||
|
||||
TransporterFacade * theFacade = TransporterFacade::instance();
|
||||
int i;
|
||||
theNoOfDBnodes= 0;
|
||||
for (i = 1; i < MAX_NDB_NODES; i++){
|
||||
if (theFacade->getIsDbNode(i)){
|
||||
theDBnodes[theNoOfDBnodes] = i;
|
||||
theNoOfDBnodes++;
|
||||
}
|
||||
}
|
||||
theFirstTransId = ((Uint64)theNdbBlockNumber << 52)+
|
||||
((Uint64)theNode << 40);
|
||||
theFirstTransId += theFacade->m_max_trans_id;
|
||||
// assert(0);
|
||||
DBUG_PRINT("info",("connected with ref=%x, id=%d, no_db_nodes=%d, first_trans_id=%d",
|
||||
theMyRef,
|
||||
theNode,
|
||||
theNoOfDBnodes,
|
||||
theFirstTransId));
|
||||
startTransactionNodeSelectionData.init(theNoOfDBnodes, theDBnodes);
|
||||
theCommitAckSignal = new NdbApiSignal(theMyRef);
|
||||
|
||||
theDictionary->m_receiver.m_reference= theMyRef;
|
||||
}
|
||||
|
||||
void
|
||||
Ndb::statusMessage(void* NdbObject, Uint32 a_node, bool alive, bool nfComplete)
|
||||
{
|
||||
DBUG_ENTER("Ndb::statusMessage");
|
||||
Ndb* tNdb = (Ndb*)NdbObject;
|
||||
if (alive) {
|
||||
if (nfComplete) {
|
||||
assert(0);
|
||||
tNdb->connected(a_node);
|
||||
DBUG_VOID_RETURN;
|
||||
}//if
|
||||
} else {
|
||||
if (nfComplete) {
|
||||
@@ -219,6 +232,7 @@ Ndb::statusMessage(void* NdbObject, NodeId a_node, bool alive, bool nfComplete)
|
||||
}//if
|
||||
NdbDictInterface::execNodeStatus(&tNdb->theDictionary->m_receiver,
|
||||
a_node, alive, nfComplete);
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void
|
||||
|
@@ -42,6 +42,7 @@ void NdbGlobalEventBuffer_drop(NdbGlobalEventBufferHandle *);
|
||||
static int theNoOfNdbObjects = 0;
|
||||
|
||||
static char *ndbConnectString = 0;
|
||||
static Ndb_cluster_connection *global_ndb_cluster_connection= 0;
|
||||
|
||||
#if defined NDB_WIN32 || defined SCO
|
||||
static NdbMutex & createNdbMutex = * NdbMutex_Create();
|
||||
@@ -56,45 +57,74 @@ Ndb(const char* aDataBase);
|
||||
Parameters: aDataBase : Name of the database.
|
||||
Remark: Connect to the database.
|
||||
***************************************************************************/
|
||||
Ndb::Ndb( const char* aDataBase , const char* aSchema) :
|
||||
theNdbObjectIdMap(0),
|
||||
thePreparedTransactionsArray(NULL),
|
||||
theSentTransactionsArray(NULL),
|
||||
theCompletedTransactionsArray(NULL),
|
||||
theNoOfPreparedTransactions(0),
|
||||
theNoOfSentTransactions(0),
|
||||
theNoOfCompletedTransactions(0),
|
||||
theNoOfAllocatedTransactions(0),
|
||||
theMaxNoOfTransactions(0),
|
||||
theMinNoOfEventsToWakeUp(0),
|
||||
prefixEnd(NULL),
|
||||
theImpl(NULL),
|
||||
theDictionary(NULL),
|
||||
theConIdleList(NULL),
|
||||
theOpIdleList(NULL),
|
||||
theScanOpIdleList(NULL),
|
||||
theIndexOpIdleList(NULL),
|
||||
// theSchemaConIdleList(NULL),
|
||||
// theSchemaConToNdbList(NULL),
|
||||
theTransactionList(NULL),
|
||||
theConnectionArray(NULL),
|
||||
theRecAttrIdleList(NULL),
|
||||
theSignalIdleList(NULL),
|
||||
theLabelList(NULL),
|
||||
theBranchList(NULL),
|
||||
theSubroutineList(NULL),
|
||||
theCallList(NULL),
|
||||
theScanList(NULL),
|
||||
theNdbBlobIdleList(NULL),
|
||||
theNoOfDBnodes(0),
|
||||
theDBnodes(NULL),
|
||||
the_release_ind(NULL),
|
||||
the_last_check_time(0),
|
||||
theFirstTransId(0),
|
||||
theRestartGCI(0),
|
||||
theNdbBlockNumber(-1),
|
||||
theInitState(NotConstructed)
|
||||
Ndb::Ndb( const char* aDataBase , const char* aSchema) {
|
||||
if (global_ndb_cluster_connection == 0) {
|
||||
if (theNoOfNdbObjects > 0)
|
||||
abort(); // old and new Ndb constructor used mixed
|
||||
global_ndb_cluster_connection= new Ndb_cluster_connection(ndbConnectString);
|
||||
global_ndb_cluster_connection->connect();
|
||||
}
|
||||
setup(global_ndb_cluster_connection, aDataBase, aSchema);
|
||||
}
|
||||
|
||||
Ndb::Ndb( Ndb_cluster_connection *ndb_cluster_connection,
|
||||
const char* aDataBase , const char* aSchema)
|
||||
{
|
||||
if (global_ndb_cluster_connection != 0 &&
|
||||
global_ndb_cluster_connection != ndb_cluster_connection)
|
||||
abort(); // old and new Ndb constructor used mixed
|
||||
setup(ndb_cluster_connection, aDataBase, aSchema);
|
||||
}
|
||||
|
||||
void Ndb::setup(Ndb_cluster_connection *ndb_cluster_connection,
|
||||
const char* aDataBase , const char* aSchema)
|
||||
{
|
||||
DBUG_ENTER("Ndb::setup");
|
||||
|
||||
theNdbObjectIdMap= 0;
|
||||
m_ndb_cluster_connection= ndb_cluster_connection;
|
||||
thePreparedTransactionsArray= NULL;
|
||||
theSentTransactionsArray= NULL;
|
||||
theCompletedTransactionsArray= NULL;
|
||||
theNoOfPreparedTransactions= 0;
|
||||
theNoOfSentTransactions= 0;
|
||||
theNoOfCompletedTransactions= 0;
|
||||
theNoOfAllocatedTransactions= 0;
|
||||
theMaxNoOfTransactions= 0;
|
||||
theMinNoOfEventsToWakeUp= 0;
|
||||
prefixEnd= NULL;
|
||||
theImpl= NULL;
|
||||
theDictionary= NULL;
|
||||
theConIdleList= NULL;
|
||||
theOpIdleList= NULL;
|
||||
theScanOpIdleList= NULL;
|
||||
theIndexOpIdleList= NULL;
|
||||
// theSchemaConIdleList= NULL;
|
||||
// theSchemaConToNdbList= NULL;
|
||||
theTransactionList= NULL;
|
||||
theConnectionArray= NULL;
|
||||
theRecAttrIdleList= NULL;
|
||||
theSignalIdleList= NULL;
|
||||
theLabelList= NULL;
|
||||
theBranchList= NULL;
|
||||
theSubroutineList= NULL;
|
||||
theCallList= NULL;
|
||||
theScanList= NULL;
|
||||
theNdbBlobIdleList= NULL;
|
||||
theNoOfDBnodes= 0;
|
||||
theDBnodes= NULL;
|
||||
the_release_ind= NULL;
|
||||
the_last_check_time= 0;
|
||||
theFirstTransId= 0;
|
||||
theRestartGCI= 0;
|
||||
theNdbBlockNumber= -1;
|
||||
theInitState= NotConstructed;
|
||||
|
||||
theNode= 0;
|
||||
theFirstTransId= 0;
|
||||
theMyRef= 0;
|
||||
theNoOfDBnodes= 0;
|
||||
|
||||
fullyQualifiedNames = true;
|
||||
|
||||
cgetSignals =0;
|
||||
@@ -135,17 +165,7 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) :
|
||||
|
||||
NdbMutex_Lock(&createNdbMutex);
|
||||
|
||||
TransporterFacade * m_facade = 0;
|
||||
if(theNoOfNdbObjects == 0){
|
||||
if ((m_facade = TransporterFacade::start_instance(ndbConnectString)) == 0)
|
||||
theInitState = InitConfigError;
|
||||
} else {
|
||||
m_facade = TransporterFacade::instance();
|
||||
}
|
||||
|
||||
if(m_facade != 0){
|
||||
theWaiter.m_mutex = m_facade->theMutexPtr;
|
||||
}
|
||||
theWaiter.m_mutex = TransporterFacade::instance()->theMutexPtr;
|
||||
|
||||
// For keeping track of how many Ndb objects that exists.
|
||||
theNoOfNdbObjects += 1;
|
||||
@@ -167,6 +187,13 @@ Ndb::Ndb( const char* aDataBase , const char* aSchema) :
|
||||
}
|
||||
|
||||
NdbMutex_Unlock(&createNdbMutex);
|
||||
|
||||
theDictionary = new NdbDictionaryImpl(*this);
|
||||
if (theDictionary == NULL) {
|
||||
ndbout_c("Ndb cailed to allocate dictionary");
|
||||
exit(-1);
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
@@ -187,6 +214,7 @@ void Ndb::setConnectString(const char * connectString)
|
||||
*****************************************************************************/
|
||||
Ndb::~Ndb()
|
||||
{
|
||||
DBUG_ENTER("Ndb::~Ndb()");
|
||||
doDisconnect();
|
||||
|
||||
delete theDictionary;
|
||||
@@ -203,6 +231,10 @@ Ndb::~Ndb()
|
||||
theNoOfNdbObjects -= 1;
|
||||
if(theNoOfNdbObjects == 0){
|
||||
TransporterFacade::stop_instance();
|
||||
if (global_ndb_cluster_connection != 0) {
|
||||
delete global_ndb_cluster_connection;
|
||||
global_ndb_cluster_connection= 0;
|
||||
}
|
||||
}//if
|
||||
|
||||
NdbMutex_Unlock(&createNdbMutex);
|
||||
@@ -271,6 +303,7 @@ Ndb::~Ndb()
|
||||
assert(cnewSignals == cfreeSignals);
|
||||
assert(cgetSignals == creleaseSignals);
|
||||
#endif
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
NdbWaiter::NdbWaiter(){
|
||||
|
@@ -783,6 +783,7 @@ Remark: Release and disconnect from DBTC a connection and seize it to th
|
||||
void
|
||||
Ndb::releaseConnectToNdb(NdbConnection* a_con)
|
||||
{
|
||||
DBUG_ENTER("Ndb::releaseConnectToNdb");
|
||||
NdbApiSignal tSignal(theMyRef);
|
||||
int tConPtr;
|
||||
|
||||
@@ -790,7 +791,7 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con)
|
||||
// manage to reach NDB or not.
|
||||
|
||||
if (a_con == NULL)
|
||||
return;
|
||||
DBUG_VOID_RETURN;
|
||||
|
||||
Uint32 node_id = a_con->getConnectedNodeId();
|
||||
Uint32 conn_seq = a_con->theNodeSequence;
|
||||
@@ -821,6 +822,6 @@ Ndb::releaseConnectToNdb(NdbConnection* a_con)
|
||||
abort();
|
||||
}//if
|
||||
releaseNdbCon(a_con);
|
||||
return;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
@@ -15,6 +15,7 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_pthread.h>
|
||||
#include <ndb_limits.h>
|
||||
#include "TransporterFacade.hpp"
|
||||
#include "ClusterMgr.hpp"
|
||||
@@ -37,6 +38,16 @@
|
||||
//#define REPORT_TRANSPORTER
|
||||
//#define API_TRACE;
|
||||
|
||||
static int numberToIndex(int number)
|
||||
{
|
||||
return number - MIN_API_BLOCK_NO;
|
||||
}
|
||||
|
||||
static int indexToNumber(int index)
|
||||
{
|
||||
return index + MIN_API_BLOCK_NO;
|
||||
}
|
||||
|
||||
#if defined DEBUG_TRANSPORTER
|
||||
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
|
||||
#else
|
||||
@@ -44,8 +55,6 @@
|
||||
#endif
|
||||
|
||||
TransporterFacade* TransporterFacade::theFacadeInstance = NULL;
|
||||
ConfigRetriever *TransporterFacade::s_config_retriever= 0;
|
||||
|
||||
|
||||
/*****************************************************************************
|
||||
* Call back functions
|
||||
@@ -321,12 +330,6 @@ copy(Uint32 * & insertPtr,
|
||||
abort();
|
||||
}
|
||||
|
||||
extern "C"
|
||||
void
|
||||
atexit_stop_instance(){
|
||||
TransporterFacade::stop_instance();
|
||||
}
|
||||
|
||||
/**
|
||||
* Note that this function need no locking since its
|
||||
* only called from the constructor of Ndb (the NdbObject)
|
||||
@@ -334,64 +337,14 @@ atexit_stop_instance(){
|
||||
* Which is protected by a mutex
|
||||
*/
|
||||
|
||||
|
||||
TransporterFacade*
|
||||
TransporterFacade::start_instance(const char * connectString){
|
||||
|
||||
// TransporterFacade used from API get config from mgmt srvr
|
||||
s_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
|
||||
|
||||
s_config_retriever->setConnectString(connectString);
|
||||
const char* error = 0;
|
||||
do {
|
||||
if(s_config_retriever->init() == -1)
|
||||
break;
|
||||
|
||||
if(s_config_retriever->do_connect() == -1)
|
||||
break;
|
||||
|
||||
Uint32 nodeId = s_config_retriever->allocNodeId();
|
||||
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
|
||||
NdbSleep_SecSleep(3);
|
||||
nodeId = s_config_retriever->allocNodeId();
|
||||
}
|
||||
if(nodeId == 0)
|
||||
break;
|
||||
|
||||
ndb_mgm_configuration * props = s_config_retriever->getConfig();
|
||||
if(props == 0)
|
||||
break;
|
||||
|
||||
TransporterFacade * tf = start_instance(nodeId, props);
|
||||
|
||||
free(props);
|
||||
return tf;
|
||||
} while(0);
|
||||
|
||||
ndbout << "Configuration error: ";
|
||||
const char* erString = s_config_retriever->getErrorString();
|
||||
if (erString == 0) {
|
||||
erString = "No error specified!";
|
||||
}
|
||||
ndbout << erString << endl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
TransporterFacade*
|
||||
int
|
||||
TransporterFacade::start_instance(int nodeId,
|
||||
const ndb_mgm_configuration* props)
|
||||
{
|
||||
TransporterFacade* tf = new TransporterFacade();
|
||||
if (! tf->init(nodeId, props)) {
|
||||
delete tf;
|
||||
return NULL;
|
||||
if (! theFacadeInstance->init(nodeId, props)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Install atexit handler
|
||||
*/
|
||||
atexit(atexit_stop_instance);
|
||||
|
||||
/**
|
||||
* Install signal handler for SIGPIPE
|
||||
*
|
||||
@@ -402,19 +355,7 @@ TransporterFacade::start_instance(int nodeId,
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
#endif
|
||||
|
||||
if(theFacadeInstance == NULL){
|
||||
theFacadeInstance = tf;
|
||||
}
|
||||
|
||||
return tf;
|
||||
}
|
||||
|
||||
void
|
||||
TransporterFacade::close_configuration(){
|
||||
if (s_config_retriever) {
|
||||
delete s_config_retriever;
|
||||
s_config_retriever= 0;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -425,23 +366,15 @@ TransporterFacade::close_configuration(){
|
||||
*/
|
||||
void
|
||||
TransporterFacade::stop_instance(){
|
||||
|
||||
close_configuration();
|
||||
|
||||
if(theFacadeInstance == NULL){
|
||||
/**
|
||||
* We are called from atexit function
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
DBUG_ENTER("TransporterFacade::stop_instance");
|
||||
if(theFacadeInstance)
|
||||
theFacadeInstance->doStop();
|
||||
|
||||
delete theFacadeInstance; theFacadeInstance = NULL;
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void
|
||||
TransporterFacade::doStop(){
|
||||
DBUG_ENTER("TransporterFacade::doStop");
|
||||
/**
|
||||
* First stop the ClusterMgr because it needs to send one more signal
|
||||
* and also uses theFacadeInstance to lock/unlock theMutexPtr
|
||||
@@ -454,17 +387,26 @@ TransporterFacade::doStop(){
|
||||
*/
|
||||
void *status;
|
||||
theStopReceive = 1;
|
||||
if (theReceiveThread) {
|
||||
NdbThread_WaitFor(theReceiveThread, &status);
|
||||
NdbThread_WaitFor(theSendThread, &status);
|
||||
NdbThread_Destroy(&theReceiveThread);
|
||||
theReceiveThread= 0;
|
||||
}
|
||||
if (theSendThread) {
|
||||
NdbThread_WaitFor(theSendThread, &status);
|
||||
NdbThread_Destroy(&theSendThread);
|
||||
theSendThread= 0;
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
extern "C"
|
||||
void*
|
||||
runSendRequest_C(void * me)
|
||||
{
|
||||
my_thread_init();
|
||||
((TransporterFacade*) me)->threadMainSend();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return me;
|
||||
}
|
||||
@@ -507,7 +449,9 @@ extern "C"
|
||||
void*
|
||||
runReceiveResponse_C(void * me)
|
||||
{
|
||||
my_thread_init();
|
||||
((TransporterFacade*) me)->threadMainReceive();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return me;
|
||||
}
|
||||
@@ -540,6 +484,8 @@ TransporterFacade::TransporterFacade() :
|
||||
theSendThread(NULL),
|
||||
theReceiveThread(NULL)
|
||||
{
|
||||
theOwnId = 0;
|
||||
|
||||
theMutexPtr = NdbMutex_Create();
|
||||
sendPerformedLastInterval = 0;
|
||||
|
||||
@@ -552,6 +498,8 @@ TransporterFacade::TransporterFacade() :
|
||||
m_batch_byte_size= SCAN_BATCH_SIZE;
|
||||
m_batch_size= DEF_BATCH_SIZE;
|
||||
m_max_trans_id = 0;
|
||||
|
||||
theClusterMgr = new ClusterMgr(* this);
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -570,7 +518,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
|
||||
|
||||
ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
|
||||
iter.first();
|
||||
theClusterMgr = new ClusterMgr(* this);
|
||||
theClusterMgr->init(iter);
|
||||
|
||||
/**
|
||||
@@ -622,7 +569,6 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
|
||||
32768,
|
||||
"ndb_send",
|
||||
NDB_THREAD_PRIO_LOW);
|
||||
|
||||
theClusterMgr->startThread();
|
||||
|
||||
#ifdef API_TRACE
|
||||
@@ -633,6 +579,21 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
TransporterFacade::connected()
|
||||
{
|
||||
DBUG_ENTER("TransporterFacade::connected");
|
||||
Uint32 sz = m_threads.m_statusNext.size();
|
||||
for (Uint32 i = 0; i < sz ; i ++) {
|
||||
if (m_threads.getInUse(i)){
|
||||
void * obj = m_threads.m_objectExecute[i].m_object;
|
||||
NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
|
||||
(*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
|
||||
}
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void
|
||||
TransporterFacade::ReportNodeDead(NodeId tNodeId)
|
||||
{
|
||||
@@ -719,7 +680,16 @@ TransporterFacade::open(void* objRef,
|
||||
ExecuteFunction fun,
|
||||
NodeStatusFunction statusFun)
|
||||
{
|
||||
return m_threads.open(objRef, fun, statusFun);
|
||||
DBUG_ENTER("TransporterFacade::open");
|
||||
int r= m_threads.open(objRef, fun, statusFun);
|
||||
if (r < 0)
|
||||
DBUG_RETURN(r);
|
||||
#if 1
|
||||
if (theOwnId > 0) {
|
||||
(*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
|
||||
}
|
||||
#endif
|
||||
DBUG_RETURN(r);
|
||||
}
|
||||
|
||||
TransporterFacade::~TransporterFacade(){
|
||||
@@ -762,7 +732,7 @@ TransporterFacade::calculateSendLimit()
|
||||
//-------------------------------------------------
|
||||
void TransporterFacade::forceSend(Uint32 block_number) {
|
||||
checkCounter--;
|
||||
m_threads.m_statusNext[block_number - MIN_API_BLOCK_NO] = ThreadData::ACTIVE;
|
||||
m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
|
||||
sendPerformedLastInterval = 1;
|
||||
if (checkCounter < 0) {
|
||||
calculateSendLimit();
|
||||
@@ -775,7 +745,7 @@ void TransporterFacade::forceSend(Uint32 block_number) {
|
||||
//-------------------------------------------------
|
||||
void
|
||||
TransporterFacade::checkForceSend(Uint32 block_number) {
|
||||
m_threads.m_statusNext[block_number - MIN_API_BLOCK_NO] = ThreadData::ACTIVE;
|
||||
m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
|
||||
//-------------------------------------------------
|
||||
// This code is an adaptive algorithm to discover when
|
||||
// the API should actually send its buffers. The reason
|
||||
@@ -1016,11 +986,12 @@ TransporterFacade::ThreadData::expand(Uint32 size){
|
||||
m_firstFree = m_statusNext.size() - size;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
TransporterFacade::ThreadData::open(void* objRef,
|
||||
ExecuteFunction fun,
|
||||
NodeStatusFunction fun2){
|
||||
|
||||
NodeStatusFunction fun2)
|
||||
{
|
||||
Uint32 nextFree = m_firstFree;
|
||||
|
||||
if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
|
||||
@@ -1040,12 +1011,12 @@ TransporterFacade::ThreadData::open(void* objRef,
|
||||
m_objectExecute[nextFree] = oe;
|
||||
m_statusFunction[nextFree] = fun2;
|
||||
|
||||
return nextFree + MIN_API_BLOCK_NO;
|
||||
return indexToNumber(nextFree);
|
||||
}
|
||||
|
||||
int
|
||||
TransporterFacade::ThreadData::close(int number){
|
||||
number -= MIN_API_BLOCK_NO;
|
||||
number= numberToIndex(number);
|
||||
assert(getInUse(number));
|
||||
m_statusNext[number] = m_firstFree;
|
||||
m_firstFree = number;
|
||||
|
@@ -35,7 +35,7 @@ class Ndb;
|
||||
class NdbApiSignal;
|
||||
|
||||
typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
|
||||
typedef void (* NodeStatusFunction)(void *, NodeId, bool nodeAlive, bool nfComplete);
|
||||
typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
|
||||
|
||||
extern "C" {
|
||||
void* runSendRequest_C(void*);
|
||||
@@ -55,9 +55,7 @@ public:
|
||||
bool init(Uint32, const ndb_mgm_configuration *);
|
||||
|
||||
static TransporterFacade* instance();
|
||||
static TransporterFacade* start_instance(int, const ndb_mgm_configuration*);
|
||||
static TransporterFacade* start_instance(const char *connectString);
|
||||
static void close_configuration();
|
||||
int start_instance(int, const ndb_mgm_configuration*);
|
||||
static void stop_instance();
|
||||
|
||||
/**
|
||||
@@ -93,6 +91,8 @@ public:
|
||||
// My own processor id
|
||||
NodeId ownId() const;
|
||||
|
||||
void connected();
|
||||
|
||||
void doConnect(int NodeId);
|
||||
void reportConnected(int NodeId);
|
||||
void doDisconnect(int NodeId);
|
||||
@@ -130,6 +130,7 @@ private:
|
||||
friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
|
||||
friend class GrepSS;
|
||||
friend class Ndb;
|
||||
friend class Ndb_cluster_connection;
|
||||
|
||||
int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
|
||||
|
||||
|
156
ndb/src/ndbapi/ndb_cluster_connection.cpp
Normal file
156
ndb/src/ndbapi/ndb_cluster_connection.cpp
Normal file
@@ -0,0 +1,156 @@
|
||||
/* Copyright (C) 2003 MySQL AB
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_pthread.h>
|
||||
|
||||
#include <ndb_cluster_connection.hpp>
|
||||
#include <TransporterFacade.hpp>
|
||||
#include <NdbOut.hpp>
|
||||
#include <NdbSleep.h>
|
||||
#include <NdbThread.h>
|
||||
#include <ndb_limits.h>
|
||||
#include <ConfigRetriever.hpp>
|
||||
#include <ndb_version.h>
|
||||
|
||||
static int g_run_connect_thread= 0;
|
||||
|
||||
Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
|
||||
{
|
||||
m_facade= TransporterFacade::theFacadeInstance= new TransporterFacade();
|
||||
if (connect_string)
|
||||
m_connect_string= strdup(connect_string);
|
||||
else
|
||||
m_connect_string= 0;
|
||||
m_config_retriever= 0;
|
||||
m_connect_thread= 0;
|
||||
m_connect_callback= 0;
|
||||
}
|
||||
|
||||
extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
|
||||
{
|
||||
my_thread_init();
|
||||
g_run_connect_thread= 1;
|
||||
((Ndb_cluster_connection*) me)->connect_thread();
|
||||
my_thread_end();
|
||||
NdbThread_Exit(0);
|
||||
return me;
|
||||
}
|
||||
|
||||
void Ndb_cluster_connection::connect_thread()
|
||||
{
|
||||
DBUG_ENTER("Ndb_cluster_connection::connect_thread");
|
||||
int r;
|
||||
while (g_run_connect_thread) {
|
||||
if ((r = connect(1)) == 0)
|
||||
break;
|
||||
if (r == -1) {
|
||||
printf("Ndb_cluster_connection::connect_thread error\n");
|
||||
abort();
|
||||
}
|
||||
}
|
||||
if (m_connect_callback)
|
||||
(*m_connect_callback)();
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
|
||||
{
|
||||
DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
|
||||
m_connect_callback= connect_callback;
|
||||
m_connect_thread= NdbThread_Create(run_ndb_cluster_connection_connect_thread,
|
||||
(void**)this,
|
||||
32768,
|
||||
"ndb_cluster_connection",
|
||||
NDB_THREAD_PRIO_LOW);
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
int Ndb_cluster_connection::connect(int reconnect)
|
||||
{
|
||||
DBUG_ENTER("Ndb_cluster_connection::connect");
|
||||
const char* error = 0;
|
||||
do {
|
||||
if (m_config_retriever == 0)
|
||||
{
|
||||
m_config_retriever= new ConfigRetriever(NDB_VERSION, NODE_TYPE_API);
|
||||
m_config_retriever->setConnectString(m_connect_string);
|
||||
if(m_config_retriever->init() == -1)
|
||||
break;
|
||||
}
|
||||
else
|
||||
if (reconnect == 0)
|
||||
DBUG_RETURN(0);
|
||||
if (reconnect)
|
||||
{
|
||||
int r= m_config_retriever->do_connect(1);
|
||||
if (r == 1)
|
||||
DBUG_RETURN(1); // mgmt server not up yet
|
||||
if (r == -1)
|
||||
break;
|
||||
}
|
||||
else
|
||||
if(m_config_retriever->do_connect() == -1)
|
||||
break;
|
||||
Uint32 nodeId = m_config_retriever->allocNodeId();
|
||||
for(Uint32 i = 0; nodeId == 0 && i<5; i++){
|
||||
NdbSleep_SecSleep(3);
|
||||
nodeId = m_config_retriever->allocNodeId();
|
||||
}
|
||||
if(nodeId == 0)
|
||||
break;
|
||||
ndb_mgm_configuration * props = m_config_retriever->getConfig();
|
||||
if(props == 0)
|
||||
break;
|
||||
m_facade->start_instance(nodeId, props);
|
||||
free(props);
|
||||
m_facade->connected();
|
||||
DBUG_RETURN(0);
|
||||
} while(0);
|
||||
|
||||
ndbout << "Configuration error: ";
|
||||
const char* erString = m_config_retriever->getErrorString();
|
||||
if (erString == 0) {
|
||||
erString = "No error specified!";
|
||||
}
|
||||
ndbout << erString << endl;
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
|
||||
Ndb_cluster_connection::~Ndb_cluster_connection()
|
||||
{
|
||||
if (m_connect_thread)
|
||||
{
|
||||
void *status;
|
||||
g_run_connect_thread= 0;
|
||||
NdbThread_WaitFor(m_connect_thread, &status);
|
||||
NdbThread_Destroy(&m_connect_thread);
|
||||
m_connect_thread= 0;
|
||||
}
|
||||
if (m_facade != 0)
|
||||
{
|
||||
delete m_facade;
|
||||
if (m_facade != TransporterFacade::theFacadeInstance)
|
||||
abort();
|
||||
TransporterFacade::theFacadeInstance= 0;
|
||||
}
|
||||
if (m_connect_string)
|
||||
free(m_connect_string);
|
||||
if (m_config_retriever)
|
||||
delete m_config_retriever;
|
||||
}
|
||||
|
||||
|
@@ -22,12 +22,13 @@
|
||||
*/
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_sys.h>
|
||||
#include <getarg.h>
|
||||
|
||||
#include <NdbApi.hpp>
|
||||
#include <NDBT.hpp>
|
||||
|
||||
|
||||
static Ndb_cluster_connection *ndb_cluster_connection= 0;
|
||||
static Ndb* ndb = 0;
|
||||
static NdbDictionary::Dictionary* dic = 0;
|
||||
static int _unqualified = 0;
|
||||
@@ -48,6 +49,22 @@ fatal(char const* fmt, ...)
|
||||
exit(1);
|
||||
}
|
||||
|
||||
static void
|
||||
fatal_dict(char const* fmt, ...)
|
||||
{
|
||||
va_list ap;
|
||||
char buf[500];
|
||||
va_start(ap, fmt);
|
||||
vsnprintf(buf, sizeof(buf), fmt, ap);
|
||||
va_end(ap);
|
||||
ndbout << buf;
|
||||
if (dic)
|
||||
ndbout << " - " << dic->getNdbError();
|
||||
ndbout << endl;
|
||||
NDBT_ProgramExit(NDBT_FAILED);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
static void
|
||||
list(const char * tabname,
|
||||
NdbDictionary::Object::Type type)
|
||||
@@ -55,10 +72,10 @@ list(const char * tabname,
|
||||
NdbDictionary::Dictionary::List list;
|
||||
if (tabname == 0) {
|
||||
if (dic->listObjects(list, type) == -1)
|
||||
fatal("listObjects");
|
||||
fatal_dict("listObjects");
|
||||
} else {
|
||||
if (dic->listIndexes(list, tabname) == -1)
|
||||
fatal("listIndexes");
|
||||
fatal_dict("listIndexes");
|
||||
}
|
||||
if (ndb->usingFullyQualifiedNames())
|
||||
ndbout_c("%-5s %-20s %-8s %-7s %-12s %-8s %s", "id", "type", "state", "logging", "database", "schema", "name");
|
||||
@@ -145,12 +162,17 @@ list(const char * tabname,
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
const char *debug_option= 0;
|
||||
#endif
|
||||
|
||||
int main(int argc, const char** argv){
|
||||
int _loops = 1;
|
||||
const char* _tabname = NULL;
|
||||
const char* _dbname = "TEST_DB";
|
||||
int _type = 0;
|
||||
int _help = 0;
|
||||
const char* _connect_str = NULL;
|
||||
|
||||
struct getargs args[] = {
|
||||
{ "loops", 'l', arg_integer, &_loops, "loops",
|
||||
@@ -161,6 +183,13 @@ int main(int argc, const char** argv){
|
||||
"Name of database table is in"},
|
||||
{ "type", 't', arg_integer, &_type, "type",
|
||||
"Type of objects to show, see NdbDictionary.hpp for numbers(default = 0)" },
|
||||
{ "connect-string", 'c', arg_string, &_connect_str,
|
||||
"Set connect string for connecting to ndb_mgmd. <constr>=\"host=<hostname:port>[;nodeid=<id>]\". Overides specifying entries in NDB_CONNECTSTRING and config file",
|
||||
"<constr>" },
|
||||
#ifndef DBUG_OFF
|
||||
{ "debug", 0, arg_string, &debug_option,
|
||||
"Specify debug options e.g. d:t:i:o,out.trace", "options" },
|
||||
#endif
|
||||
{ "usage", '?', arg_flag, &_help, "Print help", "" }
|
||||
};
|
||||
int num_args = sizeof(args) / sizeof(args[0]);
|
||||
@@ -179,10 +208,18 @@ int main(int argc, const char** argv){
|
||||
}
|
||||
_tabname = argv[optind];
|
||||
|
||||
ndb = new Ndb(_dbname);
|
||||
#ifndef DBUG_OFF
|
||||
my_init();
|
||||
if (debug_option)
|
||||
DBUG_PUSH(debug_option);
|
||||
#endif
|
||||
|
||||
ndb_cluster_connection = new Ndb_cluster_connection(_connect_str);
|
||||
ndb = new Ndb(ndb_cluster_connection, _dbname);
|
||||
ndb->useFullyQualifiedNames(!_unqualified);
|
||||
if (ndb->init() != 0)
|
||||
fatal("init");
|
||||
ndb_cluster_connection->connect();
|
||||
if (ndb->waitUntilReady(30) < 0)
|
||||
fatal("waitUntilReady");
|
||||
dic = ndb->getDictionary();
|
||||
|
@@ -16,6 +16,7 @@
|
||||
|
||||
|
||||
#include <ndb_global.h>
|
||||
#include <my_sys.h>
|
||||
|
||||
#include <NdbOut.hpp>
|
||||
|
||||
@@ -26,6 +27,9 @@
|
||||
#include <getarg.h>
|
||||
#include <NdbScanFilter.hpp>
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
const char *debug_option= 0;
|
||||
#endif
|
||||
|
||||
int scanReadRecords(Ndb*,
|
||||
const NdbDictionary::Table*,
|
||||
@@ -58,6 +62,10 @@ int main(int argc, const char** argv){
|
||||
"Output numbers in hexadecimal format", "useHexFormat" },
|
||||
{ "delimiter", 'd', arg_string, &_delimiter, "Column delimiter",
|
||||
"delimiter" },
|
||||
#ifndef DBUG_OFF
|
||||
{ "debug", 0, arg_string, &debug_option,
|
||||
"Specify debug options e.g. d:t:i:o,out.trace", "options" },
|
||||
#endif
|
||||
{ "usage", '?', arg_flag, &_help, "Print help", "" },
|
||||
{ "lock", 'l', arg_integer, &_lock,
|
||||
"Read(0), Read-hold(1), Exclusive(2)", "lock"},
|
||||
@@ -80,6 +88,12 @@ int main(int argc, const char** argv){
|
||||
}
|
||||
_tabname = argv[optind];
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
my_init();
|
||||
if (debug_option)
|
||||
DBUG_PUSH(debug_option);
|
||||
#endif
|
||||
|
||||
// Connect to Ndb
|
||||
Ndb MyNdb(_dbname);
|
||||
|
||||
|
Reference in New Issue
Block a user