From 9ced374c819465f1130cdae82cbfaf92b73c0dd7 Mon Sep 17 00:00:00 2001 From: Patrick LeBlanc Date: Fri, 12 Jul 2019 10:33:51 -0500 Subject: [PATCH] Another incremental commit. --- versioning/BRM/sessionmanagerserver.cpp | 97 +---------- versioning/BRM/sessionmanagerserver.h | 6 +- versioning/BRM/slavecomm.cpp | 205 ++++++------------------ versioning/BRM/slavecomm.h | 2 - 4 files changed, 56 insertions(+), 254 deletions(-) diff --git a/versioning/BRM/sessionmanagerserver.cpp b/versioning/BRM/sessionmanagerserver.cpp index f114cd483..739abf467 100644 --- a/versioning/BRM/sessionmanagerserver.cpp +++ b/versioning/BRM/sessionmanagerserver.cpp @@ -85,7 +85,7 @@ const uint32_t SessionManagerServer::SS_FORCE = 1 << 5; // In combination w const uint32_t SessionManagerServer::SS_QUERY_READY = 1 << 6; // Set by ProcManager when system is ready for queries -SessionManagerServer::SessionManagerServer() : unique32(0), unique64(0), txnidfd(-1) +SessionManagerServer::SessionManagerServer() : unique32(0), unique64(0) { config::Config* conf; string stmp; @@ -119,26 +119,6 @@ SessionManagerServer::SessionManagerServer() : unique32(0), unique64(0), txnidfd txnidFilename = conf->getConfig("SessionManager", "TxnIDFile"); - if (false && !IDBPolicy::useHdfs()) - { - txnidfd = open(txnidFilename.c_str(), O_RDWR | O_CREAT | O_BINARY, 0664); - - if (txnidfd < 0) - { - perror("SessionManagerServer(): open"); - throw runtime_error("SessionManagerServer: Could not open the transaction ID file"); - } - - //FIXME: do we need this on Win? -#ifndef _MSC_VER - else - { - fchmod(txnidfd, 0664); - } - -#endif - } - semValue = maxTxns; _verID = 0; _sysCatVerID = 0; @@ -154,6 +134,10 @@ SessionManagerServer::SessionManagerServer() : unique32(0), unique64(0), txnidfd } } +SessionManagerServer::~SessionManagerServer() +{ +} + void SessionManagerServer::reset() { mutex.try_lock(); @@ -178,57 +162,7 @@ again: // If we fail to read a full four bytes for any value, then the // value isn't in the file, and we start with the default. - if (false && !IDBPolicy::useHdfs()) - { - // Last transaction id - lseek(txnidfd, 0, SEEK_SET); - err = read(txnidfd, &lastTxnID, 4); - - if (err < 0 && errno != EINTR) - { - perror("Sessionmanager::initSegment(): read"); - throw runtime_error("SessionManagerServer: read failed, aborting"); - } - else if (err < 0) - goto again; - else if (err == sizeof(int)) - _verID = lastTxnID; - - // last system catalog version id - err = read(txnidfd, &lastSysCatVerId, 4); - - if (err < 0 && errno != EINTR) - { - perror("Sessionmanager::initSegment(): read"); - throw runtime_error("SessionManagerServer: read failed, aborting"); - } - else if (err < 0) - goto again; - else if (err == sizeof(int)) - _sysCatVerID = lastSysCatVerId; - - // System state. Contains flags regarding the suspend state of the system. - err = read(txnidfd, &systemState, 4); - - if (err < 0 && errno == EINTR) - { - goto again; - } - else if (err == sizeof(int)) - { - // Turn off the pending and force flags. They make no sense for a clean start. - // Turn off the ready flag. DMLProc will set it back on when - // initialized. - systemState &= - ~(SS_READY | SS_QUERY_READY | SS_SUSPEND_PENDING | SS_SHUTDOWN_PENDING | SS_ROLLBACK | SS_FORCE); - } - else - { - // else no problem. System state wasn't saved. Might be an upgraded system. - systemState = 0; - } - } - else if (IDBPolicy::exists(txnidFilename.c_str())) + if (IDBPolicy::exists(txnidFilename.c_str())) { scoped_ptr txnidfp(IDBDataFile::open( IDBPolicy::getType(txnidFilename.c_str(), @@ -297,26 +231,7 @@ again: */ void SessionManagerServer::saveSystemState() { - if (false && !IDBPolicy::useHdfs()) - { - int err = 0; - uint32_t lSystemState = systemState; - - // We don't save the pending flags, the force flag or the ready flags. - lSystemState &= ~(SS_READY | SS_QUERY_READY | SS_SUSPEND_PENDING | SS_SHUTDOWN_PENDING | SS_FORCE); - lseek(txnidfd, 8, SEEK_SET); - err = write(txnidfd, &lSystemState, sizeof(int)); - - if (err < 0) - { - perror("SessionManagerServer::saveSystemState(): write(systemState)"); - throw runtime_error("SessionManagerServer::saveSystemState(): write(systemState) failed"); - } - } - else - { saveSMTxnIDAndState(); - } } const QueryContext SessionManagerServer::verID() diff --git a/versioning/BRM/sessionmanagerserver.h b/versioning/BRM/sessionmanagerserver.h index b25de41f9..e1782e3e8 100644 --- a/versioning/BRM/sessionmanagerserver.h +++ b/versioning/BRM/sessionmanagerserver.h @@ -123,10 +123,7 @@ public: * It does not destroy the semaphores. Those persist until the system * is shut down. */ - virtual ~SessionManagerServer() - { - if (txnidfd >= 0 ) close(txnidfd); - } + virtual ~SessionManagerServer(); /** @brief Gets the current version ID * @@ -276,7 +273,6 @@ private: int maxTxns; // the maximum number of concurrent transactions std::string txnidFilename; - int txnidfd; // file descriptor for the "last txnid" file execplan::CalpontSystemCatalog::SCN _verID; execplan::CalpontSystemCatalog::SCN _sysCatVerID; uint32_t systemState; diff --git a/versioning/BRM/slavecomm.cpp b/versioning/BRM/slavecomm.cpp index 39494cb5a..38aa35d00 100644 --- a/versioning/BRM/slavecomm.cpp +++ b/versioning/BRM/slavecomm.cpp @@ -65,7 +65,7 @@ namespace BRM { SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) : - slave(s), currentSaveFD(-1), currentSaveFile(NULL), journalh(NULL) + slave(s), currentSaveFile(NULL), journalh(NULL) #ifdef _MSC_VER , fPids(0), fMaxPids(64) #endif @@ -139,17 +139,9 @@ SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) : journalName = savefile + "_journal"; const char* filename = journalName.c_str(); - if (true || IDBPolicy::useHdfs()) - { - journalh = IDBDataFile::open( - IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "w+b", 0); - } - else - { - journal.open(filename, ios_base::binary | ios_base::out | ios_base::app); - } - - if ((journal.is_open() == false) && (journalh == NULL)) + journalh = IDBDataFile::open( + IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "w+b", 0); + if (journalh == NULL) throw runtime_error("Could not open the BRM journal for writing!"); } else @@ -177,7 +169,7 @@ SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) : } SlaveComm::SlaveComm() - : currentSaveFD(-1), currentSaveFile(NULL), journalh(NULL) + : currentSaveFile(NULL), journalh(NULL) #ifdef _MSC_VER , fPids(0), fMaxPids(64) #endif @@ -219,7 +211,6 @@ SlaveComm::~SlaveComm() if (firstSlave) { - close(currentSaveFD); delete currentSaveFile; currentSaveFile = NULL; } @@ -2004,19 +1995,13 @@ void SlaveComm::do_confirm() if (firstSlave && (takeSnapshot || (journalCount >= snapshotInterval && snapshotInterval >= 0))) { - const char* filename = tmp.c_str(); - - if ((false && !IDBPolicy::useHdfs()) && currentSaveFD < 0) - { - currentSaveFD = open(filename, O_WRONLY | O_CREAT, 0664); - } - else if ((true || IDBPolicy::useHdfs()) && !currentSaveFile) + if (!currentSaveFile) { currentSaveFile = IDBDataFile::open( - IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "wb", 0); + IDBPolicy::getType(tmp.c_str(), IDBPolicy::WRITEENG), tmp.c_str(), "wb", 0); } - if (currentSaveFD < 0 && currentSaveFile == NULL) + if (currentSaveFile == NULL) { ostringstream os; os << "WorkerComm: failed to open the current savefile. errno: " @@ -2032,72 +2017,33 @@ void SlaveComm::do_confirm() #endif int err = 0; - if (currentSaveFile) + // MCOL-1558. Make the _current file relative to DBRMRoot. + string relative = tmp.substr(tmp.find_last_of('/') + 1); + err = currentSaveFile->write(relative.c_str(), relative.length()); + + if (err < (int) relative.length()) { - // MCOL-1558. Make the _current file relative to DBRMRoot. - string relative = tmp.substr(tmp.find_last_of('/') + 1); - err = currentSaveFile->write(relative.c_str(), relative.length()); + ostringstream os; + os << "WorkerComm: currentfile write() returned " << err + << " file pointer is " << currentSaveFile; - if (err < (int) relative.length()) - { - ostringstream os; - os << "WorkerComm: currentfile write() returned " << err - << " file pointer is " << currentSaveFile; + if (err < 0) + os << " errno: " << strerror(errno); - if (err < 0) - os << " errno: " << strerror(errno); - - log(os.str()); - } - - currentSaveFile->flush(); - delete currentSaveFile; - currentSaveFile = NULL; - saveFileToggle = !saveFileToggle; - - const char* filename = journalName.c_str(); - delete journalh; - journalh = IDBDataFile::open( - IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "w+b", 0); - - if (!journalh) - throw runtime_error("Could not open the BRM journal for writing!"); + log(os.str()); } - else - { - lseek(currentSaveFD, 0, SEEK_SET); - // MCOL-1558. Make the _current file relative to DBRMRoot. - string relative = tmp.substr(tmp.find_last_of('/') + 1); - err = write(currentSaveFD, relative.c_str(), relative.length()); - if (err < (int) relative.length()) - { - ostringstream os; - os << "WorkerComm: currentfile write() returned " << err - << " fd is " << currentSaveFD; + currentSaveFile->flush(); + delete currentSaveFile; + currentSaveFile = NULL; + saveFileToggle = !saveFileToggle; - if (err < 0) - os << " errno: " << strerror(errno); + delete journalh; + journalh = IDBDataFile::open( + IDBPolicy::getType(journalName.c_str(), IDBPolicy::WRITEENG), journalName.c_str(), "w+b", 0); - log(os.str()); - } - -#ifdef _MSC_VER - //FIXME: Do we need to account for Windows EOL conversions? - _chsize_s(currentSaveFD, tmp.length()); - _commit(currentSaveFD); -#else - err = ftruncate(currentSaveFD, relative.length()); - fsync(currentSaveFD); -#endif - saveFileToggle = !saveFileToggle; - - /* Is there a nicer way to truncate the file using an ofstream? */ - journal.close(); - uint32_t utmp = ::umask(0); - journal.open(journalName.c_str(), ios_base::binary | ios_base::out | ios_base::trunc); - ::umask(utmp); - } + if (!journalh) + throw runtime_error("Could not open the BRM journal for writing!"); takeSnapshot = false; doSaveDelta = false; @@ -2250,70 +2196,28 @@ int SlaveComm::replayJournal(string prefix) const char* filename = journalName.c_str(); - if (true || IDBPolicy::useHdfs()) + IDBDataFile* journalf = IDBDataFile::open( + IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0); + + if (!journalf) { - IDBDataFile* journalf = IDBDataFile::open( - IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0); - - if (!journalf) - { - cout << "Error opening journal file " << fName << endl; - return -1; - } - - if (journalf->size() == 0) // empty file, nothing to replay - return 0; - - ssize_t readIn = 0; - - do - { - readIn = journalf->read((char*) &len, sizeof(len)); - - if (readIn > 0) - { - cmd.needAtLeast(len); - readIn = journalf->read((char*) cmd.getInputPtr(), len); - cmd.advanceInputPtr(len); - - try - { - processCommand(cmd); - } - catch (exception& e) - { - cout << e.what() << " Journal replay was incomplete." << endl; - slave->undoChanges(); - return -1; - } - - slave->confirmChanges(); - cmd.restart(); - ret++; - } - } - while (readIn > 0); + cout << "Error opening journal file " << fName << endl; + return -1; } - else + + if (journalf->size() == 0) // empty file, nothing to replay + return 0; + + ssize_t readIn = 0; + + do { - ifstream journalf; - journalf.open(filename, ios_base::in | ios_base::binary); + readIn = journalf->read((char*) &len, sizeof(len)); - if (!journalf.is_open()) + if (readIn > 0) { - cout << "Error opening journal file " << fName << endl; - return -1; - } - - while (journalf) - { - journalf.read((char*) &len, sizeof(len)); - - if (!journalf) - break; - cmd.needAtLeast(len); - journalf.read((char*) cmd.getInputPtr(), len); + readIn = journalf->read((char*) cmd.getInputPtr(), len); cmd.advanceInputPtr(len); try @@ -2331,7 +2235,7 @@ int SlaveComm::replayJournal(string prefix) cmd.restart(); ret++; } - } + } while (readIn > 0); return ret; } @@ -2359,21 +2263,10 @@ void SlaveComm::saveDelta() try { uint32_t len = delta.length(); - - if (true || IDBPolicy::useHdfs()) - { - journalh->write((const char*) &len, sizeof(len)); - journalh->write((const char*) delta.buf(), delta.length()); - journalh->flush(); - } - else - { - journal.seekg(0, ios_base::end); - journal.write((const char*) &len, sizeof(len)); - journal.write((const char*) delta.buf(), delta.length()); - journal.sync(); - } - + + journalh->write((const char*) &len, sizeof(len)); + journalh->write((const char*) delta.buf(), delta.length()); + journalh->flush(); journalCount++; } catch (exception& e) diff --git a/versioning/BRM/slavecomm.h b/versioning/BRM/slavecomm.h index c4ba025dd..f967a2239 100644 --- a/versioning/BRM/slavecomm.h +++ b/versioning/BRM/slavecomm.h @@ -125,10 +125,8 @@ private: std::string savefile; bool release, die, firstSlave, saveFileToggle, takeSnapshot, doSaveDelta, standalone, printOnly; messageqcpp::ByteStream delta; - int currentSaveFD; idbdatafile::IDBDataFile* currentSaveFile; std::string journalName; - std::fstream journal; idbdatafile::IDBDataFile* journalh; int64_t snapshotInterval, journalCount; struct timespec MSG_TIMEOUT;