diff --git a/storage-manager/src/MetadataFile.cpp b/storage-manager/src/MetadataFile.cpp index 8cf964c69..025565203 100644 --- a/storage-manager/src/MetadataFile.cpp +++ b/storage-manager/src/MetadataFile.cpp @@ -390,10 +390,10 @@ int MetadataFile::writeMetadata() stringstream stream; write_json(stream, *jsontree); - if (!blobWriter.writeBlob(kvStorage, metaKVName_, stream.str())) + if (!blobWriter.writeOrUpdateBlob(kvStorage, metaKVName_, stream.str())) { - SMLogging::get()->log(LOG_CRIT, "Metadatafile: cannot commit tnx set()."); - throw runtime_error("Metadatafile: cannot commit tnx set()."); + SMLogging::get()->log(LOG_CRIT, "Metadatafile: cannot write metadata to storage."); + throw runtime_error("Metadatafile: cannot write metadata to storage."); } } diff --git a/utils/fdb_wrapper_cpp/include/fdbcs.hpp b/utils/fdb_wrapper_cpp/include/fdbcs.hpp index 4298dfba9..b083ecb88 100644 --- a/utils/fdb_wrapper_cpp/include/fdbcs.hpp +++ b/utils/fdb_wrapper_cpp/include/fdbcs.hpp @@ -50,6 +50,8 @@ class Transaction explicit Transaction(FDBTransaction* tnx); ~Transaction(); + // Tries to atomically (within one transaction) swap the values of the two keys. + bool swap(const ByteArray &key1, const ByteArray &key2); // Sets a given `key` and given `value`. void set(const ByteArray& key, const ByteArray& value) const; // Gets a `value` by the given `key`. @@ -117,6 +119,7 @@ using Keys = std::vector; using KeyBlockMap = std::unordered_map; using TreeLevelNumKeysMap = std::unordered_map; +// Represents an abstract class for key generators. class KeyGenerator { public: @@ -152,12 +155,37 @@ class BlobHandler } // Writes the given `blob` with given `key`. + // This `write` is not atomic: it splits the data across multiple fdb transactions. If one of + // these transactions fails, we should call `removeBlob` to clear the partially written data, and then + // try to `writeBlob` again. 
bool writeBlob(std::shared_ptr database, const ByteArray& key, const ByteArray& blob); + + // This function: + // 1) Checks if a blob with the same key exists; if not, uses the `writeBlob` function. + // 2) Creates a new tree with a new key. + // 3) Atomically (during one fdb transaction) swaps root nodes for the original tree and new tree. + // 4) Removes original tree. + bool writeOrUpdateBlob(std::shared_ptr database, const ByteArray& key, + const ByteArray& blob); + // Reads `blob` by the given `key`, on error returns false. std::pair readBlob(std::shared_ptr database, const ByteArray& key); + + // Reads blocks of data for the given `keys`, starting from the `index` position in the keys vector + // and taking into account the max size of a transaction. + // If a `DataBlock` is reached in the tree (leaf nodes), sets the `dataBlockReached` flag to true. + std::pair> readBlocks(std::shared_ptr database, + const std::vector& keys, uint32_t& index, + bool& dataBlockReached); + // Removes a `blob` by the given `key`, on error returns false. + // This `remove` is not atomic: it splits the keys to remove across multiple fdb transactions. If + // one of these transactions fails, we should call `removeBlob` again to remove the keys which were not removed. bool removeBlob(std::shared_ptr database, const ByteArray& key); + // Checks if the key exists. 
+ bool keyExists(std::shared_ptr database, const ByteArray& key); + private: size_t insertData(Block& block, const std::string& blob, const size_t offset); void insertKey(Block& block, const std::string& value); diff --git a/utils/fdb_wrapper_cpp/src/fdbcs.cpp b/utils/fdb_wrapper_cpp/src/fdbcs.cpp index a6de89676..e6e681b02 100644 --- a/utils/fdb_wrapper_cpp/src/fdbcs.cpp +++ b/utils/fdb_wrapper_cpp/src/fdbcs.cpp @@ -50,6 +50,24 @@ Transaction::~Transaction() } } +bool Transaction::swap(const ByteArray& key1, const ByteArray& key2) +{ + if (!tnx_) + return false; + + auto resultPair1 = get(key1); + if (!resultPair1.first) + return false; + + auto resultPair2 = get(key2); + if (!resultPair2.first) + return false; + + set(key1, resultPair2.second); + set(key2, resultPair1.second); + return true; +} + void Transaction::set(const ByteArray& key, const ByteArray& value) const { if (tnx_) @@ -71,6 +89,14 @@ std::pair Transaction::get(const ByteArray& key) const return {false, {}}; } + err = fdb_future_get_error(future); + if (err) + { + fdb_future_destroy(future); + std::cerr << "fdb_future_get_error error, code: " << (int)err << std::endl; + return {false, {}}; + } + const uint8_t* outValue; int outValueLength; fdb_bool_t present; @@ -85,13 +111,9 @@ std::pair Transaction::get(const ByteArray& key) const fdb_future_destroy(future); if (present) - { return {true, ByteArray(outValue, outValue + outValueLength)}; - } else - { return {false, {}}; - } } return {false, {}}; } @@ -115,25 +137,25 @@ void Transaction::removeRange(const ByteArray& beginKey, const ByteArray& endKey bool Transaction::commit() const { - if (tnx_) + if (!tnx_) + return false; + + FDBFuture* future = fdb_transaction_commit(tnx_); + auto err = fdb_future_block_until_ready(future); + if (err) { - FDBFuture* future = fdb_transaction_commit(tnx_); - auto err = fdb_future_block_until_ready(future); - if (err) - { - fdb_future_destroy(future); - std::cerr << "fdb_future_block_until_ready error, code: " << 
(int)err << std::endl; - return false; - } - err = fdb_future_get_error(future); - if (err) - { - fdb_future_destroy(future); - std::cerr << "fdb_future_get_error(), code: " << (int)err << std::endl; - return false; - } fdb_future_destroy(future); + std::cerr << "fdb_future_block_until_ready error, code: " << (int)err << std::endl; + return false; } + err = fdb_future_get_error(future); + if (err) + { + fdb_future_destroy(future); + std::cerr << "fdb_future_get_error(), code: " << (int)err << std::endl; + return false; + } + fdb_future_destroy(future); return true; } @@ -320,6 +342,9 @@ TreeLevelNumKeysMap BlobHandler::computeNumKeysForEachTreeLevel(const int32_t tr bool BlobHandler::writeBlob(std::shared_ptr dataBase, const ByteArray& key, const ByteArray& blob) { + if (!dataBase) + return false; + const size_t blobSizeInBytes = blob.size(); if (!blobSizeInBytes) return commitKey(dataBase, key, ""); @@ -383,6 +408,37 @@ bool BlobHandler::writeBlob(std::shared_ptr dataBase, const return true; } +bool BlobHandler::keyExists(std::shared_ptr database, const ByteArray& key) +{ + auto tnx = database->createTransaction(); + if (!tnx) + return false; + return tnx->get(key).first; +} + +bool BlobHandler::writeOrUpdateBlob(std::shared_ptr dataBase, const ByteArray& key, + const ByteArray& blob) +{ + if (!dataBase) + return false; + + if (keyExists(dataBase, key)) + { + auto newKey = key + "new"; + if (!writeBlob(dataBase, newKey, blob)) + return false; + // Tnx destructor calls destroy on the transaction if it fails or is not committed. 
+ { + auto tnx = dataBase->createTransaction(); + if (!tnx->swap(newKey, key) || !tnx->commit()) + return false; + } + return removeBlob(dataBase, newKey); + } + else + return writeBlob(dataBase, key, blob); +} + std::pair BlobHandler::getKeysFromBlock(const Block& block) { Keys keys; @@ -406,66 +462,88 @@ bool BlobHandler::isDataBlock(const Block& block) return block.second.compare(0, keyBlockIdentifier.size(), keyBlockIdentifier) != 0; } +std::pair> BlobHandler::readBlocks(std::shared_ptr database, + const std::vector& keys, + uint32_t& index, bool& dataBlockReached) +{ + if (!database) + return {false, {}}; + + auto tnx = database->createTransaction(); + if (!tnx) + return {false, {}}; + + size_t currentTnxSize = 0; + std::vector blocks; + // Take in account the size of the data that was read. + while ((index < keys.size()) && (currentTnxSize + blockSizeInBytes_ < maxTnxSize_)) + { + const auto& key = keys[index]; + auto p = tnx->get(key); + if (!p.first) + return {false, {}}; + currentTnxSize += key.size() + p.second.size(); + + Block block{0, p.second}; + if (!dataBlockReached && isDataBlock(block)) + dataBlockReached = true; + + blocks.push_back(block); + ++index; + } + + return {true, blocks}; +} + std::pair BlobHandler::readBlob(std::shared_ptr database, const ByteArray& key) { + if (!database) + return {false, ""}; + Keys currentKeys{key}; bool dataBlockReached = false; + std::string blob; while (!dataBlockReached) { - auto tnx = database->createTransaction(); - if (!tnx) - return {false, ""}; - std::vector blocks; - for (const auto& key : currentKeys) + blocks.reserve(currentKeys.size()); + + uint32_t index = 0; + while (index < currentKeys.size()) { - auto p = tnx->get(key); + const auto p = readBlocks(database, currentKeys, index, dataBlockReached); if (!p.first) return {false, ""}; - - Block block{0, p.second}; - if (isDataBlock(block)) - { - dataBlockReached = true; - break; - } - blocks.push_back(block); + blocks.insert(blocks.end(), 
p.second.begin(), p.second.end()); } - if (dataBlockReached) - break; - - Keys nextKeys; - for (const auto& block : blocks) + if (!dataBlockReached) [[likely]] { - auto keysPair = getKeysFromBlock(block); - if (!keysPair.first) - return {false, ""}; + Keys nextKeys; + for (const auto& block : blocks) + { + auto keysPair = getKeysFromBlock(block); + if (!keysPair.first) + return {false, ""}; - auto& keys = keysPair.second; - nextKeys.insert(nextKeys.end(), keys.begin(), keys.end()); + auto& keys = keysPair.second; + nextKeys.insert(nextKeys.end(), keys.begin(), keys.end()); + } + currentKeys = std::move(nextKeys); + } + else + { + blob.reserve(blocks.size() * dataBlockSizeInBytes_); + for (const auto& dataBlock : blocks) + { + if (!dataBlock.second.size()) + return {false, ""}; + blob.insert(blob.end(), dataBlock.second.begin() + dataBlockIdentifier.size(), + dataBlock.second.end()); + } } - currentKeys = std::move(nextKeys); - } - - std::string blob; - for (const auto& key : currentKeys) - { - auto tnx = database->createTransaction(); - if (!tnx) - return {false, ""}; - - auto resultPair = tnx->get(key); - if (!resultPair.first) - return {false, ""}; - - auto& dataBlock = resultPair.second; - if (!dataBlock.size()) - return {false, ""}; - - blob.insert(blob.end(), dataBlock.begin() + dataBlockIdentifier.size(), dataBlock.end()); } return {true, blob}; @@ -488,29 +566,30 @@ bool BlobHandler::removeKeys(std::shared_ptr database, const bool BlobHandler::removeBlob(std::shared_ptr database, const Key& key) { std::unordered_map treeLevel; - auto tnx = database->createTransaction(); - if (!tnx) - return false; - - uint32_t currentLevel = 0; + int32_t currentLevel = 0; treeLevel[0] = {key}; + bool dataBlockReached = false; while (true) { const auto& currentKeys = treeLevel[currentLevel]; std::vector blocks; - for (const auto& key : currentKeys) + blocks.reserve(currentKeys.size()); + + uint32_t index = 0; + while (index < currentKeys.size()) { - auto p = tnx->get(key); + 
const auto p = readBlocks(database, currentKeys, index, dataBlockReached); if (!p.first) return false; - blocks.push_back({0, p.second}); + blocks.insert(blocks.end(), p.second.begin(), p.second.end()); } - if (isDataBlock(blocks.front()) || !currentKeys.size()) + if (dataBlockReached) break; Keys nextKeys; + nextKeys.reserve(blocks.size() * numKeysInBlock_); for (const auto& block : blocks) { auto keysPair = getKeysFromBlock(block); @@ -525,8 +604,14 @@ bool BlobHandler::removeBlob(std::shared_ptr database, const treeLevel[currentLevel] = std::move(nextKeys); } - for (uint32_t level = 0; level <= currentLevel; ++level) - RETURN_ON_ERROR(removeKeys(database, treeLevel[level])); + // Start removing keys from the bottom of the tree. + // If we fail in the middle of the removal, we can try to remove the remaining data again, because the + // tree is still in a valid state, even if the bottom levels are removed. + while (currentLevel >= 0) + { + RETURN_ON_ERROR(removeKeys(database, treeLevel[currentLevel])); + --currentLevel; + } return true; } @@ -536,4 +621,4 @@ bool setAPIVersion() auto err = fdb_select_api_version(FDB_API_VERSION); return err ? 
false : true; } -} // namespace FDBCS +} // namespace FDBCS \ No newline at end of file diff --git a/utils/fdb_wrapper_cpp/test/test_fdb_api.cpp b/utils/fdb_wrapper_cpp/test/test_fdb_api.cpp index 4f8f1decd..f6bae3df6 100644 --- a/utils/fdb_wrapper_cpp/test/test_fdb_api.cpp +++ b/utils/fdb_wrapper_cpp/test/test_fdb_api.cpp @@ -54,7 +54,7 @@ static void testBlobHandler(std::shared_ptr db) std::vector blobSizes{0, 1, 11, 101, 1001, 10001, 100001, 1000001, 10000001, 100000001}; std::vector blockSizes{10000, 100000}; #else - std::vector blobSizes{0, 1, 10001, 100001, 10000001, 100000001}; + std::vector blobSizes{0, 1, 100001, 10000001, 100000001}; std::vector blockSizes{100000}; #endif @@ -87,6 +87,10 @@ static void testBlobHandler(std::shared_ptr db) std::cout << "Read blob time: " << ms_int.count() << std::endl; t1 = high_resolution_clock::now(); #endif + auto blobB = generateBlob(blobSize + 1); + assert_internal(handler.writeOrUpdateBlob(db, rootKey, blobB), "Remove blob error"); + p = handler.readBlob(db, rootKey); + assert_internal(p.second == blobB, "Blobs not equal"); assert_internal(handler.removeBlob(db, rootKey), "Remove blob error"); #ifdef TEST_PERF t2 = high_resolution_clock::now();