1
0
mirror of https://github.com/MariaDB/server.git synced 2025-12-24 11:21:21 +03:00

Merge perch.ndb.mysql.com:/home/jonas/src/50-work

into  perch.ndb.mysql.com:/home/jonas/src/51-work


storage/ndb/include/ndb_version.h.in:
  Auto merged
storage/ndb/src/common/util/SocketClient.cpp:
  Auto merged
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Auto merged
storage/ndb/include/kernel/GlobalSignalNumbers.h:
  Auto merged
storage/ndb/include/mgmapi/mgmapi.h:
  Auto merged
storage/ndb/include/mgmcommon/ConfigRetriever.hpp:
  Auto merged
storage/ndb/include/util/SocketClient.hpp:
  Auto merged
storage/ndb/src/common/mgmcommon/ConfigRetriever.cpp:
  Auto merged
storage/ndb/src/common/transporter/Transporter.cpp:
  Auto merged
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Auto merged
storage/ndb/src/kernel/vm/Configuration.cpp:
  Auto merged
storage/ndb/src/mgmapi/mgmapi.cpp:
  Auto merged
storage/ndb/include/kernel/signaldata/CopyFrag.hpp:
  merge
storage/ndb/src/common/debugger/signaldata/SignalNames.cpp:
  merge
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp:
  merge
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp:
  merge
storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp:
  merge
This commit is contained in:
unknown
2006-09-15 11:43:49 +02:00
17 changed files with 268 additions and 31 deletions

View File

@@ -127,6 +127,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
/* 68 not unused */
/* 69 not unused */
/* 70 unused */
#define GSN_UPDATE_FRAG_DIST_KEY_ORD 70
#define GSN_ACC_ABORTREQ 71
#define GSN_ACC_CHECK_SCAN 72
#define GSN_ACC_COMMITCONF 73

View File

@@ -30,7 +30,7 @@ class CopyFragReq {
*/
friend class Dblqh;
public:
STATIC_CONST( SignalLength = 8 );
STATIC_CONST( SignalLength = 9 );
private:
Uint32 userPtr;
@@ -41,6 +41,8 @@ private:
Uint32 schemaVersion;
Uint32 distributionKey;
Uint32 gci;
Uint32 nodeCount;
Uint32 nodeList[1];
};
class CopyFragConf {
@@ -85,4 +87,13 @@ private:
Uint32 errorCode;
};
struct UpdateFragDistKeyOrd
{
Uint32 tableId;
Uint32 fragId;
Uint32 fragDistributionKey;
STATIC_CONST( SignalLength = 3 );
};
#endif

View File

@@ -230,7 +230,9 @@ extern "C" {
NDB_MGM_SERVER_NOT_CONNECTED = 1010,
/** Could not connect to socker */
NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET = 1011,
/** Could not bind local address */
NDB_MGM_BIND_ADDRESS = 1012,
/* Alloc node id failures */
/** Generic error, retry may succeed */
NDB_MGM_ALLOCID_ERROR = 1101,
@@ -514,6 +516,15 @@ extern "C" {
const char *ndb_mgm_get_connected_host(NdbMgmHandle handle);
const char *ndb_mgm_get_connectstring(NdbMgmHandle handle, char *buf, int buf_sz);
/**
* Set local bindaddress
* @param arg - Srting of form "host[:port]"
* @note must be called before connect
* @note Error on binding local address will not be reported until connect
* @return 0 on success
*/
int ndb_mgm_set_bindaddress(NdbMgmHandle, const char * arg);
/**
* Gets the connectstring used for a connection
*

View File

@@ -28,7 +28,8 @@
class ConfigRetriever {
public:
ConfigRetriever(const char * _connect_string,
Uint32 version, Uint32 nodeType);
Uint32 version, Uint32 nodeType,
const char * _bind_address = 0);
~ConfigRetriever();
int do_connect(int no_retries, int retry_delay_in_seconds, int verbose);

View File

@@ -67,5 +67,7 @@ char ndb_version_string_buf[NDB_VERSION_STRING_BUF_SZ];
#define NDBD_DICT_LOCK_VERSION_5 MAKE_VERSION(5,0,23)
#define NDBD_DICT_LOCK_VERSION_5_1 MAKE_VERSION(5,1,12)
#define NDBD_UPDATE_FRAG_DIST_KEY_50 MAKE_VERSION(5,0,26)
#define NDBD_UPDATE_FRAG_DIST_KEY_51 MAKE_VERSION(5,1,12)
#endif

View File

@@ -37,7 +37,8 @@ public:
};
unsigned short get_port() { return m_port; };
char *get_server_name() { return m_server_name; };
NDB_SOCKET_TYPE connect();
int bind(const char* toaddress, unsigned short toport);
NDB_SOCKET_TYPE connect(const char* toaddress = 0, unsigned short port = 0);
bool close();
};

View File

@@ -633,5 +633,6 @@ const GsnName SignalNames [] = {
,{ GSN_DICT_LOCK_REF, "DICT_LOCK_REF" }
,{ GSN_DICT_UNLOCK_ORD, "DICT_UNLOCK_ORD" }
,{ GSN_UPDATE_FRAG_DIST_KEY_ORD, "UPDATE_FRAG_DIST_KEY_ORD" }
};
const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);

View File

@@ -45,7 +45,8 @@
//****************************************************************************
ConfigRetriever::ConfigRetriever(const char * _connect_string,
Uint32 version, Uint32 node_type)
Uint32 version, Uint32 node_type,
const char * _bindaddress)
{
DBUG_ENTER("ConfigRetriever::ConfigRetriever");
@@ -69,6 +70,15 @@ ConfigRetriever::ConfigRetriever(const char * _connect_string,
setError(CR_ERROR, tmp.c_str());
DBUG_VOID_RETURN;
}
if (_bindaddress)
{
if (ndb_mgm_set_bindaddress(m_handle, _bindaddress))
{
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
DBUG_VOID_RETURN;
}
}
resetError();
DBUG_VOID_RETURN;
}

View File

@@ -60,9 +60,6 @@ Transporter::Transporter(TransporterRegistry &t_reg,
}
strncpy(localHostName, lHostName, sizeof(localHostName));
if (strlen(lHostName) > 0)
Ndb_getInAddr(&localHostAddress, lHostName);
DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
remoteNodeId, localNodeId, isServer,
remoteHostName, localHostName,
@@ -128,10 +125,23 @@ Transporter::connect_client() {
return true;
if(isMgmConnection)
{
sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
}
else
{
if (!m_socket_client->init())
{
return false;
}
if (strlen(localHostName) > 0)
{
if (m_socket_client->bind(localHostName, 0) != 0)
return false;
}
sockfd= m_socket_client->connect();
}
return connect_client(sockfd);
}

View File

@@ -25,7 +25,7 @@ SocketClient::SocketClient(const char *server_name, unsigned short port, SocketA
{
m_auth= sa;
m_port= port;
m_server_name= strdup(server_name);
m_server_name= server_name ? strdup(server_name) : 0;
m_sockfd= NDB_INVALID_SOCKET;
}
@@ -45,13 +45,16 @@ SocketClient::init()
if (m_sockfd != NDB_INVALID_SOCKET)
NDB_CLOSE_SOCKET(m_sockfd);
memset(&m_servaddr, 0, sizeof(m_servaddr));
m_servaddr.sin_family = AF_INET;
m_servaddr.sin_port = htons(m_port);
// Convert ip address presentation format to numeric format
if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
return false;
if (m_server_name)
{
memset(&m_servaddr, 0, sizeof(m_servaddr));
m_servaddr.sin_family = AF_INET;
m_servaddr.sin_port = htons(m_port);
// Convert ip address presentation format to numeric format
if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
return false;
}
m_sockfd= socket(AF_INET, SOCK_STREAM, 0);
if (m_sockfd == NDB_INVALID_SOCKET) {
return false;
@@ -62,8 +65,45 @@ SocketClient::init()
return true;
}
int
SocketClient::bind(const char* bindaddress, unsigned short localport)
{
if (m_sockfd == NDB_INVALID_SOCKET)
return -1;
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(localport);
// Convert ip address presentation format to numeric format
if (Ndb_getInAddr(&local.sin_addr, bindaddress))
{
return errno ? errno : EINVAL;
}
const int on = 1;
if (setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR,
(const char*)&on, sizeof(on)) == -1) {
int ret = errno;
NDB_CLOSE_SOCKET(m_sockfd);
m_sockfd= NDB_INVALID_SOCKET;
return errno;
}
if (::bind(m_sockfd, (struct sockaddr*)&local, sizeof(local)) == -1)
{
int ret = errno;
NDB_CLOSE_SOCKET(m_sockfd);
m_sockfd= NDB_INVALID_SOCKET;
return ret;
}
return 0;
}
NDB_SOCKET_TYPE
SocketClient::connect()
SocketClient::connect(const char *toaddress, unsigned short toport)
{
if (m_sockfd == NDB_INVALID_SOCKET)
{
@@ -74,6 +114,21 @@ SocketClient::connect()
return NDB_INVALID_SOCKET;
}
}
if (toaddress)
{
if (m_server_name)
free(m_server_name);
m_server_name = strdup(toaddress);
m_port = toport;
memset(&m_servaddr, 0, sizeof(m_servaddr));
m_servaddr.sin_family = AF_INET;
m_servaddr.sin_port = htons(toport);
// Convert ip address presentation format to numeric format
if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
return NDB_INVALID_SOCKET;
}
const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
if (r == -1) {
NDB_CLOSE_SOCKET(m_sockfd);

View File

@@ -2876,10 +2876,10 @@ void Dbdict::checkSchemaStatus(Signal* signal)
// On NR get index from master because index state is not on file
Uint32 type= oldEntry->m_tableType;
const bool file = c_systemRestart || !DictTabInfo::isIndex(type);
const bool file = (* newEntry == * oldEntry) &&
(c_systemRestart || !DictTabInfo::isIndex(type);
newEntry->m_info_words= oldEntry->m_info_words;
restartCreateTab(signal, tableId, oldEntry, newEntry, file);
return;
}
ndbrequire(ok);

View File

@@ -3264,7 +3264,10 @@ void Dbdih::execCREATE_FRAGCONF(Signal* signal)
copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
copyFragReq->distributionKey = fragPtr.p->distributionKey;
copyFragReq->gci = gci;
sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB);
copyFragReq->nodeCount = extractNodeInfo(fragPtr.p,
copyFragReq->nodeList);
sendSignal(ref, GSN_COPY_FRAGREQ, signal,
CopyFragReq::SignalLength + copyFragReq->nodeCount, JBB);
} else {
ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
jam();

View File

@@ -2147,6 +2147,7 @@ private:
void execSTORED_PROCCONF(Signal* signal);
void execSTORED_PROCREF(Signal* signal);
void execCOPY_FRAGREQ(Signal* signal);
void execUPDATE_FRAG_DIST_KEY_ORD(Signal*);
void execCOPY_ACTIVEREQ(Signal* signal);
void execCOPY_STATEREQ(Signal* signal);
void execLQH_TRANSREQ(Signal* signal);

View File

@@ -301,6 +301,9 @@ Dblqh::Dblqh(Block_context& ctx):
addRecSignal(GSN_RESTORE_LCP_REF, &Dblqh::execRESTORE_LCP_REF);
addRecSignal(GSN_RESTORE_LCP_CONF, &Dblqh::execRESTORE_LCP_CONF);
addRecSignal(GSN_UPDATE_FRAG_DIST_KEY_ORD,
&Dblqh::execUPDATE_FRAG_DIST_KEY_ORD);
initData();

View File

@@ -9913,6 +9913,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
const CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
tabptr.i = copyFragReq->tableId;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
Uint32 i;
const Uint32 fragId = copyFragReq->fragId;
const Uint32 copyPtr = copyFragReq->userPtr;
const Uint32 userRef = copyFragReq->userRef;
@@ -9925,8 +9926,20 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
ndbrequire(cfirstfreeTcConrec != RNIL);
ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo));
fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
Uint32 key = fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
Uint32 checkversion = NDB_VERSION >= MAKE_VERSION(5,1,0) ?
NDBD_UPDATE_FRAG_DIST_KEY_51 : NDBD_UPDATE_FRAG_DIST_KEY_50;
Uint32 nodeCount = copyFragReq->nodeCount;
NdbNodeBitmask nodemask;
if (getNodeInfo(refToNode(userRef)).m_version >= checkversion)
{
ndbrequire(nodeCount <= MAX_REPLICAS);
for (i = 0; i<nodeCount; i++)
nodemask.set(copyFragReq->nodeList[i]);
}
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
jam();
/**
@@ -10009,9 +10022,42 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
req->savePointId = tcConnectptr.p->savePointId;
sendSignal(scanptr.p->scanBlockref, GSN_ACC_SCANREQ, signal,
AccScanReq::SignalLength, JBB);
if (! nodemask.isclear())
{
ndbrequire(nodemask.get(getOwnNodeId()));
ndbrequire(nodemask.get(nodeId)); // cpy dest
nodemask.clear(getOwnNodeId());
nodemask.clear(nodeId);
UpdateFragDistKeyOrd*
ord = (UpdateFragDistKeyOrd*)signal->getDataPtrSend();
ord->tableId = tabptr.i;
ord->fragId = fragId;
ord->fragDistributionKey = key;
i = 0;
while ((i = nodemask.find(i+1)) != NdbNodeBitmask::NotFound)
{
if (getNodeInfo(i).m_version >= checkversion)
sendSignal(calcLqhBlockRef(i), GSN_UPDATE_FRAG_DIST_KEY_ORD,
signal, UpdateFragDistKeyOrd::SignalLength, JBB);
}
}
return;
}//Dblqh::execCOPY_FRAGREQ()
void
Dblqh::execUPDATE_FRAG_DIST_KEY_ORD(Signal * signal)
{
jamEntry();
UpdateFragDistKeyOrd* ord =(UpdateFragDistKeyOrd*)signal->getDataPtr();
tabptr.i = ord->tableId;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
ndbrequire(getFragmentrec(signal, ord->fragId));
fragptr.p->fragDistributionKey = ord->fragDistributionKey;
}
void Dblqh::accScanConfCopyLab(Signal* signal)
{
AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0];
@@ -18292,6 +18338,18 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
if(tabPtr.p->tableStatus != Tablerec::NOT_DEFINED){
infoEvent("Table %d Status: %d Usage: %d",
i, tabPtr.p->tableStatus, tabPtr.p->usageCount);
for (Uint32 j = 0; j<MAX_FRAG_PER_NODE; j++)
{
FragrecordPtr fragPtr;
if ((fragPtr.i = tabPtr.p->fragrec[j]) != RNIL)
{
ptrCheckGuard(fragPtr, cfragrecFileSize, fragrecord);
infoEvent(" frag: %d distKey: %u",
tabPtr.p->fragid[j],
fragPtr.p->fragDistributionKey);
}
}
}
}
return;

View File

@@ -58,7 +58,8 @@ NDB_STD_OPTS_VARS;
// XXX should be my_bool ???
static int _daemon, _no_daemon, _foreground, _initial, _no_start;
static int _initialstart;
static const char* _nowait_nodes;
static const char* _nowait_nodes = 0;
static const char* _bind_address = 0;
extern Uint32 g_start_type;
extern NdbNodeBitmask g_nowait_nodes;
@@ -98,6 +99,10 @@ static struct my_option my_long_options[] =
"Perform initial start",
(gptr*) &_initialstart, (gptr*) &_initialstart, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "bind-address", OPT_NOWAIT_NODES,
"Local bind address",
(gptr*) &_bind_address, (gptr*) &_bind_address, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
static void short_usage_sub(void)
@@ -257,7 +262,9 @@ Configuration::fetch_configuration(){
m_mgmd_port= 0;
m_config_retriever= new ConfigRetriever(getConnectString(),
NDB_VERSION, NODE_TYPE_DB);
NDB_VERSION,
NODE_TYPE_DB,
_bind_address);
if (m_config_retriever->hasError())
{

View File

@@ -107,6 +107,7 @@ struct ndb_mgm_handle {
int mgmd_version_major;
int mgmd_version_minor;
int mgmd_version_build;
char * m_bindaddress;
};
#define SET_ERROR(h, e, s) setError(h, e, __LINE__, s)
@@ -168,6 +169,7 @@ ndb_mgm_create_handle()
h->cfg_i = -1;
h->errstream = stdout;
h->m_name = 0;
h->m_bindaddress = 0;
strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE);
@@ -215,6 +217,22 @@ ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv)
DBUG_RETURN(0);
}
extern "C"
int
ndb_mgm_set_bindaddress(NdbMgmHandle handle, const char * arg)
{
DBUG_ENTER("ndb_mgm_set_bindaddress");
if (handle->m_bindaddress)
free(handle->m_bindaddress);
if (arg)
handle->m_bindaddress = strdup(arg);
else
handle->m_bindaddress = 0;
DBUG_RETURN(0);
}
/**
* Destroy a handle
*/
@@ -241,6 +259,8 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle)
#endif
(*handle)->cfg.~LocalConfig();
my_free((*handle)->m_name, MYF(MY_ALLOW_ZERO_PTR));
if ((*handle)->m_bindaddress)
free((*handle)->m_bindaddress);
my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR));
* handle = 0;
DBUG_VOID_RETURN;
@@ -433,6 +453,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
BaseString::snprintf(logname, 64, "mgmapi.log");
handle->logfile = fopen(logname, "w");
#endif
char buf[1024];
/**
* Do connect
@@ -440,6 +461,50 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
LocalConfig &cfg= handle->cfg;
NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET;
Uint32 i;
int binderror = 0;
SocketClient s(0, 0);
if (!s.init())
{
fprintf(handle->errstream,
"Unable to create socket, "
"while trying to connect with connect string: %s\n",
cfg.makeConnectString(buf,sizeof(buf)));
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to create socket, "
"while trying to connect with connect string: %s\n",
cfg.makeConnectString(buf,sizeof(buf)));
DBUG_RETURN(-1);
}
if (handle->m_bindaddress)
{
BaseString::snprintf(buf, sizeof(buf), handle->m_bindaddress);
unsigned short portno = 0;
char * port = strchr(buf, ':');
if (port != 0)
{
portno = atoi(port+1);
* port = 0;
}
int err;
if ((err = s.bind(buf, portno)) != 0)
{
fprintf(handle->errstream,
"Unable to bind local address %s errno: %d, "
"while trying to connect with connect string: %s\n",
handle->m_bindaddress, err,
cfg.makeConnectString(buf,sizeof(buf)));
setError(handle, NDB_MGM_BIND_ADDRESS, __LINE__,
"Unable to bind local address %s errno: %d, "
"while trying to connect with connect string: %s\n",
handle->m_bindaddress, err,
cfg.makeConnectString(buf,sizeof(buf)));
DBUG_RETURN(-1);
}
}
while (sockfd == NDB_INVALID_SOCKET)
{
// do all the mgmt servers
@@ -447,8 +512,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
{
if (cfg.ids[i].type != MgmId_TCP)
continue;
SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port);
sockfd = s.connect();
sockfd = s.connect(cfg.ids[i].name.c_str(), cfg.ids[i].port);
if (sockfd != NDB_INVALID_SOCKET)
break;
}
@@ -456,19 +520,17 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
break;
#ifndef DBUG_OFF
{
char buf[1024];
DBUG_PRINT("info",("Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf))));
}
#endif
if (verbose > 0) {
char buf[1024];
fprintf(handle->errstream, "Unable to connect with connect string: %s\n",
fprintf(handle->errstream,
"Unable to connect with connect string: %s\n",
cfg.makeConnectString(buf,sizeof(buf)));
verbose= -1;
}
if (no_retries == 0) {
char buf[1024];
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf)));