diff --git a/build/bootstrap_mcs.sh b/build/bootstrap_mcs.sh
index 3821eef46..5bc1e706b 100755
--- a/build/bootstrap_mcs.sh
+++ b/build/bootstrap_mcs.sh
@@ -215,6 +215,7 @@ clean_old_installation()
     rm -rf /etc/mysql
     rm -rf /etc/my.cnf.d/columnstore.cnf
     rm -rf /etc/mysql/mariadb.conf.d/columnstore.cnf
+    fdbcli --exec "writemode on; clearrange \x00 \xff"
 }
 
 build()
diff --git a/utils/fdb_wrapper_cpp/include/fdbcs.hpp b/utils/fdb_wrapper_cpp/include/fdbcs.hpp
index 202b5318d..4298dfba9 100644
--- a/utils/fdb_wrapper_cpp/include/fdbcs.hpp
+++ b/utils/fdb_wrapper_cpp/include/fdbcs.hpp
@@ -21,6 +21,12 @@
 #include
 #include
 #include
+#include
+#include
+#include
+#include
+#include
+#include
 
 // https://apple.github.io/foundationdb/api-c.html
 // We have to define `FDB_API_VERSION` before include `fdb_c.h` header.
@@ -104,5 +110,87 @@ class DataBaseCreator
   static std::shared_ptr createDataBase(const std::string clusterFilePath);
 };
 
+// A block under construction: `first` is the offset at which the next insert lands, `second` is the payload.
+using Block = std::pair;
+using Key = std::string;
+using Keys = std::vector;
+// Maps a key to associated block.
+using KeyBlockMap = std::unordered_map;
+// Maps a tree level to the number of keys generated for that level.
+using TreeLevelNumKeysMap = std::unordered_map;
+
+// Interface of a generator that produces the keys under which blocks are stored.
+class KeyGenerator
+{
+ public:
+  virtual ~KeyGenerator()
+  {
+  }
+  // Returns a new key.
+  virtual Key generateKey() = 0;
+  // Returns the size of a generated key in bytes.
+  virtual uint32_t getKeySize() = 0;
+};
+
+// Key generator backed by `boost::uuids::random_generator`.
+class BoostUIDKeyGenerator : public KeyGenerator
+{
+ public:
+  Key generateKey() override;
+  uint32_t getKeySize() override;
+};
+
+// This class represents a machinery to handle a data `blob`.
+// A blob is split into data blocks (prefixed with `dataBlockIdentifier`) that are addressed through a tree
+// of key blocks (prefixed with `keyBlockIdentifier`); the key given to `writeBlob` is the root of the tree.
+class BlobHandler
+{
+ public:
+  // `keyGen` produces the keys of the blocks; `blockSizeInBytes` is the size of a block, identifier
+  // included.
+  BlobHandler(std::shared_ptr keyGen, uint32_t blockSizeInBytes = 100000)
+   : keyGen_(keyGen), blockSizeInBytes_(blockSizeInBytes)
+  {
+    // Block size in 100KB shows the best performance.
+    keySizeInBytes_ = keyGen_->getKeySize();
+    assert(keySizeInBytes_);
+    assert(blockSizeInBytes_);
+    assert((keySizeInBytes_ + keyBlockIdentifier.size()) <= blockSizeInBytes_);
+    numKeysInBlock_ = (blockSizeInBytes_ - keyBlockIdentifier.size()) / keySizeInBytes_;
+    assert(blockSizeInBytes_ > dataBlockIdentifier.size());
+    dataBlockSizeInBytes_ = (blockSizeInBytes_ - dataBlockIdentifier.size());
+  }
+
+  // Writes the given `blob` with given `key`.
+  bool writeBlob(std::shared_ptr database, const ByteArray& key, const ByteArray& blob);
+  // Reads `blob` by the given `key`, on error returns false.
+  std::pair readBlob(std::shared_ptr database, const ByteArray& key);
+  // Removes a `blob` by the given `key`, on error returns false.
+  bool removeBlob(std::shared_ptr database, const ByteArray& key);
+
+ private:
+  size_t insertData(Block& block, const std::string& blob, const size_t offset);
+  void insertKey(Block& block, const std::string& value);
+  std::pair getKeysFromBlock(const Block& block);
+  Keys generateKeys(const uint32_t num);
+  bool isDataBlock(const Block& block);
+  bool commitKeys(std::shared_ptr database, KeyBlockMap& keyBlockMap, const Keys& keys);
+  bool commitKey(std::shared_ptr database, const Key& key, const ByteArray& value);
+  bool removeKeys(std::shared_ptr database, const Keys& keys);
+  TreeLevelNumKeysMap computeNumKeysForEachTreeLevel(const int32_t treeLen, const uint32_t numBlocks);
+  inline float log(const uint32_t base, const uint32_t value);
+
+  std::shared_ptr keyGen_;
+  uint32_t blockSizeInBytes_;
+  uint32_t keySizeInBytes_;
+  uint32_t numKeysInBlock_;
+  uint32_t dataBlockSizeInBytes_;
+  // FIXME: Doc says that 10MB is limit, currently taking in account `key` size and `value` size, but 10MB
+  // limit returns error on transaction.
+  const uint32_t maxTnxSize_{8192000};
+  const std::string keyBlockIdentifier{"K"};
+  const std::string dataBlockIdentifier{"D"};
+};
+
 bool setAPIVersion();
 }  // namespace FDBCS
diff --git a/utils/fdb_wrapper_cpp/src/fdbcs.cpp b/utils/fdb_wrapper_cpp/src/fdbcs.cpp
index f85314da9..a6de89676 100644
--- a/utils/fdb_wrapper_cpp/src/fdbcs.cpp
+++ b/utils/fdb_wrapper_cpp/src/fdbcs.cpp
@@ -19,10 +19,26 @@
 #include
 #include
 #include
-#include "../include/fdbcs.hpp"
+#include
+#include
+#include
+#include
+#include
+#include "fdbcs.hpp"
 
 namespace FDBCS
 {
+
+// Evaluates `exp` once and returns its value from the enclosing function when that value is falsy.
+// There is no trailing semicolon: the call site supplies it, which keeps the macro usable in `if`/`else`.
+#define RETURN_ON_ERROR(exp) \
+  do                         \
+  {                          \
+    auto rc = (exp);         \
+    if (!rc)                 \
+      return rc;             \
+  } while (false)
+
 Transaction::Transaction(FDBTransaction* tnx) : tnx_(tnx)
 {
 }
@@ -111,6 +127,13 @@ bool Transaction::commit() const
       std::cerr << "fdb_future_block_until_ready error, code: " << (int)err << std::endl;
       return false;
     }
+    err = fdb_future_get_error(future);
+    if (err)
+    {
+      fdb_future_destroy(future);
+      std::cerr << "fdb_future_get_error(), code: " << (int)err << std::endl;
+      return false;
+    }
     fdb_future_destroy(future);
   }
   return true;
@@ -206,6 +229,344 @@ std::shared_ptr DataBaseCreator::createDataBase(const std::string c
   return std::make_shared(database);
 }
 
+// Returns a new key: the string form of a freshly generated random UUID.
+Key BoostUIDKeyGenerator::generateKey()
+{
+  return boost::lexical_cast(boost::uuids::random_generator()());
+}
+
+// Returns the length of a key, measured on a freshly generated one.
+uint32_t BoostUIDKeyGenerator::getKeySize()
+{
+  return boost::lexical_cast(boost::uuids::random_generator()()).size();
+}
+
+// Generates `num` keys with the configured key generator.
+Keys BlobHandler::generateKeys(const uint32_t num)
+{
+  Keys keys;
+  keys.reserve(num);
+  for (uint32_t i = 0; i < num; ++i)
+    keys.push_back(keyGen_->generateKey());
+
+  return keys;
+}
+// FIXME: Put it to util?
+// Returns the logarithm of `value` to the given `base`.
+float BlobHandler::log(const uint32_t base, const uint32_t value)
+{
+  return std::log(value) / std::log(base);
+}
+
+// Appends `value` (a key) to the key `block`; an empty block is first given the key block identifier.
+void BlobHandler::insertKey(Block& block, const std::string& value)
+{
+  if (!block.first)
+  {
+    block.second.reserve(blockSizeInBytes_);
+    block.second.insert(block.second.end(), keyBlockIdentifier.begin(), keyBlockIdentifier.end());
+    block.first += keyBlockIdentifier.size();
+  }
+  block.second.insert(block.second.begin() + block.first, value.begin(), value.end());
+  block.first += value.size();
+}
+
+// Copies up to `dataBlockSizeInBytes_` bytes of `blob`, starting at `offset`, into the data `block`; an
+// empty block is first given the data block identifier. Returns the offset right after the copied bytes.
+size_t BlobHandler::insertData(Block& block, const std::string& blob, const size_t offset)
+{
+  const size_t endOfBlock = std::min(offset + dataBlockSizeInBytes_, blob.size());
+  auto& dataBlock = block.second;
+  if (!block.first)
+  {
+    dataBlock.reserve(blockSizeInBytes_);
+    dataBlock.insert(dataBlock.end(), dataBlockIdentifier.begin(), dataBlockIdentifier.end());
+    block.first += dataBlockIdentifier.size();
+  }
+  dataBlock.insert(dataBlock.begin() + block.first, blob.begin() + offset, blob.begin() + endOfBlock);
+  return endOfBlock;
+}
+
+// Stores the blocks of all the given `keys` in a single transaction; returns false on error.
+bool BlobHandler::commitKeys(std::shared_ptr dataBase, KeyBlockMap& keyBlockMap,
+                             const Keys& keys)
+{
+  auto tnx = dataBase->createTransaction();
+  if (!tnx)
+    return false;
+
+  for (const auto& key : keys)
+    tnx->set(key, keyBlockMap[key].second);
+
+  return tnx->commit();
+}
+
+// Stores a single `key`/`value` pair in its own transaction; returns false on error.
+bool BlobHandler::commitKey(std::shared_ptr dataBase, const Key& key,
+                            const ByteArray& value)
+{
+  auto tnx = dataBase->createTransaction();
+  // Same guard as in `commitKeys`: the transaction is checked before it is used.
+  if (!tnx)
+    return false;
+  tnx->set(key, value);
+  return tnx->commit();
+}
+
+// Computes how many keys are generated on each level of a tree of depth `treeLen` addressing `numBlocks`
+// data blocks: the entry for a level is the number of keys referenced from the key blocks of that level.
+TreeLevelNumKeysMap BlobHandler::computeNumKeysForEachTreeLevel(const int32_t treeLen,
+                                                                const uint32_t numBlocks)
+{
+  TreeLevelNumKeysMap levelMap;
+  levelMap[treeLen] = numBlocks;
+  if (!treeLen)
+    return levelMap;
+
+  for (int32_t level = treeLen - 1; level >= 0; --level)
+  {
+    if (level + 1 == treeLen)
+      levelMap[level] = levelMap[level + 1];
+    else
+      levelMap[level] = (levelMap[level + 1] + (numKeysInBlock_ - 1)) / numKeysInBlock_;
+  }
+  return levelMap;
+}
+
+// Writes `blob` under `key`: the blob is cut into data blocks that are addressed through a tree of key
+// blocks rooted at `key`; blocks are committed in batches kept below `maxTnxSize_`. Returns false on error.
+bool BlobHandler::writeBlob(std::shared_ptr dataBase, const ByteArray& key,
+                            const ByteArray& blob)
+{
+  const size_t blobSizeInBytes = blob.size();
+  // An empty blob is stored as a data block that holds only the identifier, because `readBlob` reports a
+  // zero-sized data block as an error.
+  if (!blobSizeInBytes)
+    return commitKey(dataBase, key, dataBlockIdentifier);
+
+  const uint32_t numDataBlocks = (blobSizeInBytes + (dataBlockSizeInBytes_ - 1)) / dataBlockSizeInBytes_;
+  const uint32_t treeLen = std::ceil(log(numKeysInBlock_, numDataBlocks));
+  Keys currentKeys{key};
+  auto numKeyLevelMap = computeNumKeysForEachTreeLevel(treeLen, numDataBlocks);
+
+  KeyBlockMap keyBlockMap;
+  keyBlockMap[key] = {0, std::string()};
+  Keys keysInTnx;
+  size_t currentTnxSize = 0;
+
+  // Build the key blocks level by level: every key of the current level gets up to `numKeysInBlock_` keys
+  // of the next level.
+  for (uint32_t currentLevel = 0; currentLevel < treeLen; ++currentLevel)
+  {
+    const uint32_t nextLevelKeyNum = numKeyLevelMap[currentLevel];
+    auto nextLevelKeys = generateKeys(nextLevelKeyNum);
+    uint32_t nextKeysIt = 0;
+    for (uint32_t i = 0, size = currentKeys.size(); i < size && nextKeysIt < nextLevelKeyNum; ++i)
+    {
+      const auto& currentKey = currentKeys[i];
+      auto& block = keyBlockMap[currentKey];
+      for (uint32_t j = 0; j < numKeysInBlock_ && nextKeysIt < nextLevelKeyNum; ++j, ++nextKeysIt)
+      {
+        const auto& nextKey = nextLevelKeys[nextKeysIt];
+        insertKey(block, nextKey);
+        keyBlockMap[nextKey] = {0, std::string()};
+      }
+      if (currentTnxSize + (keySizeInBytes_ + block.second.size()) >= maxTnxSize_)
+      {
+        RETURN_ON_ERROR(commitKeys(dataBase, keyBlockMap, keysInTnx));
+        currentTnxSize = 0;
+        keysInTnx.clear();
+      }
+      currentTnxSize += block.second.size() + keySizeInBytes_;
+      keysInTnx.push_back(currentKey);
+    }
+    currentKeys = std::move(nextLevelKeys);
+  }
+
+  // The keys of the last level address the data blocks.
+  size_t offset = 0;
+  for (uint32_t i = 0; i < numDataBlocks; ++i)
+  {
+    const auto& currentKey = currentKeys[i];
+    auto& block = keyBlockMap[currentKey];
+    offset = insertData(block, blob, offset);
+    if (currentTnxSize + (keySizeInBytes_ + block.second.size()) >= maxTnxSize_)
+    {
+      RETURN_ON_ERROR(commitKeys(dataBase, keyBlockMap, keysInTnx));
+      currentTnxSize = 0;
+      keysInTnx.clear();
+    }
+    keysInTnx.push_back(currentKey);
+    currentTnxSize += block.second.size() + keySizeInBytes_;
+  }
+
+  if (currentTnxSize)
+    RETURN_ON_ERROR(commitKeys(dataBase, keyBlockMap, keysInTnx));
+
+  return true;
+}
+
+// Extracts the keys stored in a key `block`. Returns false when the block is larger than the block size.
+std::pair BlobHandler::getKeysFromBlock(const Block& block)
+{
+  Keys keys;
+  const auto& blockData = block.second;
+  if (blockData.size() > blockSizeInBytes_)
+    return {false, {""}};
+
+  // The keys start right after the key block identifier.
+  uint32_t offset = keyBlockIdentifier.size();
+  for (uint32_t i = 0; i < numKeysInBlock_ && offset + keySizeInBytes_ <= blockData.size(); ++i)
+  {
+    Key key(blockData.begin() + offset, blockData.begin() + offset + keySizeInBytes_);
+    keys.push_back(std::move(key));
+    offset += keySizeInBytes_;
+  }
+
+  return {true, keys};
+}
+
+// Returns true when `block` does not start with the key block identifier.
+bool BlobHandler::isDataBlock(const Block& block)
+{
+  return block.second.compare(0, keyBlockIdentifier.size(), keyBlockIdentifier) != 0;
+}
+
+// Reads the blob stored under `key`: descends the levels of key blocks until a data block is met, then
+// concatenates the data blocks of that level. On error the first member of the result is false.
+std::pair BlobHandler::readBlob(std::shared_ptr database,
+                                const ByteArray& key)
+{
+  Keys currentKeys{key};
+  bool dataBlockReached = false;
+
+  while (!dataBlockReached)
+  {
+    auto tnx = database->createTransaction();
+    if (!tnx)
+      return {false, ""};
+
+    std::vector blocks;
+    for (const auto& key : currentKeys)
+    {
+      auto p = tnx->get(key);
+      if (!p.first)
+        return {false, ""};
+
+      Block block{0, p.second};
+      if (isDataBlock(block))
+      {
+        dataBlockReached = true;
+        break;
+      }
+      blocks.push_back(block);
+    }
+
+    if (dataBlockReached)
+      break;
+
+    Keys nextKeys;
+    for (const auto& block : blocks)
+    {
+      auto keysPair = getKeysFromBlock(block);
+      if (!keysPair.first)
+        return {false, ""};
+
+      auto& keys = keysPair.second;
+      nextKeys.insert(nextKeys.end(), keys.begin(), keys.end());
+    }
+    // Key blocks without a single key lead nowhere; without this check the loop would never terminate.
+    if (nextKeys.empty())
+      return {false, ""};
+
+    currentKeys = std::move(nextKeys);
+  }
+
+  std::string blob;
+  for (const auto& key : currentKeys)
+  {
+    auto tnx = database->createTransaction();
+    if (!tnx)
+      return {false, ""};
+
+    auto resultPair = tnx->get(key);
+    if (!resultPair.first)
+      return {false, ""};
+
+    auto& dataBlock = resultPair.second;
+    if (!dataBlock.size())
+      return {false, ""};
+
+    blob.insert(blob.end(), dataBlock.begin() + dataBlockIdentifier.size(), dataBlock.end());
+  }
+
+  return {true, blob};
+}
+
+// Removes every key of `keys`, each in its own transaction; returns false on error.
+bool BlobHandler::removeKeys(std::shared_ptr database, const Keys& keys)
+{
+  for (const auto& key : keys)
+  {
+    auto tnx = database->createTransaction();
+    if (!tnx)
+      return false;
+    tnx->remove(key);
+    RETURN_ON_ERROR(tnx->commit());
+  }
+
+  return true;
+}
+
+// Removes the blob stored under `key`: collects the keys of every tree level, then removes them level by
+// level starting from the top. Returns false on error.
+bool BlobHandler::removeBlob(std::shared_ptr database, const Key& key)
+{
+  std::unordered_map treeLevel;
+  auto tnx = database->createTransaction();
+  if (!tnx)
+    return false;
+
+  uint32_t currentLevel = 0;
+  treeLevel[0] = {key};
+
+  while (true)
+  {
+    const auto& currentKeys = treeLevel[currentLevel];
+    std::vector blocks;
+    for (const auto& key : currentKeys)
+    {
+      auto p = tnx->get(key);
+      if (!p.first)
+        return false;
+      blocks.push_back({0, p.second});
+    }
+
+    // The emptiness check goes first: calling `front()` on an empty vector is undefined behavior.
+    if (!currentKeys.size() || isDataBlock(blocks.front()))
+      break;
+
+    Keys nextKeys;
+    for (const auto& block : blocks)
+    {
+      auto keysPair = getKeysFromBlock(block);
+      if (!keysPair.first)
+        return false;
+
+      auto& keys = keysPair.second;
+      nextKeys.insert(nextKeys.end(), keys.begin(), keys.end());
+    }
+
+    ++currentLevel;
+    treeLevel[currentLevel] = std::move(nextKeys);
+  }
+
+  for (uint32_t level = 0; level <= currentLevel; ++level)
+    RETURN_ON_ERROR(removeKeys(database, treeLevel[level]));
+
+  return true;
+}
+
 bool setAPIVersion()
 {
   auto err = fdb_select_api_version(FDB_API_VERSION);
diff --git a/utils/fdb_wrapper_cpp/test/test_fdb_api.cpp b/utils/fdb_wrapper_cpp/test/test_fdb_api.cpp
index dc9edb3e7..4f8f1decd 100644
--- a/utils/fdb_wrapper_cpp/test/test_fdb_api.cpp
+++ b/utils/fdb_wrapper_cpp/test/test_fdb_api.cpp
@@ -20,6 +20,12 @@
 
 using namespace std;
 using namespace FDBCS;
+using std::chrono::duration;
+using std::chrono::duration_cast;
+using std::chrono::high_resolution_clock;
+using std::chrono::milliseconds;
+
+// #define TEST_PERF 1
 
 template
 static void assert_internal(const T& value, const std::string& errMessage)
@@ -31,6 +37,71 @@ static void assert_internal(const T& value, const std::string& errMessage)
   }
 }
 
+// Generates a blob of `len` bytes filled with the repeating lowercase alphabet.
+static std::string generateBlob(const uint32_t len)
+{
+  std::string blob;
+  blob.reserve(len);
+  for (uint32_t i = 0; i < len; ++i)
+  {
+    blob.push_back('a' + (i % 26));
+  }
+  return blob;
+}
+
+// Writes, reads back and removes blobs of several sizes, checking the result of every step.
+static void testBlobHandler(std::shared_ptr db)
+{
+#ifdef TEST_PERF
+  std::vector blobSizes{0, 1, 11, 101, 1001, 10001, 100001, 1000001, 10000001, 100000001};
+  std::vector blockSizes{10000, 100000};
+#else
+  std::vector blobSizes{0, 1, 10001, 100001, 10000001, 100000001};
+  std::vector blockSizes{100000};
+#endif
+
+  for (auto blobSize : blobSizes)
+  {
+    for (auto blockSize : blockSizes)
+    {
+      std::string rootKey = "root";
+      auto blobA = generateBlob(blobSize);
+      std::shared_ptr gen = std::make_shared();
+      BlobHandler handler(gen, blockSize);
+#ifdef TEST_PERF
+      auto t1 = high_resolution_clock::now();
+#endif
+      assert_internal(handler.writeBlob(db, rootKey, blobA), "Write blob error");
+#ifdef TEST_PERF
+      auto t2 = high_resolution_clock::now();
+      auto ms_int = duration_cast(t2 - t1);
+      std::cout << "Write blob time: " << ms_int.count() << std::endl;
+      t1 = high_resolution_clock::now();
+#endif
+      auto p = handler.readBlob(db, rootKey);
+#ifdef TEST_PERF
+      t2 = high_resolution_clock::now();
+      cout << "size readed " << p.second.size() << endl;
+#endif
+      assert_internal(p.first, "Read blob error");
+      assert_internal(p.second == blobA, "Blobs not equal");
+#ifdef TEST_PERF
+      ms_int = duration_cast(t2 - t1);
+      std::cout << "Read blob time: " << ms_int.count() << std::endl;
+      t1 = high_resolution_clock::now();
+#endif
+      assert_internal(handler.removeBlob(db, rootKey), "Remove blob error");
+#ifdef TEST_PERF
+      t2 = high_resolution_clock::now();
+      ms_int = duration_cast(t2 - t1);
+      std::cout << "Remove blob time: " << ms_int.count() << std::endl;
+#endif
+      p = handler.readBlob(db, rootKey);
+      assert_internal(!p.first, "Blob present after remove");
+    }
+  }
+}
+
 int main()
 {
   std::string path = "/etc/foundationdb/fdb.cluster";
@@ -98,5 +169,6 @@ int main()
     }
   }
 
+  testBlobHandler(db);
   return 0;
 }