You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
feat(fdb): MCOL-5802 Add support for blob insertion into FDB. (#3351)
This commit is contained in:
@ -215,6 +215,7 @@ clean_old_installation()
|
||||
rm -rf /etc/mysql
|
||||
rm -rf /etc/my.cnf.d/columnstore.cnf
|
||||
rm -rf /etc/mysql/mariadb.conf.d/columnstore.cnf
|
||||
fdbcli --exec "writemode on; clearrange \x00 \xff"
|
||||
}
|
||||
|
||||
build()
|
||||
|
@ -21,6 +21,12 @@
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
#include <boost/uuid/random_generator.hpp>
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
// https://apple.github.io/foundationdb/api-c.html
|
||||
// `FDB_API_VERSION` has to be defined before the `fdb_c.h` header is included.
|
||||
@ -104,5 +110,77 @@ class DataBaseCreator
|
||||
static std::shared_ptr<FDBDataBase> createDataBase(const std::string clusterFilePath);
|
||||
};
|
||||
|
||||
// A key as stored in the database.
using Key = std::string;
// An ordered collection of keys.
using Keys = std::vector<Key>;
// A block under construction: `first` is the number of bytes written so far (it doubles as
// the insertion offset), `second` is the block payload including its leading identifier.
using Block = std::pair<uint32_t, std::string>;
// Maps a key to its associated block.
using KeyBlockMap = std::unordered_map<Key, Block>;
// Maps a tree level to the number of keys on that level.
using TreeLevelNumKeysMap = std::unordered_map<uint32_t, uint32_t>;
|
||||
|
||||
class KeyGenerator
|
||||
{
|
||||
public:
|
||||
virtual ~KeyGenerator()
|
||||
{
|
||||
}
|
||||
virtual Key generateKey() = 0;
|
||||
virtual uint32_t getKeySize() = 0;
|
||||
};
|
||||
|
||||
class BoostUIDKeyGenerator : public KeyGenerator
|
||||
{
|
||||
public:
|
||||
Key generateKey() override;
|
||||
uint32_t getKeySize() override;
|
||||
};
|
||||
|
||||
// This class represetns a machinery to handle a data `blob`.
|
||||
class BlobHandler
|
||||
{
|
||||
public:
|
||||
BlobHandler(std::shared_ptr<KeyGenerator> keyGen, uint32_t blockSizeInBytes = 100000)
|
||||
: keyGen_(keyGen), blockSizeInBytes_(blockSizeInBytes)
|
||||
{
|
||||
// Block size in 100KB shows the best performance.
|
||||
keySizeInBytes_ = keyGen_->getKeySize();
|
||||
assert(keySizeInBytes_);
|
||||
assert(blockSizeInBytes_);
|
||||
assert((keySizeInBytes_ + keyBlockIdentifier.size()) <= blockSizeInBytes_);
|
||||
numKeysInBlock_ = (blockSizeInBytes_ - keyBlockIdentifier.size()) / keySizeInBytes_;
|
||||
assert(blockSizeInBytes_ > dataBlockIdentifier.size());
|
||||
dataBlockSizeInBytes_ = (blockSizeInBytes_ - dataBlockIdentifier.size());
|
||||
}
|
||||
|
||||
// Writes the given `blob` with given `key`.
|
||||
bool writeBlob(std::shared_ptr<FDBCS::FDBDataBase> database, const ByteArray& key, const ByteArray& blob);
|
||||
// Reads `blob` by the given `key`, on error returns false.
|
||||
std::pair<bool, std::string> readBlob(std::shared_ptr<FDBCS::FDBDataBase> database, const ByteArray& key);
|
||||
// Removes a `blob` by the given `key`, on error returns false.
|
||||
bool removeBlob(std::shared_ptr<FDBCS::FDBDataBase> database, const ByteArray& key);
|
||||
|
||||
private:
|
||||
size_t insertData(Block& block, const std::string& blob, const size_t offset);
|
||||
void insertKey(Block& block, const std::string& value);
|
||||
std::pair<bool, Keys> getKeysFromBlock(const Block& block);
|
||||
Keys generateKeys(const uint32_t num);
|
||||
bool isDataBlock(const Block& block);
|
||||
bool commitKeys(std::shared_ptr<FDBCS::FDBDataBase> database, KeyBlockMap& keyBlockMap, const Keys& keys);
|
||||
bool commitKey(std::shared_ptr<FDBCS::FDBDataBase> database, const Key& key, const ByteArray& value);
|
||||
bool removeKeys(std::shared_ptr<FDBCS::FDBDataBase> database, const Keys& keys);
|
||||
TreeLevelNumKeysMap computeNumKeysForEachTreeLevel(const int32_t treeLen, const uint32_t numBlocks);
|
||||
inline float log(const uint32_t base, const uint32_t value);
|
||||
|
||||
std::shared_ptr<KeyGenerator> keyGen_;
|
||||
uint32_t blockSizeInBytes_;
|
||||
uint32_t keySizeInBytes_;
|
||||
uint32_t numKeysInBlock_;
|
||||
uint32_t dataBlockSizeInBytes_;
|
||||
// FIXME: Doc says that 10MB is limit, currently taking in account `key` size and `value` size, but 10MB
|
||||
// limit returns error on transaction.
|
||||
const uint32_t maxTnxSize_{8192000};
|
||||
const std::string keyBlockIdentifier{"K"};
|
||||
const std::string dataBlockIdentifier{"D"};
|
||||
};
|
||||
|
||||
bool setAPIVersion();
|
||||
} // namespace FDBCS
|
||||
|
@ -19,10 +19,24 @@
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include "../include/fdbcs.hpp"
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
#include <boost/uuid/random_generator.hpp>
|
||||
#include <boost/lexical_cast.hpp>
|
||||
#include <chrono>
|
||||
#include "fdbcs.hpp"
|
||||
|
||||
namespace FDBCS
|
||||
{
|
||||
|
||||
// Evaluates `exp` exactly once and, when the result is falsy, returns it from the enclosing
// function. The macro deliberately does not end with a semicolon: the call site supplies it,
// which keeps `if (cond) RETURN_ON_ERROR(x); else ...` well-formed.
#define RETURN_ON_ERROR(exp) \
  do                         \
  {                          \
    auto rc = (exp);         \
    if (!rc)                 \
      return rc;             \
  } while (false)
|
||||
|
||||
// Builds a `Transaction` around the given FoundationDB transaction handle.
Transaction::Transaction(FDBTransaction* tnx)
 : tnx_(tnx)
{
}
|
||||
@ -111,6 +125,13 @@ bool Transaction::commit() const
|
||||
std::cerr << "fdb_future_block_until_ready error, code: " << (int)err << std::endl;
|
||||
return false;
|
||||
}
|
||||
err = fdb_future_get_error(future);
|
||||
if (err)
|
||||
{
|
||||
fdb_future_destroy(future);
|
||||
std::cerr << "fdb_future_get_error(), code: " << (int)err << std::endl;
|
||||
return false;
|
||||
}
|
||||
fdb_future_destroy(future);
|
||||
}
|
||||
return true;
|
||||
@ -206,6 +227,310 @@ std::shared_ptr<FDBDataBase> DataBaseCreator::createDataBase(const std::string c
|
||||
return std::make_shared<FDBDataBase>(database);
|
||||
}
|
||||
|
||||
// Generates a random UUID and returns its string representation as the key.
Key BoostUIDKeyGenerator::generateKey()
{
  boost::uuids::random_generator makeUuid;
  const boost::uuids::uuid id = makeUuid();
  return boost::lexical_cast<std::string>(id);
}
|
||||
|
||||
uint32_t BoostUIDKeyGenerator::getKeySize()
|
||||
{
|
||||
return boost::lexical_cast<std::string>(boost::uuids::random_generator()()).size();
|
||||
}
|
||||
|
||||
// Asks the configured key generator for `num` keys and returns them in generation order.
Keys BlobHandler::generateKeys(const uint32_t num)
{
  Keys generated;
  generated.reserve(num);
  for (uint32_t left = num; left != 0; --left)
  {
    generated.emplace_back(keyGen_->generateKey());
  }
  return generated;
}
|
||||
// FIXME: Put it to util?
|
||||
float BlobHandler::log(const uint32_t base, const uint32_t value)
|
||||
{
|
||||
return std::log(value) / std::log(base);
|
||||
}
|
||||
|
||||
// Writes the key `value` into the key block `block` at the block's current offset and
// advances the offset. An untouched block (offset 0) first receives the key block identifier.
void BlobHandler::insertKey(Block& block, const std::string& value)
{
  uint32_t& used = block.first;
  std::string& payload = block.second;

  if (used == 0)
  {
    payload.reserve(blockSizeInBytes_);
    payload += keyBlockIdentifier;
    used += keyBlockIdentifier.size();
  }

  payload.insert(payload.begin() + used, value.begin(), value.end());
  used += value.size();
}
|
||||
|
||||
// Copies the slice of `blob` that starts at `offset` and spans at most `dataBlockSizeInBytes_`
// bytes into the data block `block`. An untouched block (offset 0) first receives the data
// block identifier. Returns the offset in `blob` right after the copied slice.
size_t BlobHandler::insertData(Block& block, const std::string& blob, const size_t offset)
{
  // Clamp both ends to the blob, so the iterator arithmetic below never leaves its range.
  const size_t chunkBegin = std::min(offset, blob.size());
  const size_t chunkEnd = std::min(chunkBegin + dataBlockSizeInBytes_, blob.size());

  auto& dataBlock = block.second;
  if (!block.first)
  {
    dataBlock.reserve(blockSizeInBytes_);
    dataBlock.insert(dataBlock.end(), dataBlockIdentifier.begin(), dataBlockIdentifier.end());
    block.first += dataBlockIdentifier.size();
  }

  dataBlock.insert(dataBlock.begin() + block.first, blob.begin() + chunkBegin, blob.begin() + chunkEnd);
  // Keep the write offset in sync with the payload, the same way `insertKey` does.
  block.first += static_cast<uint32_t>(chunkEnd - chunkBegin);
  return chunkEnd;
}
|
||||
|
||||
// Stores, within one transaction, the payload of the block mapped to each of `keys` and
// returns the result of the commit. Returns false when no transaction could be created.
bool BlobHandler::commitKeys(std::shared_ptr<FDBCS::FDBDataBase> dataBase, KeyBlockMap& keyBlockMap,
                             const Keys& keys)
{
  auto tnx = dataBase->createTransaction();
  if (tnx)
  {
    for (size_t i = 0; i < keys.size(); ++i)
    {
      const Key& key = keys[i];
      const Block& block = keyBlockMap[key];
      tnx->set(key, block.second);
    }
    return tnx->commit();
  }
  return false;
}
|
||||
|
||||
// Stores a single `key`/`value` pair within its own transaction and returns the result of the
// commit. Returns false when no transaction could be created.
bool BlobHandler::commitKey(std::shared_ptr<FDBCS::FDBDataBase> dataBase, const Key& key,
                            const ByteArray& value)
{
  auto tnx = dataBase->createTransaction();
  // Transaction creation can fail; `commitKeys` guards against the same case.
  if (!tnx)
    return false;

  tnx->set(key, value);
  return tnx->commit();
}
|
||||
|
||||
// Computes the number of keys for each level of the tree. Level `treeLen` and level
// `treeLen - 1` both map to `numBlocks`; every level above holds just enough keys for the
// level below to be spread over blocks of `numKeysInBlock_` keys.
TreeLevelNumKeysMap BlobHandler::computeNumKeysForEachTreeLevel(const int32_t treeLen,
                                                                const uint32_t numBlocks)
{
  TreeLevelNumKeysMap keysPerLevel;
  keysPerLevel[treeLen] = numBlocks;
  if (treeLen <= 0)
    return keysPerLevel;

  // The deepest key level references every block directly.
  keysPerLevel[treeLen - 1] = numBlocks;
  for (int32_t level = treeLen - 2; level >= 0; --level)
  {
    const uint32_t keysBelow = keysPerLevel[level + 1];
    // Rounded-up division: number of key blocks needed to hold `keysBelow` keys.
    keysPerLevel[level] = (keysBelow + (numKeysInBlock_ - 1)) / numKeysInBlock_;
  }
  return keysPerLevel;
}
|
||||
|
||||
// Writes `blob` under the root `key`.
//
// The blob is cut into data blocks; when more than one data block is needed, a tree of key
// blocks is built on top of them, with the root key block stored under `key`. Blocks are
// committed in batches that stay below `maxTnxSize_`. Returns false as soon as a commit fails
// or when the configured block geometry cannot index the blob; blocks committed before the
// failure are not rolled back.
bool BlobHandler::writeBlob(std::shared_ptr<FDBCS::FDBDataBase> dataBase, const ByteArray& key,
                            const ByteArray& blob)
{
  const size_t blobSizeInBytes = blob.size();
  // An empty blob is stored as a data block that holds only the identifier: `readBlob` treats
  // a zero-length data block as an error, so an empty value could not be read back.
  if (!blobSizeInBytes)
    return commitKey(dataBase, key, dataBlockIdentifier);

  // Without payload room in a data block the blob cannot be split at all.
  if (!dataBlockSizeInBytes_)
    return false;

  const uint32_t numDataBlocks = (blobSizeInBytes + (dataBlockSizeInBytes_ - 1)) / dataBlockSizeInBytes_;
  // A key block that cannot hold at least two keys cannot fan out to several data blocks.
  if (numDataBlocks > 1 && numKeysInBlock_ < 2)
    return false;

  // Tree depth: the smallest `t` with numKeysInBlock_^t >= numDataBlocks. Integer arithmetic
  // is used on purpose, a floating point logarithm may be off by one near exact powers.
  uint32_t treeLen = 0;
  for (uint64_t reachable = 1; reachable < numDataBlocks; reachable *= numKeysInBlock_)
    ++treeLen;

  Keys currentKeys{key};
  auto numKeyLevelMap = computeNumKeysForEachTreeLevel(treeLen, numDataBlocks);

  KeyBlockMap keyBlockMap;
  keyBlockMap[key] = {0, std::string()};
  Keys keysInTnx;
  size_t currentTnxSize = 0;

  // Commits the pending blocks and releases their memory.
  auto flushPending = [&]() -> bool
  {
    if (keysInTnx.empty())
      return true;
    if (!commitKeys(dataBase, keyBlockMap, keysInTnx))
      return false;
    for (const auto& committedKey : keysInTnx)
      keyBlockMap.erase(committedKey);
    keysInTnx.clear();
    currentTnxSize = 0;
    return true;
  };

  // Queues a finished block; pending blocks are committed first when adding the new one
  // would reach the transaction size limit.
  auto enqueue = [&](const Key& blockKey, const Block& block) -> bool
  {
    const size_t entrySize = keySizeInBytes_ + block.second.size();
    if (currentTnxSize + entrySize >= maxTnxSize_ && !flushPending())
      return false;
    currentTnxSize += entrySize;
    keysInTnx.push_back(blockKey);
    return true;
  };

  // Build the key levels top-down: each block of the current level receives up to
  // `numKeysInBlock_` keys of the next level.
  for (uint32_t currentLevel = 0; currentLevel < treeLen; ++currentLevel)
  {
    const uint32_t nextLevelKeyNum = numKeyLevelMap[currentLevel];
    auto nextLevelKeys = generateKeys(nextLevelKeyNum);
    uint32_t nextKeysIt = 0;
    for (uint32_t i = 0, size = currentKeys.size(); i < size && nextKeysIt < nextLevelKeyNum; ++i)
    {
      const auto& currentKey = currentKeys[i];
      // `operator[]` creates an empty block {0, ""} for a key seen for the first time.
      auto& block = keyBlockMap[currentKey];
      for (uint32_t j = 0; j < numKeysInBlock_ && nextKeysIt < nextLevelKeyNum; ++j, ++nextKeysIt)
        insertKey(block, nextLevelKeys[nextKeysIt]);

      if (!enqueue(currentKey, block))
        return false;
    }
    currentKeys = std::move(nextLevelKeys);
  }

  // `currentKeys` now holds one key per data block (or just the root key for a single block).
  size_t offset = 0;
  for (uint32_t i = 0; i < numDataBlocks; ++i)
  {
    const auto& currentKey = currentKeys[i];
    auto& block = keyBlockMap[currentKey];
    offset = insertData(block, blob, offset);

    if (!enqueue(currentKey, block))
      return false;
  }

  return flushPending();
}
|
||||
|
||||
std::pair<bool, Keys> BlobHandler::getKeysFromBlock(const Block& block)
|
||||
{
|
||||
Keys keys;
|
||||
const auto& blockData = block.second;
|
||||
if (blockData.size() > blockSizeInBytes_)
|
||||
return {false, {""}};
|
||||
|
||||
uint32_t offset = 1;
|
||||
for (uint32_t i = 0; i < numKeysInBlock_ && offset + keySizeInBytes_ <= blockData.size(); ++i)
|
||||
{
|
||||
Key key(blockData.begin() + offset, blockData.begin() + offset + keySizeInBytes_);
|
||||
keys.push_back(std::move(key));
|
||||
offset += keySizeInBytes_;
|
||||
}
|
||||
|
||||
return {true, keys};
|
||||
}
|
||||
|
||||
bool BlobHandler::isDataBlock(const Block& block)
|
||||
{
|
||||
return block.second.compare(0, keyBlockIdentifier.size(), keyBlockIdentifier) != 0;
|
||||
}
|
||||
|
||||
std::pair<bool, std::string> BlobHandler::readBlob(std::shared_ptr<FDBCS::FDBDataBase> database,
|
||||
const ByteArray& key)
|
||||
{
|
||||
Keys currentKeys{key};
|
||||
bool dataBlockReached = false;
|
||||
|
||||
while (!dataBlockReached)
|
||||
{
|
||||
auto tnx = database->createTransaction();
|
||||
if (!tnx)
|
||||
return {false, ""};
|
||||
|
||||
std::vector<Block> blocks;
|
||||
for (const auto& key : currentKeys)
|
||||
{
|
||||
auto p = tnx->get(key);
|
||||
if (!p.first)
|
||||
return {false, ""};
|
||||
|
||||
Block block{0, p.second};
|
||||
if (isDataBlock(block))
|
||||
{
|
||||
dataBlockReached = true;
|
||||
break;
|
||||
}
|
||||
blocks.push_back(block);
|
||||
}
|
||||
|
||||
if (dataBlockReached)
|
||||
break;
|
||||
|
||||
Keys nextKeys;
|
||||
for (const auto& block : blocks)
|
||||
{
|
||||
auto keysPair = getKeysFromBlock(block);
|
||||
if (!keysPair.first)
|
||||
return {false, ""};
|
||||
|
||||
auto& keys = keysPair.second;
|
||||
nextKeys.insert(nextKeys.end(), keys.begin(), keys.end());
|
||||
}
|
||||
currentKeys = std::move(nextKeys);
|
||||
}
|
||||
|
||||
std::string blob;
|
||||
for (const auto& key : currentKeys)
|
||||
{
|
||||
auto tnx = database->createTransaction();
|
||||
if (!tnx)
|
||||
return {false, ""};
|
||||
|
||||
auto resultPair = tnx->get(key);
|
||||
if (!resultPair.first)
|
||||
return {false, ""};
|
||||
|
||||
auto& dataBlock = resultPair.second;
|
||||
if (!dataBlock.size())
|
||||
return {false, ""};
|
||||
|
||||
blob.insert(blob.end(), dataBlock.begin() + dataBlockIdentifier.size(), dataBlock.end());
|
||||
}
|
||||
|
||||
return {true, blob};
|
||||
}
|
||||
|
||||
// Removes every key of `keys`, each one within its own transaction. Stops and returns false
// at the first transaction that cannot be created or committed.
bool BlobHandler::removeKeys(std::shared_ptr<FDBCS::FDBDataBase> database, const Keys& keys)
{
  for (auto it = keys.begin(); it != keys.end(); ++it)
  {
    auto tnx = database->createTransaction();
    if (!tnx)
      return false;

    tnx->remove(*it);
    const auto committed = tnx->commit();
    if (!committed)
      return committed;
  }
  return true;
}
|
||||
|
||||
// Removes the blob stored under the root `key`.
//
// First collects the keys of every tree level (reading all blocks through one transaction),
// then removes the collected keys level by level, starting with the root. Returns false on
// any lookup, parse or removal failure.
bool BlobHandler::removeBlob(std::shared_ptr<FDBCS::FDBDataBase> database, const Key& key)
{
  std::unordered_map<uint32_t, Keys> treeLevel;
  auto tnx = database->createTransaction();
  if (!tnx)
    return false;

  uint32_t currentLevel = 0;
  treeLevel[0] = {key};

  while (true)
  {
    const auto& currentKeys = treeLevel[currentLevel];
    // Nothing is referenced on this level. This has to be checked before `blocks.front()`
    // is touched, calling it on an empty vector is undefined behaviour.
    if (currentKeys.empty())
      break;

    std::vector<Block> blocks;
    blocks.reserve(currentKeys.size());
    for (const auto& levelKey : currentKeys)
    {
      auto p = tnx->get(levelKey);
      if (!p.first)
        return false;
      blocks.push_back({0, p.second});
    }

    // The data level is the last one, there is nothing below it to collect.
    if (isDataBlock(blocks.front()))
      break;

    Keys nextKeys;
    for (const auto& block : blocks)
    {
      auto keysPair = getKeysFromBlock(block);
      if (!keysPair.first)
        return false;

      auto& keys = keysPair.second;
      nextKeys.insert(nextKeys.end(), keys.begin(), keys.end());
    }

    ++currentLevel;
    treeLevel[currentLevel] = std::move(nextKeys);
  }

  for (uint32_t level = 0; level <= currentLevel; ++level)
  {
    if (!removeKeys(database, treeLevel[level]))
      return false;
  }

  return true;
}
|
||||
|
||||
bool setAPIVersion()
|
||||
{
|
||||
auto err = fdb_select_api_version(FDB_API_VERSION);
|
||||
|
@ -20,6 +20,12 @@
|
||||
|
||||
using namespace std;
|
||||
using namespace FDBCS;
|
||||
using std::chrono::duration;
|
||||
using std::chrono::duration_cast;
|
||||
using std::chrono::high_resolution_clock;
|
||||
using std::chrono::milliseconds;
|
||||
|
||||
// #define TEST_PERF 1
|
||||
|
||||
template <typename T>
|
||||
static void assert_internal(const T& value, const std::string& errMessage)
|
||||
@ -31,6 +37,68 @@ static void assert_internal(const T& value, const std::string& errMessage)
|
||||
}
|
||||
}
|
||||
|
||||
// Builds a test blob of `len` bytes that cycles through the lowercase letters 'a'..'z'.
static std::string generateBlob(const uint32_t len)
{
  std::string blob(len, 'a');
  for (uint32_t pos = 0; pos < len; ++pos)
    blob[pos] = 'a' + (pos % 26);
  return blob;
}
|
||||
|
||||
// Round-trip test for `BlobHandler`: for every blob size / block size combination the blob is
// written, read back and compared, removed, and finally checked to be gone. With `TEST_PERF`
// defined more sizes are covered and the duration of each step is printed.
static void testBlobHandler(std::shared_ptr<FDBCS::FDBDataBase> db)
{
#ifdef TEST_PERF
  std::vector<uint32_t> blobSizes{0, 1, 11, 101, 1001, 10001, 100001, 1000001, 10000001, 100000001};
  std::vector<uint32_t> blockSizes{10000, 100000};
#else
  std::vector<uint32_t> blobSizes{0, 1, 10001, 100001, 10000001, 100000001};
  std::vector<uint32_t> blockSizes{100000};
#endif

  for (auto blobSize : blobSizes)
  {
    // The blob depends on its size only, so it is generated once per size.
    const auto blobA = generateBlob(blobSize);
    for (auto blockSize : blockSizes)
    {
      std::string rootKey = "root";
      std::shared_ptr<BoostUIDKeyGenerator> gen = std::make_shared<BoostUIDKeyGenerator>();
      BlobHandler handler(gen, blockSize);
#ifdef TEST_PERF
      auto t1 = high_resolution_clock::now();
#endif
      // A failed write must fail the test right away instead of surfacing later as a mismatch.
      assert_internal(handler.writeBlob(db, rootKey, blobA), "Write blob error");
#ifdef TEST_PERF
      auto t2 = high_resolution_clock::now();
      auto ms_int = duration_cast<milliseconds>(t2 - t1);
      std::cout << "Write blob time: " << ms_int.count() << std::endl;
      t1 = high_resolution_clock::now();
#endif
      auto p = handler.readBlob(db, rootKey);
#ifdef TEST_PERF
      t2 = high_resolution_clock::now();
      cout << "size readed " << p.second.size() << endl;
#endif
      assert_internal(p.second == blobA, "Blobs not equal");
#ifdef TEST_PERF
      ms_int = duration_cast<milliseconds>(t2 - t1);
      std::cout << "Read blob time: " << ms_int.count() << std::endl;
      t1 = high_resolution_clock::now();
#endif
      assert_internal(handler.removeBlob(db, rootKey), "Remove blob error");
#ifdef TEST_PERF
      t2 = high_resolution_clock::now();
      ms_int = duration_cast<milliseconds>(t2 - t1);
      std::cout << "Remove blob time: " << ms_int.count() << std::endl;
#endif
      p = handler.readBlob(db, rootKey);
      assert_internal(!p.first, "Blob present after remove");
    }
  }
}
|
||||
|
||||
int main()
|
||||
{
|
||||
std::string path = "/etc/foundationdb/fdb.cluster";
|
||||
@ -98,5 +166,6 @@ int main()
|
||||
}
|
||||
}
|
||||
|
||||
testBlobHandler(db);
|
||||
return 0;
|
||||
}
|
||||
|
Reference in New Issue
Block a user