1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-13 23:02:14 +03:00

Add logic for graceful shutdown

This commit is contained in:
Ben Thompson
2019-07-01 11:39:24 -05:00
parent bf18233ce7
commit dedffcd01e
10 changed files with 108 additions and 25 deletions

16
src/Cache.cpp Normal file → Executable file
View File

@@ -90,9 +90,9 @@ Cache::Cache() : currentCacheSize(0)
throw e; throw e;
} }
//cout << "Cache got prefix " << prefix << endl; //cout << "Cache got prefix " << prefix << endl;
downloader.reset(new Downloader());
downloader.setDownloadPath(prefix.string()); downloader->setDownloadPath(prefix.string());
downloader.useThisLock(&lru_mutex); downloader->useThisLock(&lru_mutex);
stmp = conf->getValue("ObjectStorage", "journal_path"); stmp = conf->getValue("ObjectStorage", "journal_path");
if (stmp.empty()) if (stmp.empty())
@@ -191,7 +191,7 @@ void Cache::read(const vector<string> &keys)
vector<int> dl_errnos; vector<int> dl_errnos;
vector<size_t> sizes; vector<size_t> sizes;
if (!keysToFetch.empty()) if (!keysToFetch.empty())
downloader.download(keysToFetch, &dl_errnos, &sizes, lru_mutex); downloader->download(keysToFetch, &dl_errnos, &sizes, lru_mutex);
size_t sum_sizes = 0; size_t sum_sizes = 0;
for (size_t &size : sizes) for (size_t &size : sizes)
@@ -266,7 +266,7 @@ void Cache::read(const vector<string> &keys)
return; return;
assert(s.owns_lock()); assert(s.owns_lock());
downloader.download(keysToFetch, &dlErrnos, &dlSizes); downloader->download(keysToFetch, &dlErrnos, &dlSizes);
assert(s.owns_lock()); assert(s.owns_lock());
size_t sum_sizes = 0; size_t sum_sizes = 0;
@@ -614,6 +614,12 @@ void Cache::reset()
currentCacheSize = 0; currentCacheSize = 0;
} }
void Cache::shutdown()
{
boost::unique_lock<boost::mutex> s(lru_mutex);
downloader.reset();
}
/* The helper classes */ /* The helper classes */
Cache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k) Cache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k)

4
src/Cache.h Normal file → Executable file
View File

@@ -59,6 +59,8 @@ class Cache : public boost::noncopyable
const boost::filesystem::path &getJournalPath(); const boost::filesystem::path &getJournalPath();
// this will delete everything in the cache and journal paths, and empty all Cache structures. // this will delete everything in the cache and journal paths, and empty all Cache structures.
void reset(); void reset();
void shutdown();
private: private:
Cache(); Cache();
@@ -67,7 +69,7 @@ class Cache : public boost::noncopyable
size_t maxCacheSize; size_t maxCacheSize;
size_t objectSize; size_t objectSize;
size_t currentCacheSize; size_t currentCacheSize;
Downloader downloader; boost::scoped_ptr<Downloader> downloader;
Replicator *replicator; Replicator *replicator;
SMLogging *logger; SMLogging *logger;

View File

@@ -37,6 +37,11 @@ void ClientRequestProcessor::processRequest(int sock, uint len)
threadPool.addJob(t); threadPool.addJob(t);
} }
void ClientRequestProcessor::shutdown()
{
delete crp;
}
} }

View File

@@ -16,6 +16,7 @@ class ClientRequestProcessor : public boost::noncopyable
virtual ~ClientRequestProcessor(); virtual ~ClientRequestProcessor();
void processRequest(int sock, uint len); void processRequest(int sock, uint len);
void shutdown();
private: private:
ClientRequestProcessor(); ClientRequestProcessor();

54
src/SessionManager.cpp Normal file → Executable file
View File

@@ -60,9 +60,10 @@ int SessionManager::start()
int pollTimeout = -1; int pollTimeout = -1;
int socketIncr; int socketIncr;
int current_size = 0; int current_size = 0;
bool running = true; int prevNFDS = 0;
logger->log(LOG_INFO,"SessionManager starting..."); bool shutdown = false;
bool running = true;
if (pipe(socketCtrl)==-1) if (pipe(socketCtrl)==-1)
{ {
@@ -156,7 +157,7 @@ int SessionManager::start()
{ {
//logger->log(LOG_DEBUG,"Listening socket is readable"); //logger->log(LOG_DEBUG,"Listening socket is readable");
incomingSockfd = 0; incomingSockfd = 0;
while (incomingSockfd != -1) while (incomingSockfd != -1 && !shutdown)
{ {
if (nfds >= MAX_SM_SOCKETS) if (nfds >= MAX_SM_SOCKETS)
break; // try to free up some room break; // try to free up some room
@@ -167,7 +168,7 @@ int SessionManager::start()
if (errno != EWOULDBLOCK) if (errno != EWOULDBLOCK)
{ {
logger->log(LOG_CRIT,"accept() failed: %s", strerror(errno)); logger->log(LOG_CRIT,"accept() failed: %s", strerror(errno));
running = false; shutdown = true;
} }
break; break;
} }
@@ -200,8 +201,13 @@ int SessionManager::start()
{ {
if(fds[i].fd == socket) if(fds[i].fd == socket)
{ {
//logger->log(LOG_DEBUG,"returned socket %i at index %i", fds[i].fd,i); if (shutdown)
fds[i].events = POLLIN; {
logger->log(LOG_DEBUG,"Shutdown in progress, closed socket %i at index %i", fds[i].fd,i);
close(socket);
break;
}
fds[i].events = (POLLIN | POLLPRI);
break; break;
} }
} }
@@ -224,6 +230,10 @@ int SessionManager::start()
} }
} }
break; break;
case SHUTDOWN:
logger->log(LOG_DEBUG,"Shutdown StorageManager...");
shutdown = true;
break;
default: default:
break; break;
} }
@@ -364,8 +374,15 @@ int SessionManager::start()
/* get rid of fds == -1 */ /* get rid of fds == -1 */
int i, j; int i, j;
for (i = 2; i < nfds; ++i) for (i = 2; i < nfds; ++i)
{
if (shutdown && fds[i].events == (POLLIN | POLLPRI))
{
close(fds[i].fd);
fds[i].fd = -1;
}
if (fds[i].fd == -1) if (fds[i].fd == -1)
break; break;
}
for (j = i + 1; j < nfds; ++j) for (j = i + 1; j < nfds; ++j)
{ {
if (fds[j].fd != -1) if (fds[j].fd != -1)
@@ -376,7 +393,19 @@ int SessionManager::start()
} }
} }
nfds = i; nfds = i;
if(shutdown)
{
//if (prevNFDS != nfds)
// logger->log(LOG_CRIT,"StorageManager shutting down, nfds = %i",nfds);
//prevNFDS = nfds;
if (nfds <= 2)
running = false;
}
} }
//Shutdown Done
crp->shutdown();
return -1; return -1;
} }
@@ -417,5 +446,18 @@ void SessionManager::socketError(int socket)
} }
} }
void SessionManager::shutdownSM(int sig){
boost::mutex::scoped_lock s(ctrlMutex);
SMLogging* logger = SMLogging::get();
logger->log(LOG_INFO,"SessionManager Caught Signal %i",sig);
int err;
uint8_t ctrlCode = SHUTDOWN;
err = ::write(socketCtrl[1], &ctrlCode, 1);
if (err <= 0)
{
return;
}
}
} }

5
src/SessionManager.h Normal file → Executable file
View File

@@ -6,6 +6,7 @@
#include "ClientRequestProcessor.h" #include "ClientRequestProcessor.h"
#include "messageFormat.h" #include "messageFormat.h"
#include <signal.h>
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
#include <sys/poll.h> #include <sys/poll.h>
#include <tr1/unordered_map> #include <tr1/unordered_map>
@@ -16,7 +17,8 @@ namespace storagemanager
enum sessionCtrl { enum sessionCtrl {
ADDFD, ADDFD,
REMOVEFD REMOVEFD,
SHUTDOWN
}; };
#define MAX_SM_SOCKETS 200 #define MAX_SM_SOCKETS 200
@@ -38,6 +40,7 @@ public:
void returnSocket(int socket); void returnSocket(int socket);
void socketError(int socket); void socketError(int socket);
void CRPTest(int socket,uint length); void CRPTest(int socket,uint length);
void shutdownSM(int sig);
private: private:
SessionManager(); SessionManager();

View File

@@ -56,7 +56,8 @@ Synchronizer::Synchronizer() : maxUploads(0)
journalPath = cache->getJournalPath(); journalPath = cache->getJournalPath();
cachePath = cache->getCachePath(); cachePath = cache->getCachePath();
threadPool.setMaxThreads(maxUploads); threadPool.reset(new ThreadPool());
threadPool->setMaxThreads(maxUploads);
die = false; die = false;
syncThread = boost::thread([this] () { this->periodicSync(); }); syncThread = boost::thread([this] () { this->periodicSync(); });
} }
@@ -67,11 +68,10 @@ Synchronizer::~Synchronizer()
or save the list it's working on..... or save the list it's working on.....
For milestone 2, this will do the safe thing and finish working first. For milestone 2, this will do the safe thing and finish working first.
Later we can get fancy. */ Later we can get fancy. */
boost::unique_lock<boost::mutex> lock(mutex); forceFlush();
die = true; die = true;
syncThread.interrupt();
lock.unlock();
syncThread.join(); syncThread.join();
threadPool.reset();
} }
enum OpFlags enum OpFlags
@@ -224,7 +224,7 @@ void Synchronizer::periodicSync()
//logger->log(LOG_DEBUG,"Synchronizer Force Flush."); //logger->log(LOG_DEBUG,"Synchronizer Force Flush.");
} }
lock.lock(); lock.lock();
//cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " << //cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " <<
// threadPool.currentQueueSize() << endl; // threadPool.currentQueueSize() << endl;
for (auto &job : pendingOps) for (auto &job : pendingOps)
makeJob(job.first); makeJob(job.first);
@@ -243,7 +243,7 @@ void Synchronizer::makeJob(const string &key)
objNames.push_front(key); objNames.push_front(key);
boost::shared_ptr<Job> j(new Job(this, objNames.begin())); boost::shared_ptr<Job> j(new Job(this, objNames.begin()));
threadPool.addJob(j); threadPool->addJob(j);
} }
void Synchronizer::process(list<string>::iterator name) void Synchronizer::process(list<string>::iterator name)

View File

@@ -72,7 +72,7 @@ class Synchronizer : public boost::noncopyable
}; };
uint maxUploads; uint maxUploads;
ThreadPool threadPool; boost::scoped_ptr<ThreadPool> threadPool;
std::map<std::string, boost::shared_ptr<PendingOps> > pendingOps; std::map<std::string, boost::shared_ptr<PendingOps> > pendingOps;
std::map<std::string, boost::shared_ptr<PendingOps> > opsInProgress; std::map<std::string, boost::shared_ptr<PendingOps> > opsInProgress;

View File

@@ -7,7 +7,7 @@ using namespace std;
namespace storagemanager namespace storagemanager
{ {
ThreadPool::ThreadPool() : maxThreads(1000), die(false), processQueueOnExit(false), threadsWaiting(0) ThreadPool::ThreadPool() : maxThreads(1000), die(false), processQueueOnExit(true), threadsWaiting(0)
{ {
// Using this ctor effectively limits the # of threads here to the natural limit of the // Using this ctor effectively limits the # of threads here to the natural limit of the
// context it's used in. In the CRP class for example, the # of active threads would be // context it's used in. In the CRP class for example, the # of active threads would be

32
src/main.cpp Normal file → Executable file
View File

@@ -20,23 +20,39 @@ using namespace std;
using namespace storagemanager; using namespace storagemanager;
bool signalCaught = false;
void printCacheUsage(int sig) void printCacheUsage(int sig)
{ {
cout << "Current cache size = " << Cache::get()->getCurrentCacheSize() << endl; cout << "Current cache size = " << Cache::get()->getCurrentCacheSize() << endl;
cout << "Cache element count = " << Cache::get()->getCurrentCacheElementCount() << endl; cout << "Cache element count = " << Cache::get()->getCurrentCacheElementCount() << endl;
} }
void shutdownSM(int sig)
{
if (!signalCaught)
(SessionManager::get())->shutdownSM(sig);
signalCaught = true;
}
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
/* Instantiate objects to have them verify config settings before continuing */ /* Instantiate objects to have them verify config settings before continuing */
IOCoordinator::get(); IOCoordinator* ioc = IOCoordinator::get();
Cache::get(); Cache* cache = Cache::get();
Synchronizer::get(); Synchronizer* sync = Synchronizer::get();
Replicator::get(); Replicator* rep = Replicator::get();
struct sigaction sa; struct sigaction sa;
memset(&sa, 0, sizeof(sa)); memset(&sa, 0, sizeof(sa));
for (int i=0; i<SIGRTMAX; i++)
{
sa.sa_handler = shutdownSM;
sigaction(i, &sa, NULL);
}
sa.sa_handler = SIG_IGN; sa.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &sa, NULL); sigaction(SIGPIPE, &sa, NULL);
@@ -51,8 +67,16 @@ int main(int argc, char** argv)
SessionManager* sm = SessionManager::get(); SessionManager* sm = SessionManager::get();
ret = sm->start(); ret = sm->start();
cache->shutdown();
delete sync;
delete cache;
delete ioc;
delete rep;
logger->log(LOG_INFO,"StorageManager Shutdown Complete.");
return ret; return ret;
} }