diff --git a/storage/ndb/src/kernel/blocks/backup/Backup.cpp b/storage/ndb/src/kernel/blocks/backup/Backup.cpp index ba77a11d36b..8479b977f7f 100644 --- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp +++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp @@ -229,36 +229,47 @@ Backup::execCONTINUEB(Signal* signal) BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptr_I); - TablePtr tabPtr; - ptr.p->tables.getPtr(tabPtr, tabPtr_I); - FragmentPtr fragPtr; - tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I); - BackupFilePtr filePtr; - ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr); - - const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2; - Uint32 * dst; - if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz)) + if (tabPtr_I == RNIL) { - sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4); + closeFiles(signal, ptr); return; } + jam(); + TablePtr tabPtr; + ptr.p->tables.getPtr(tabPtr, tabPtr_I); + jam(); + if(tabPtr.p->fragments.getSize()) + { + FragmentPtr fragPtr; + tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I); - BackupFormat::CtlFile::FragmentInfo * fragInfo = - (BackupFormat::CtlFile::FragmentInfo*)dst; - fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO); - fragInfo->SectionLength = htonl(sz); - fragInfo->TableId = htonl(fragPtr.p->tableId); - fragInfo->FragmentNo = htonl(fragPtr_I); - fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF); - fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32); - fragInfo->FilePosLow = htonl(0); - fragInfo->FilePosHigh = htonl(0); + BackupFilePtr filePtr; + ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr); + const Uint32 sz = sizeof(BackupFormat::CtlFile::FragmentInfo) >> 2; + Uint32 * dst; + if (!filePtr.p->operation.dataBuffer.getWritePtr(&dst, sz)) + { + sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 4); + return; + } - filePtr.p->operation.dataBuffer.updateWritePtr(sz); + BackupFormat::CtlFile::FragmentInfo * fragInfo = + (BackupFormat::CtlFile::FragmentInfo*)dst; + fragInfo->SectionType = htonl(BackupFormat::FRAGMENT_INFO); + fragInfo->SectionLength = htonl(sz); + fragInfo->TableId = htonl(fragPtr.p->tableId); + fragInfo->FragmentNo = htonl(fragPtr_I); + fragInfo->NoOfRecordsLow = htonl(fragPtr.p->noOfRecords & 0xFFFFFFFF); + fragInfo->NoOfRecordsHigh = htonl(fragPtr.p->noOfRecords >> 32); + fragInfo->FilePosLow = htonl(0); + fragInfo->FilePosHigh = htonl(0); + + filePtr.p->operation.dataBuffer.updateWritePtr(sz); + + fragPtr_I++; + } - fragPtr_I++; if (fragPtr_I == tabPtr.p->fragments.getSize()) { signal->theData[0] = tabPtr.p->tableId; @@ -2040,6 +2051,12 @@ Backup::sendDropTrig(Signal* signal, BackupRecordPtr ptr) TablePtr tabPtr; ptr.p->tables.first(tabPtr); + if(tabPtr.i == RNIL) + { + closeFiles(signal, ptr); + return; + } + signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO; signal->theData[1] = ptr.i; signal->theData[2] = tabPtr.i; diff --git a/storage/ndb/src/mgmclient/CommandInterpreter.cpp b/storage/ndb/src/mgmclient/CommandInterpreter.cpp index c31efebbf1e..3c822e4b07e 100644 --- a/storage/ndb/src/mgmclient/CommandInterpreter.cpp +++ b/storage/ndb/src/mgmclient/CommandInterpreter.cpp @@ -161,8 +161,15 @@ private: int try_reconnect; int m_error; struct NdbThread* m_event_thread; + NdbMutex *m_print_mutex; }; +struct event_thread_param { + NdbMgmHandle *m; + NdbMutex **p; +}; + +NdbMutex* print_mutex; /* * Facade object for CommandInterpreter @@ -340,6 +347,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) m_connected= false; m_event_thread= 0; try_reconnect = 0; + m_print_mutex= NdbMutex_Create(); } /* @@ -348,6 +356,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) CommandInterpreter::~CommandInterpreter() { disconnect(); + NdbMutex_Destroy(m_print_mutex); } static bool @@ -384,11 +393,13 @@ CommandInterpreter::printError() static int do_event_thread; static void* -event_thread_run(void* m) +event_thread_run(void* p) { DBUG_ENTER("event_thread_run"); - NdbMgmHandle handle= *(NdbMgmHandle*)m; + struct event_thread_param param= *(struct event_thread_param*)p; + NdbMgmHandle handle= *(param.m); + NdbMutex* printmutex= *(param.p); int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 1, NDB_MGM_EVENT_CATEGORY_STARTUP, @@ -406,7 +417,11 @@ event_thread_run(void* m) { const char ping_token[]= ""; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) - ndbout << tmp; + if(tmp && strlen(tmp)) + { + Guard g(printmutex); + ndbout << tmp; + } } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); @@ -459,8 +474,11 @@ CommandInterpreter::connect() assert(m_event_thread == 0); assert(do_event_thread == 0); do_event_thread= 0; + struct event_thread_param p; + p.m= &m_mgmsrv2; + p.p= &m_print_mutex; m_event_thread = NdbThread_Create(event_thread_run, - (void**)&m_mgmsrv2, + (void**)&p, 32768, "CommandInterpreted_event_thread", NDB_THREAD_PRIO_LOW); @@ -547,6 +565,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect, int result= execute_impl(_line); if (error) *error= m_error; + return result; } @@ -626,6 +645,7 @@ CommandInterpreter::execute_impl(const char *_line) DBUG_RETURN(true); if (strcasecmp(firstToken, "SHOW") == 0) { + Guard g(m_print_mutex); executeShow(allAfterFirstToken); DBUG_RETURN(true); } @@ -853,6 +873,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, ndbout_c("Trying to start all nodes of system."); ndbout_c("Use ALL STATUS to see the system start-up phases."); } else { + Guard g(m_print_mutex); struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv); if(cl == 0){ ndbout_c("Unable get status from management server"); diff --git a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp index 50a623920d2..a2a56905392 100644 --- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp +++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp @@ -77,7 +77,6 @@ }\ } -extern int global_flag_send_heartbeat_now; extern int g_no_nodeid_checks; extern my_bool opt_core; @@ -1450,6 +1449,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort) #include +void +MgmtSrvr::updateStatus() +{ + theFacade->theClusterMgr->forceHB(); +} + int MgmtSrvr::status(int nodeId, ndb_mgm_node_status * _status, @@ -2260,7 +2265,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, if (found_matching_type && !found_free_node) { // we have a temporary error which might be due to that // we have got the latest connect status from db-nodes. Force update. - global_flag_send_heartbeat_now= 1; + updateStatus(); } BaseString type_string, type_c_string; @@ -2603,7 +2608,7 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() if (!m_reserved_nodes.isclear()) { m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); // node has been reserved, force update signal to ndb nodes - global_flag_send_heartbeat_now= 1; + m_mgmsrv.updateStatus(); char tmp_str[128]; m_mgmsrv.m_reserved_nodes.getText(tmp_str); diff --git a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp index 1473ec90c33..5eee7447c98 100644 --- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp +++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp @@ -485,6 +485,8 @@ public: void get_connected_nodes(NodeBitmask &connected_nodes) const; SocketServer *get_socket_server() { return m_socket_server; } + void updateStatus(); + //************************************************************************** private: //************************************************************************** diff --git a/storage/ndb/src/mgmsrv/Services.cpp b/storage/ndb/src/mgmsrv/Services.cpp index cc7892f8b36..5132b343fbd 100644 --- a/storage/ndb/src/mgmsrv/Services.cpp +++ b/storage/ndb/src/mgmsrv/Services.cpp @@ -972,6 +972,7 @@ printNodeStatus(OutputStream *output, MgmtSrvr &mgmsrv, enum ndb_mgm_node_type type) { NodeId nodeId = 0; + mgmsrv.updateStatus(); while(mgmsrv.getNextNodeId(&nodeId, type)) { enum ndb_mgm_node_status status; Uint32 startPhase = 0, diff --git a/storage/ndb/src/ndbapi/ClusterMgr.cpp b/storage/ndb/src/ndbapi/ClusterMgr.cpp index 49815ae6c13..b171457c2a9 100644 --- a/storage/ndb/src/ndbapi/ClusterMgr.cpp +++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp @@ -37,8 +37,8 @@ #include #include -int global_flag_send_heartbeat_now= 0; int global_flag_skip_invalidate_cache = 0; +//#define DEBUG_REG // Just a C wrapper for threadMain extern "C" @@ -68,6 +68,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): DBUG_ENTER("ClusterMgr::ClusterMgr"); ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); + waitForHBCond= NdbCondition_Create(); + waitingForHB= false; noOfAliveNodes= 0; noOfConnectedNodes= 0; theClusterMgrThread= 0; @@ -79,7 +81,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): ClusterMgr::~ClusterMgr() { DBUG_ENTER("ClusterMgr::~ClusterMgr"); - doStop(); + doStop(); + NdbCondition_Destroy(waitForHBCond); NdbMutex_Destroy(clusterMgrThreadMutex); DBUG_VOID_RETURN; } @@ -153,6 +156,70 @@ ClusterMgr::doStop( ){ DBUG_VOID_RETURN; } +void +ClusterMgr::forceHB() +{ + theFacade.lock_mutex(); + + if(waitingForHB) + { + NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); + theFacade.unlock_mutex(); + return; + } + + waitingForHB= true; + + NodeBitmask ndb_nodes; + ndb_nodes.clear(); + waitForHBFromNodes.clear(); + for(Uint32 i = 0; i < MAX_NODES; i++) + { + if(!theNodes[i].defined) + continue; + if(theNodes[i].m_info.m_type == NodeInfo::DB) + { + ndb_nodes.set(i); + const ClusterMgr::Node &node= getNodeInfo(i); + waitForHBFromNodes.bitOR(node.m_state.m_connected_nodes); + } + } + waitForHBFromNodes.bitAND(ndb_nodes); + +#ifdef DEBUG_REG + char buf[128]; + ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; +#endif + NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); + + signal.theVerId_signalNumber = GSN_API_REGREQ; + signal.theReceiversBlockNumber = QMGR; + signal.theTrace = 0; + signal.theLength = ApiRegReq::SignalLength; + + ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend()); + req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId()); + req->version = NDB_VERSION; + + int nodeId= 0; + for(int i=0; + NodeBitmask::NotFound!=(nodeId= waitForHBFromNodes.find(i)); + i= nodeId+1) + { +#ifdef DEBUG_REG + ndbout << "FORCE HB to " << nodeId << endl; +#endif + theFacade.sendSignalUnCond(&signal, nodeId); + } + + NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); + waitingForHB= false; +#ifdef DEBUG_REG + ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; +#endif + theFacade.unlock_mutex(); +} + void ClusterMgr::threadMain( ){ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); @@ -174,9 +241,6 @@ ClusterMgr::threadMain( ){ /** * Start of Secure area for use of Transporter */ - int send_heartbeat_now= global_flag_send_heartbeat_now; - global_flag_send_heartbeat_now= 0; - if (m_cluster_state == CS_waiting_for_clean_cache) { theFacade.m_globalDictCache.lock(); @@ -209,8 +273,7 @@ ClusterMgr::threadMain( ){ } theNode.hbCounter += timeSlept; - if (theNode.hbCounter >= theNode.hbFrequency || - send_heartbeat_now) { + if (theNode.hbCounter >= theNode.hbFrequency) { /** * It is now time to send a new Heartbeat */ @@ -219,7 +282,7 @@ ClusterMgr::threadMain( ){ theNode.hbCounter = 0; } -#if 0 +#ifdef DEBUG_REG ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); #endif theFacade.sendSignalUnCond(&signal, nodeId); @@ -272,7 +335,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0]; const NodeId nodeId = refToNode(apiRegReq->ref); -#if 0 +#ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId); #endif @@ -313,7 +376,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); -#if 0 +#ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); #endif @@ -342,6 +405,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ }//if node.m_info.m_heartbeat_cnt = 0; node.hbCounter = 0; + + if(waitingForHB) + { + waitForHBFromNodes.clear(nodeId); + + if(waitForHBFromNodes.isclear()) + { + waitingForHB= false; + NdbCondition_Broadcast(waitForHBCond); + } + } node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50; } @@ -371,6 +445,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){ default: break; } + + waitForHBFromNodes.clear(nodeId); + if(waitForHBFromNodes.isclear()) + NdbCondition_Signal(waitForHBCond); } void diff --git a/storage/ndb/src/ndbapi/ClusterMgr.hpp b/storage/ndb/src/ndbapi/ClusterMgr.hpp index ca879e7948e..20912938cf3 100644 --- a/storage/ndb/src/ndbapi/ClusterMgr.hpp +++ b/storage/ndb/src/ndbapi/ClusterMgr.hpp @@ -49,7 +49,9 @@ public: void doStop(); void startThread(); - + + void forceHB(); + private: void threadMain(); @@ -91,6 +93,11 @@ private: Uint32 noOfConnectedNodes; Node theNodes[MAX_NODES]; NdbThread* theClusterMgrThread; + + NodeBitmask waitForHBFromNodes; // used in forcing HBs + NdbCondition* waitForHBCond; + bool waitingForHB; + enum Cluster_state m_cluster_state; /** * Used for controlling start/stop of the thread