diff --git a/src/Cache.cpp b/src/Cache.cpp old mode 100644 new mode 100755 index f937c81b9..24b2bd3ea --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -90,9 +90,9 @@ Cache::Cache() : currentCacheSize(0) throw e; } //cout << "Cache got prefix " << prefix << endl; - - downloader.setDownloadPath(prefix.string()); - downloader.useThisLock(&lru_mutex); + downloader.reset(new Downloader()); + downloader->setDownloadPath(prefix.string()); + downloader->useThisLock(&lru_mutex); stmp = conf->getValue("ObjectStorage", "journal_path"); if (stmp.empty()) @@ -191,7 +191,7 @@ void Cache::read(const vector &keys) vector dl_errnos; vector sizes; if (!keysToFetch.empty()) - downloader.download(keysToFetch, &dl_errnos, &sizes, lru_mutex); + downloader->download(keysToFetch, &dl_errnos, &sizes, lru_mutex); size_t sum_sizes = 0; for (size_t &size : sizes) @@ -266,7 +266,7 @@ void Cache::read(const vector &keys) return; assert(s.owns_lock()); - downloader.download(keysToFetch, &dlErrnos, &dlSizes); + downloader->download(keysToFetch, &dlErrnos, &dlSizes); assert(s.owns_lock()); size_t sum_sizes = 0; @@ -614,6 +614,12 @@ void Cache::reset() currentCacheSize = 0; } +void Cache::shutdown() +{ + boost::unique_lock s(lru_mutex); + downloader.reset(); +} + /* The helper classes */ Cache::M_LRU_element_t::M_LRU_element_t(const string *k) : key(k) diff --git a/src/Cache.h b/src/Cache.h old mode 100644 new mode 100755 index 321cac486..5227a6ece --- a/src/Cache.h +++ b/src/Cache.h @@ -59,6 +59,8 @@ class Cache : public boost::noncopyable const boost::filesystem::path &getJournalPath(); // this will delete everything in the cache and journal paths, and empty all Cache structures. void reset(); + void shutdown(); + private: Cache(); @@ -67,7 +69,7 @@ class Cache : public boost::noncopyable size_t maxCacheSize; size_t objectSize; size_t currentCacheSize; - Downloader downloader; + boost::scoped_ptr downloader; Replicator *replicator; SMLogging *logger; diff --git a/src/ClientRequestProcessor.cpp b/src/ClientRequestProcessor.cpp index d5fde05ae..6a61ccad4 100644 --- a/src/ClientRequestProcessor.cpp +++ b/src/ClientRequestProcessor.cpp @@ -37,6 +37,11 @@ void ClientRequestProcessor::processRequest(int sock, uint len) threadPool.addJob(t); } +void ClientRequestProcessor::shutdown() +{ + delete crp; +} + } diff --git a/src/ClientRequestProcessor.h b/src/ClientRequestProcessor.h index 82af3bb68..c416782ce 100644 --- a/src/ClientRequestProcessor.h +++ b/src/ClientRequestProcessor.h @@ -16,6 +16,7 @@ class ClientRequestProcessor : public boost::noncopyable virtual ~ClientRequestProcessor(); void processRequest(int sock, uint len); + void shutdown(); private: ClientRequestProcessor(); diff --git a/src/SessionManager.cpp b/src/SessionManager.cpp old mode 100644 new mode 100755 index e8d349250..5e8cd877c --- a/src/SessionManager.cpp +++ b/src/SessionManager.cpp @@ -60,9 +60,10 @@ int SessionManager::start() int pollTimeout = -1; int socketIncr; int current_size = 0; - bool running = true; + int prevNFDS = 0; - logger->log(LOG_INFO,"SessionManager starting..."); + bool shutdown = false; + bool running = true; if (pipe(socketCtrl)==-1) { @@ -156,7 +157,7 @@ int SessionManager::start() { //logger->log(LOG_DEBUG,"Listening socket is readable"); incomingSockfd = 0; - while (incomingSockfd != -1) + while (incomingSockfd != -1 && !shutdown) { if (nfds >= MAX_SM_SOCKETS) break; // try to free up some room @@ -167,7 +168,7 @@ int SessionManager::start() if (errno != EWOULDBLOCK) { logger->log(LOG_CRIT,"accept() failed: %s", strerror(errno)); - running = false; + shutdown = true; } break; } @@ -200,8 +201,13 @@ int SessionManager::start() { if(fds[i].fd == socket) { - //logger->log(LOG_DEBUG,"returned socket %i at index %i", fds[i].fd,i); - fds[i].events = POLLIN; + if (shutdown) + { + logger->log(LOG_DEBUG,"Shutdown in progress, closed socket %i at index %i", fds[i].fd,i); + close(socket); + break; + } + fds[i].events = (POLLIN | POLLPRI); break; } } @@ -224,6 +230,10 @@ int SessionManager::start() } } break; + case SHUTDOWN: + logger->log(LOG_DEBUG,"Shutdown StorageManager..."); + shutdown = true; + break; default: break; } @@ -364,8 +374,15 @@ int SessionManager::start() /* get rid of fds == -1 */ int i, j; for (i = 2; i < nfds; ++i) + { + if (shutdown && fds[i].events == (POLLIN | POLLPRI)) + { + close(fds[i].fd); + fds[i].fd = -1; + } if (fds[i].fd == -1) break; + } for (j = i + 1; j < nfds; ++j) { if (fds[j].fd != -1) @@ -376,7 +393,19 @@ int SessionManager::start() } } nfds = i; + + if(shutdown) + { + //if (prevNFDS != nfds) + // logger->log(LOG_CRIT,"StorageManager shutting down, nfds = %i",nfds); + //prevNFDS = nfds; + if (nfds <= 2) + running = false; + } } + + //Shutdown Done + crp->shutdown(); return -1; } @@ -417,5 +446,18 @@ void SessionManager::socketError(int socket) } } +void SessionManager::shutdownSM(int sig){ + boost::mutex::scoped_lock s(ctrlMutex); + SMLogging* logger = SMLogging::get(); + logger->log(LOG_INFO,"SessionManager Caught Signal %i",sig); + int err; + uint8_t ctrlCode = SHUTDOWN; + err = ::write(socketCtrl[1], &ctrlCode, 1); + if (err <= 0) + { + return; + } +} + } diff --git a/src/SessionManager.h b/src/SessionManager.h old mode 100644 new mode 100755 index 81d9a5ed6..9ce769034 --- a/src/SessionManager.h +++ b/src/SessionManager.h @@ -6,6 +6,7 @@ #include "ClientRequestProcessor.h" #include "messageFormat.h" +#include #include #include #include @@ -16,7 +17,8 @@ namespace storagemanager enum sessionCtrl { ADDFD, - REMOVEFD + REMOVEFD, + SHUTDOWN }; #define MAX_SM_SOCKETS 200 @@ -38,6 +40,7 @@ public: void returnSocket(int socket); void socketError(int socket); void CRPTest(int socket,uint length); + void shutdownSM(int sig); private: SessionManager(); diff --git a/src/Synchronizer.cpp b/src/Synchronizer.cpp index abd6b1453..fc7270e78 100755 --- a/src/Synchronizer.cpp +++ b/src/Synchronizer.cpp @@ -56,7 +56,8 @@ Synchronizer::Synchronizer() : maxUploads(0) journalPath = cache->getJournalPath(); cachePath = cache->getCachePath(); - threadPool.setMaxThreads(maxUploads); + threadPool.reset(new ThreadPool()); + threadPool->setMaxThreads(maxUploads); die = false; syncThread = boost::thread([this] () { this->periodicSync(); }); } @@ -67,11 +68,10 @@ Synchronizer::~Synchronizer() or save the list it's working on..... For milestone 2, this will do the safe thing and finish working first. Later we can get fancy. */ - boost::unique_lock lock(mutex); + forceFlush(); die = true; - syncThread.interrupt(); - lock.unlock(); syncThread.join(); + threadPool.reset(); } enum OpFlags @@ -224,7 +224,7 @@ void Synchronizer::periodicSync() //logger->log(LOG_DEBUG,"Synchronizer Force Flush."); } lock.lock(); - //cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " << + //cout << "Sync'ing " << pendingOps.size() << " objects" << " queue size is " << // threadPool.currentQueueSize() << endl; for (auto &job : pendingOps) makeJob(job.first); @@ -243,7 +243,7 @@ void Synchronizer::makeJob(const string &key) objNames.push_front(key); boost::shared_ptr j(new Job(this, objNames.begin())); - threadPool.addJob(j); + threadPool->addJob(j); } void Synchronizer::process(list::iterator name) diff --git a/src/Synchronizer.h b/src/Synchronizer.h index 7652b2fc2..bbc01678b 100755 --- a/src/Synchronizer.h +++ b/src/Synchronizer.h @@ -72,7 +72,7 @@ class Synchronizer : public boost::noncopyable }; uint maxUploads; - ThreadPool threadPool; + boost::scoped_ptr threadPool; std::map > pendingOps; std::map > opsInProgress; diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp index 67daef2f3..e510da100 100644 --- a/src/ThreadPool.cpp +++ b/src/ThreadPool.cpp @@ -7,7 +7,7 @@ using namespace std; namespace storagemanager { -ThreadPool::ThreadPool() : maxThreads(1000), die(false), processQueueOnExit(false), threadsWaiting(0) +ThreadPool::ThreadPool() : maxThreads(1000), die(false), processQueueOnExit(true), threadsWaiting(0) { // Using this ctor effectively limits the # of threads here to the natural limit of the // context it's used in. In the CRP class for example, the # of active threads would be diff --git a/src/main.cpp b/src/main.cpp old mode 100644 new mode 100755 index 291816c3b..f7d974bd1 --- a/src/main.cpp +++ b/src/main.cpp @@ -20,23 +20,39 @@ using namespace std; using namespace storagemanager; +bool signalCaught = false; + void printCacheUsage(int sig) { cout << "Current cache size = " << Cache::get()->getCurrentCacheSize() << endl; cout << "Cache element count = " << Cache::get()->getCurrentCacheElementCount() << endl; } +void shutdownSM(int sig) +{ + if (!signalCaught) + (SessionManager::get())->shutdownSM(sig); + signalCaught = true; +} + int main(int argc, char** argv) { /* Instantiate objects to have them verify config settings before continuing */ - IOCoordinator::get(); - Cache::get(); - Synchronizer::get(); - Replicator::get(); + IOCoordinator* ioc = IOCoordinator::get(); + Cache* cache = Cache::get(); + Synchronizer* sync = Synchronizer::get(); + Replicator* rep = Replicator::get(); struct sigaction sa; memset(&sa, 0, sizeof(sa)); + + for (int i=0; istart(); + cache->shutdown(); + delete sync; + delete cache; + delete ioc; + delete rep; + logger->log(LOG_INFO,"StorageManager Shutdown Complete."); return ret; }