From 02114b5b7cbc60da7f7bf472d8711e8e3f9c3440 Mon Sep 17 00:00:00 2001 From: Denis Khalikov Date: Mon, 25 Sep 2023 23:39:49 +0300 Subject: [PATCH] feat(BRM) MCOL-5555 Reduce a number of direct writes to BRM journal/tablelocks files --- docs/StorageManagerMetaVersioning.md | 37 ++++++++++++++++++++++++++++ versioning/BRM/brmtypes.cpp | 35 +++++++++++++++++++++++++- versioning/BRM/brmtypes.h | 5 ++++ versioning/BRM/slavecomm.cpp | 21 ++++++++++------ versioning/BRM/tablelockserver.cpp | 11 +++++++-- 5 files changed, 98 insertions(+), 11 deletions(-) create mode 100644 docs/StorageManagerMetaVersioning.md diff --git a/docs/StorageManagerMetaVersioning.md b/docs/StorageManagerMetaVersioning.md new file mode 100644 index 000000000..ebeb21810 --- /dev/null +++ b/docs/StorageManagerMetaVersioning.md @@ -0,0 +1,37 @@ + +`Create meta version.` +```mermaid +sequenceDiagram + participant MetaVersionNew + participant StorageManager + participant S3Bucket + participant ControllerNode + StorageManager->>ControllerNode: Send command to freeze (similar to read-only mode). + StorageManager->>MetaVersionNew: Create new meta version + StorageManager->>S3Bucket: Pack and put file with meta version on S3. + StorageManager->>ControllerNode: Send command to unfreeze. +``` + +`Regular workflow.` +```mermaid +sequenceDiagram + participant MetaVersion + participant StorageManager + participant S3Bucket + StorageManager->>S3Bucket: Request object from S3 + S3Bucket->>StorageManager: Send object to worker node + StorageManager->>S3Bucket: Update and put new object on S3 + StorageManager->>MetaVersion: Check object in meta version + StorageManager->>S3Bucket: Delete object if not in MetaVersion +``` + +`On rollback to version.` +```mermaid +sequenceDiagram + participant MetaVersion + participant StorageManager + participant S3Bucket + StorageManager->>S3Bucket: Request meta version. + S3Bucket->>StorageManager: Send meta version to worker node. + StorageManager->>MetaVersion: Unpack meta version. 
+```
diff --git a/versioning/BRM/brmtypes.cpp b/versioning/BRM/brmtypes.cpp
index 3992b5c61..89d92d05a 100644
--- a/versioning/BRM/brmtypes.cpp
+++ b/versioning/BRM/brmtypes.cpp
@@ -444,7 +444,49 @@ void TableLockInfo::deserialize(istream& i)
   dbrootList.resize(dbrootListSize);
 
   for (uint32_t j = 0; j < dbrootListSize; j++)
-    i.read((char*)&dbrootList[j], 4);
+    i.read((char*)&dbrootList[j], sizeof(uint32_t));
+}
+
+// Size in bytes of the image written by serialize(char*, uint32_t&): 32 bytes of
+// fixed-width fields and length prefixes, the creation time, the owner name and
+// one uint32_t per dbroot.
+uint32_t TableLockInfo::getInternalSize() const
+{
+  // The lengths are stored as 2-byte prefixes, so measure with the same 16-bit values.
+  const uint16_t nameLen = ownerName.length();
+  const uint16_t dbrootListSize = dbrootList.size();
+  return (32 + sizeof(time_t) + nameLen + (dbrootListSize * sizeof(uint32_t)));
+}
+
+// Copies `size` bytes from src to buffer + offset and advances offset past them.
+void TableLockInfo::serializeElement(char* buffer, const char* src, const uint32_t size, uint32_t& offset)
+{
+  std::memcpy(&buffer[offset], src, size);
+  offset += size;
+}
+
+// Appends this lock to buffer starting at offset; offset is advanced past the bytes
+// written. The caller must provide at least getInternalSize() bytes of room.
+void TableLockInfo::serialize(char* buffer, uint32_t& offset)
+{
+  // 16-bit on purpose: the two bytes copied for each prefix are then the whole value,
+  // and the payload copied after a prefix can never be longer than the prefix says.
+  const uint16_t nameLen = ownerName.length();
+  const uint16_t dbrootListSize = dbrootList.size();
+
+  serializeElement(buffer, (char*)&id, 8, offset);
+  serializeElement(buffer, (char*)&tableOID, 4, offset);
+  serializeElement(buffer, (char*)&ownerPID, 4, offset);
+  serializeElement(buffer, (char*)&state, 4, offset);
+  serializeElement(buffer, (char*)&ownerSessionID, 4, offset);
+  serializeElement(buffer, (char*)&ownerTxnID, 4, offset);
+  serializeElement(buffer, (char*)&creationTime, sizeof(time_t), offset);
+  serializeElement(buffer, (const char*)&nameLen, 2, offset);
+  serializeElement(buffer, ownerName.c_str(), nameLen, offset);
+  serializeElement(buffer, (const char*)&dbrootListSize, 2, offset);
+
+  for (uint32_t j = 0; j < dbrootListSize; j++)
+    serializeElement(buffer, (char*)&dbrootList[j], sizeof(uint32_t), offset);
 }
 
 void TableLockInfo::serialize(IDBDataFile* o) const
diff --git a/versioning/BRM/brmtypes.h b/versioning/BRM/brmtypes.h
index a399e103b..594319b49 100644
--- a/versioning/BRM/brmtypes.h
+++ b/versioning/BRM/brmtypes.h
@@ -254,7 +254,12 @@ struct TableLockInfo : public
messageqcpp::Serializeable EXPORT void deserialize(messageqcpp::ByteStream& bs); EXPORT void serialize(idbdatafile::IDBDataFile*) const; EXPORT void deserialize(idbdatafile::IDBDataFile*); + EXPORT void serialize(char* buffer, uint32_t& offset); + EXPORT uint32_t getInternalSize() const; bool operator<(const TableLockInfo&) const; + + private: + void serializeElement(char* buffer, const char* src, const uint32_t size, uint32_t& offset); }; /// A Serializeable version of InlineLBIDRange diff --git a/versioning/BRM/slavecomm.cpp b/versioning/BRM/slavecomm.cpp index a9e88decc..b4f648eb7 100644 --- a/versioning/BRM/slavecomm.cpp +++ b/versioning/BRM/slavecomm.cpp @@ -191,7 +191,6 @@ SlaveComm::SlaveComm() slave = std::make_unique(); } - void SlaveComm::stop() { die = true; @@ -1928,7 +1927,8 @@ void SlaveComm::do_confirm() { if (!currentSaveFile) { - currentSaveFile.reset(IDBDataFile::open(IDBPolicy::getType(tmp.c_str(), IDBPolicy::WRITEENG), tmp.c_str(), "wb", 0)); + currentSaveFile.reset( + IDBDataFile::open(IDBPolicy::getType(tmp.c_str(), IDBPolicy::WRITEENG), tmp.c_str(), "wb", 0)); } if (currentSaveFile == NULL) @@ -1951,7 +1951,8 @@ void SlaveComm::do_confirm() if (err < (int)relative.length()) { ostringstream os; - os << "WorkerComm: currentfile write() returned " << err << " file pointer is " << currentSaveFile.get(); + os << "WorkerComm: currentfile write() returned " << err << " file pointer is " + << currentSaveFile.get(); if (err < 0) os << " errno: " << strerror(errno); @@ -1966,7 +1967,7 @@ void SlaveComm::do_confirm() ; journalh.reset(IDBDataFile::open(IDBPolicy::getType(journalName.c_str(), IDBPolicy::WRITEENG), - journalName.c_str(), "w+b", 0)); + journalName.c_str(), "w+b", 0)); if (!journalh) throw runtime_error("Could not open the BRM journal for writing!"); @@ -2182,11 +2183,16 @@ void SlaveComm::saveDelta() { try { - uint32_t len = delta.length(); + const uint32_t deltaLen = delta.length(); + const uint32_t bufferSize = sizeof(deltaLen) + 
deltaLen;
+    std::unique_ptr<char[]> buffer(new char[bufferSize]);
+    uint32_t offset = 0;
+    std::memcpy(&buffer[offset], (char*)&deltaLen, sizeof(deltaLen));
+    offset += sizeof(deltaLen);
+    std::memcpy(&buffer[offset], (char*)delta.buf(), deltaLen);
 
     journalh->seek(0, SEEK_END);
-    journalh->write((const char*)&len, sizeof(len));
-    journalh->write((const char*)delta.buf(), delta.length());
+    journalh->write((const char*)buffer.get(), bufferSize);  // length prefix + payload in one write
     journalh->flush();
     journalCount++;
   }
@@ -2326,4 +2332,3 @@ void SlaveComm::do_dmlReleaseLBIDRanges(ByteStream& msg)
 }
 
 }  // namespace BRM
-
diff --git a/versioning/BRM/tablelockserver.cpp b/versioning/BRM/tablelockserver.cpp
index fd13d211e..238605269 100644
--- a/versioning/BRM/tablelockserver.cpp
+++ b/versioning/BRM/tablelockserver.cpp
@@ -70,14 +70,28 @@ void TableLockServer::save()
   if (!out)
     throw runtime_error("TableLockServer::save(): could not open save file");
 
-  out->write((char*)&count, 4);
+  // Size the image up front: 4 bytes for the lock count plus each lock's serialized size.
+  uint32_t bufferSize = 4;
+  for (const auto& lock : locks)
+    bufferSize += lock.second.getInternalSize();
+
+  std::unique_ptr<char[]> buffer(new char[bufferSize]);
+  uint32_t offset = 0;
+  std::memcpy(&buffer[offset], (char*)&count, 4);
+  offset += 4;
 
   for (it = locks.begin(); it != locks.end(); ++it)
   {
     if (!out)
       throw runtime_error("TableLockServer::save(): could not write save file");
 
-    it->second.serialize(out.get());
+    it->second.serialize(buffer.get(), offset);
   }
+
+  // Write the assembled image out in one call; until this point the serialized
+  // bytes exist only in `buffer`. A short or failed write is reported as an error.
+  const int written = out->write(buffer.get(), bufferSize);
+  if (written < (int)bufferSize)
+    throw runtime_error("TableLockServer::save(): could not write save file");
 }
 