1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Deep build refactoring phase 2 (#3564)

* configcpp refactored

* chore(build): massive removals, auto add files to debian install file

* chore(build): configure before autobake

* chore(build): use custom cmake commands for components, mariadb-plugin-columnstore.install generated

* chore(build): install deps as separate step for build-packages

* more deps

* chore(codemanagement, build): build refactoring stage2

* chore(safety): Locked Map for MessageqCpp with a simpler way

 Please enter the commit message for your changes. Lines starting

* chore(codemanagement, ci): better coredumps handling, deps fixed

* Delete build/bootstrap_mcs.py

* Update charset.cpp (add license)
This commit is contained in:
Leonid Fedorov
2025-07-17 16:14:10 +04:00
committed by GitHub
parent d0ee5dae32
commit 449029a827
107 changed files with 354 additions and 3327 deletions

View File

@ -9,10 +9,17 @@ set(common_LIB_SRCS
MonitorProcMem.cpp
nullvaluemanip.cpp
threadnaming.cpp
utils_utf8.cpp
statistics.cpp
string_prefixes.cpp
)
columnstore_library(common ${common_LIB_SRCS})
columnstore_link(common boost_filesystem configcpp loggingcpp messageqcpp)
columnstore_link(
common
PRIVATE
boost_filesystem
configcpp
loggingcpp
messageqcpp
idbdatafile
mariadb_charset
)

View File

@ -1,39 +0,0 @@
/* Copyright (C) 2020 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include <type_traits>
#include <cstring>
namespace utils
{
// Copies the object representation of `src` into a fresh `To` and returns it.
// Only participates in overload resolution when both types have the same size
// and are trivially copyable.
template <class To, class From>
// constexpr support needs compiler magic
auto bitCast(const From& src) noexcept -> std::enable_if_t<
    sizeof(To) == sizeof(From) && std::is_trivially_copyable_v<From> && std::is_trivially_copyable_v<To>, To>
{
  static_assert(std::is_trivially_constructible_v<To>,
                "This implementation additionally requires "
                "destination type to be trivially constructible");
  To result;
  // memcpy is the portable way to reinterpret the bytes of one type as another.
  std::memcpy(&result, &src, sizeof(result));
  return result;
}
} // namespace utils

View File

@ -19,12 +19,6 @@
#pragma once
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
#ifndef __builtin_expect
#define __builtin_expect(x, expected_value) (x)
#endif
#endif
#ifndef LIKELY
#define LIKELY(x) __builtin_expect((x), 1)
#define UNLIKELY(x) __builtin_expect((x), 0)

View File

@ -1,278 +0,0 @@
/*
Copyright (C) 2020-2022 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#if defined(PREFER_MY_CONFIG_H)
#if !defined(MY_CONFIG_H)
#error my_config.h was not included (but PREFER_MY_CONFIG_H was set)
#endif
#include "mcsconfig_conflicting_defs_remember.h"
#include "mcsconfig_conflicting_defs_undef.h"
#else
#if defined(MY_CONFIG_H)
#error my_config.h was included before mcsconfig.h (and PREFER_MY_CONFIG_H was not set)
#endif
#endif // PREFER_MY_CONFIG_H
#include "mcsconfig.h"
#include "exceptclasses.h"
#include "conststring.h"
/*
Redefine definitions used by MariaDB m_ctype.h.
This is needed to avoid including <mariadb.h> and <my_sys.h>,
which conflict with many MCS and boost headers.
*/
#ifndef FALSE
#define FALSE (0)
#endif
#ifndef TRUE
#define TRUE (1)
#endif
#ifndef DBUG_ASSERT
#define DBUG_ASSERT(x) idbassert(x)
#define DBUG_ASSERT_TEMPORARILY_DEFINED
#endif
#ifndef MYSQL_PLUGIN_IMPORT
#if (defined(_WIN32) && defined(MYSQL_DYNAMIC_PLUGIN))
#define MYSQL_PLUGIN_IMPORT __declspec(dllimport)
#else
#define MYSQL_PLUGIN_IMPORT
#endif
#endif
typedef long long int longlong;
typedef unsigned long long int ulonglong;
typedef uint32_t uint32;
typedef uint16_t uint16;
typedef char my_bool;
typedef unsigned char uchar;
#if defined(__GNUC__) && !defined(_lint)
typedef char pchar; /* Mixed prototypes can take char */
typedef char puchar; /* Mixed prototypes can take char */
typedef char pbool; /* Mixed prototypes can take char */
typedef short pshort; /* Mixed prototypes can take short int */
typedef float pfloat; /* Mixed prototypes can take float */
#else
typedef int pchar; /* Mixed prototypes can't take char */
typedef uint puchar; /* Mixed prototypes can't take char */
typedef int pbool; /* Mixed prototypes can't take char */
typedef int pshort; /* Mixed prototypes can't take short int */
typedef double pfloat; /* Mixed prototypes can't take float */
#endif
typedef const struct charset_info_st CHARSET_INFO;
extern "C" MYSQL_PLUGIN_IMPORT CHARSET_INFO* default_charset_info;
#define HAVE_PSI_INTERFACE
#include "m_ctype.h"
#undef FALSE
#undef TRUE
#ifdef DBUG_ASSERT_TEMPORARILY_DEFINED
#undef DBUG_ASSERT
#endif
#if defined(PREFER_MY_CONFIG_H)
#include "mcsconfig_conflicting_defs_restore.h"
#endif
namespace datatypes
{
class MariaDBHasher
{
ulong mPart1;
ulong mPart2;
public:
MariaDBHasher() : mPart1(1), mPart2(4)
{
}
MariaDBHasher& add(CHARSET_INFO* cs, const char* str, size_t length)
{
cs->hash_sort((const uchar*)str, length, &mPart1, &mPart2);
return *this;
}
MariaDBHasher& add(CHARSET_INFO* cs, const utils::ConstString& str)
{
return add(cs, str.str(), str.length());
}
uint32_t finalize() const
{
return (uint32_t)mPart1;
}
};
// A reference to MariaDB CHARSET_INFO.
class Charset
{
protected:
const struct charset_info_st* mCharset;
private:
static constexpr const uint flags_ = MY_STRXFRM_PAD_WITH_SPACE | MY_STRXFRM_PAD_TO_MAXLEN;
public:
Charset(CHARSET_INFO& cs) : mCharset(&cs)
{
}
Charset(CHARSET_INFO* cs = nullptr) : mCharset(cs ? cs : &my_charset_bin)
{
}
bool operator==(const Charset& rhs) const
{
return rhs.getCharset().cs_name.str == getCharset().cs_name.str;
}
bool operator!=(const Charset& rhs) const
{
return !(*this == rhs);
}
std::string convert(const std::string& from, const datatypes::Charset& fromCs) const
{
std::string result;
uint dummy_errors;
result.resize(from.size() * getCharset().mbmaxlen);
size_t resultingSize = my_convert(const_cast<char*>(result.c_str()), result.size(), &getCharset(),
from.c_str(), from.size(), &fromCs.getCharset(), &dummy_errors);
result.resize(resultingSize);
return result;
}
Charset(uint32_t charsetNumber);
void setCharset(uint32_t charsetNumber);
CHARSET_INFO& getCharset() const
{
return *mCharset;
}
uint32_t hash(const char* data, uint64_t len) const
{
return MariaDBHasher().add(mCharset, data, len).finalize();
}
bool eq(const std::string& str1, const std::string& str2) const
{
return mCharset->strnncollsp(str1.data(), str1.length(), str2.data(), str2.length()) == 0;
}
int strnncollsp(const std::string& str1, const std::string& str2) const
{
return mCharset->strnncollsp(str1.data(), str1.length(), str2.data(), str2.length());
}
int strnncollsp(const utils::ConstString& str1, const utils::ConstString& str2) const
{
// nullptr handling below should return values as if nulls are substituted with empty string.
// please note that ConstString has an assertion so that nullptr data has zero length.
const char* s1 = str1.str();
const char* s2 = str2.str();
return mCharset->strnncollsp(s1 ? s1 : "", str1.length(), s2 ? s2 : "", str2.length());
}
int strnncollsp(const char* str1, size_t length1, const char* str2, size_t length2) const
{
return mCharset->strnncollsp(str1, length1, str2, length2);
}
int strnncollsp(const unsigned char* str1, size_t length1, const unsigned char* str2, size_t length2) const
{
return mCharset->strnncollsp((const char*)str1, length1, (const char*)str2, length2);
}
bool test_if_important_data(const char* str, const char* end) const
{
if (mCharset->state & MY_CS_NOPAD)
return str < end;
return str + mCharset->scan(str, end, MY_SEQ_SPACES) < end;
}
bool like(bool neg, const utils::ConstString& subject, const utils::ConstString& pattern) const
{
bool res = !mCharset->wildcmp(subject.str(), subject.end(), pattern.str(), pattern.end(), '\\', '_', '%');
return neg ? !res : res;
}
size_t strnxfrm(uchar* dst, size_t dstlen, uint nweights, const uchar* src, size_t srclen, uint flags)
{
assert(mCharset->coll);
return mCharset->coll->strnxfrm(mCharset, dst, dstlen, nweights, src, srclen, flags);
}
// The magic check that tells that bytes are mapped to weights as 1:1
bool strnxfrmIsValid() const
{
return (mCharset->state & MY_CS_NON1TO1) == 0;
}
template <typename T>
T strnxfrm(const char* src) const
{
T ret = 0;
size_t len __attribute__((unused)) =
mCharset->strnxfrm((char*)&ret, sizeof(T), sizeof(T), src, sizeof(T), flags_);
assert(len <= sizeof(T));
return ret;
}
template <typename T>
T strnxfrm(const utils::ConstString& src) const
{
T ret = 0;
size_t len __attribute__((unused)) =
mCharset->strnxfrm((char*)&ret, sizeof(T), sizeof(T), (char*)src.str(), src.length(), flags_);
assert(len <= sizeof(T));
return ret;
}
static uint getDefaultFlags()
{
return flags_;
}
};
class CollationAwareHasher : public Charset
{
public:
CollationAwareHasher(const Charset& cs) : Charset(cs)
{
}
inline uint32_t operator()(const std::string& s) const
{
return operator()(s.data(), s.length());
}
inline uint32_t operator()(const char* data, uint64_t len) const
{
return Charset::hash(data, len);
}
};
class CollationAwareComparator : public Charset
{
public:
CollationAwareComparator(const Charset& cs) : Charset(cs)
{
}
bool operator()(const std::string& str1, const std::string& str2) const
{
return Charset::eq(str1, str2);
}
};
} // end of namespace datatypes

View File

@ -17,6 +17,9 @@
#pragma once
#include <cstdint>
#include <new>
#include "branchpred.h"
namespace utils

View File

@ -1,121 +0,0 @@
/* Copyright (C) 2020 MariaDB Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include <string>
#include <string.h>
#include <execinfo.h>
#include "exceptclasses.h"
namespace utils
{
// Non-owning (pointer, length) view over character data.
// The pointer may be null, in which case the length is zero and isNull() is true.
class ConstString
{
 protected:
  const char* mStr;  // it can be NULL now.
  size_t mLength;

 public:
  // A null `str` forces the length to zero regardless of `length`.
  ConstString(const char* str, size_t length) : mStr(str), mLength(length)
  {
    if (!mStr)
      mLength = 0;
  }
  explicit ConstString(const std::string& str) : mStr(str.data()), mLength(str.length())
  {
  }
  // Views the bytes of `*value` as a string of `colWidth` bytes, or becomes a
  // null string when `*value` equals `nullValue`.
  template <typename T>
  ConstString(const T* value, T nullValue, int colWidth)
  {
    if (*value == nullValue)
    {
      mStr = nullptr;
      mLength = 0;
    }
    else
    {
      mStr = reinterpret_cast<const char*>(value);
      mLength = colWidth;
    }
  }
  const char* str() const
  {
    return mStr;
  }
  const char* end() const
  {
    // end() should be computed for non-nullptr mStrs, otherwise it is undefined behavior.
    if (!mStr)
    {
      return nullptr;
    }
    return mStr + mLength;
  }
  size_t length() const
  {
    return mLength;
  }
  // Copies the viewed bytes into a std::string; asserts that the view is not null.
  std::string toString() const
  {
    idbassert(mStr);
    return std::string(mStr, mLength);
  }
  // True when the view is exactly the single character `ch`.
  bool eq(char ch) const
  {
    return mLength == 1 && mStr[0] == ch;
  }
  // Byte-wise equality; a null view equals only another null view.
  bool eq(const ConstString& rhs) const
  {
    if (!mStr || !rhs.mStr)
    {
      return mStr == rhs.mStr;
    }
    return mLength == rhs.mLength && !memcmp(mStr, rhs.mStr, mLength);
  }
  // Drops trailing '\0' bytes from the view (the data itself is untouched).
  ConstString& rtrimZero()
  {
    for (; mLength && mStr[mLength - 1] == '\0'; mLength--)
    {
    }
    return *this;
  }
  // Drops trailing ' ' bytes from the view (the data itself is untouched).
  ConstString& rtrimSpaces()
  {
    for (; mLength && mStr[mLength - 1] == ' '; --mLength)
    {
    }
    return *this;
  }
  // Writes two upper-case hex digits per viewed byte to `o`.
  // `o` must have room for 2 * length() characters; no terminator is written.
  void bin2hex(char* o)
  {
    static const char hexdig[] = {'0', '1', '2', '3', '4', '5', '6', '7',
                                  '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
    const char* e = end();
    for (const char* s = mStr; s < e; s++)
    {
      // Convert to unsigned first: where plain char is signed, bytes >= 0x80
      // would otherwise promote to a negative int and index before hexdig[].
      const unsigned char byte = static_cast<unsigned char>(*s);
      *o++ = hexdig[byte >> 4];
      *o++ = hexdig[byte & 0xf];
    }
  }
  bool isNull() const
  {
    return mStr == nullptr;
  }
};
} // namespace utils

View File

@ -17,7 +17,7 @@
#pragma once
#include "conststring.h"
#include "basic/conststring.h"
namespace genericparser
{

View File

@ -18,7 +18,7 @@
#pragma once
#include "hasher.h"
#include "collation.h"
#include "mariadb_charset/collation.h"
namespace utils
{

View File

@ -25,7 +25,7 @@
#include <iostream>
#include <memory>
#include "exceptclasses.h"
#include "conststring.h"
#include "basic/conststring.h"
#include "mcs_datatype_basic.h"
namespace utils

View File

@ -1,605 +0,0 @@
/* Copyright (C) 2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <iostream>
#include <atomic>
#include <boost/filesystem.hpp>
#include "IDBPolicy.h"
#include "brmtypes.h"
#include "hasher.h"
#include "messagequeue.h"
#include "configcpp.h"
#include "statistics.h"
using namespace idbdatafile;
using namespace logging;
namespace statistics
{
// Returns the shared StatisticsManager. The object is heap-allocated the first
// time this function runs and is never deleted.
StatisticsManager* StatisticsManager::instance()
{
  static StatisticsManager* const manager = new StatisticsManager();
  return manager;
}
// Adds the rows of `rowGroup` to the per-column sample buffers.
// While fewer than maxSampleSize rows have been stored, each row is appended;
// afterwards a row overwrites a stored slot only when the random draw happens
// to fall below maxSampleSize. NULL fields leave the slot's previous value.
void StatisticsManager::collectSample(const rowgroup::RowGroup& rowGroup)
{
  std::lock_guard<std::mutex> guard(mut);

  const auto rows = rowGroup.getRowCount();
  const auto columns = rowGroup.getColumnCount();
  if (rows == 0 || columns == 0)
    return;

  // Make sure every column of this row group has a zero-filled sample buffer.
  const auto& oids = rowGroup.getOIDs();
  for (const auto oid : oids)
  {
    if (columnGroups.find(oid) == columnGroups.end())
      columnGroups[oid] = std::vector<uint64_t>(maxSampleSize, 0);
  }

  // Position a row cursor on the first row of the group.
  rowgroup::Row row;
  rowGroup.initRow(&row);
  rowGroup.getRow(0, &row);

  // Copies every non-NULL field of the current row into sample slot `slot`.
  const auto storeRow = [&](uint32_t slot)
  {
    for (uint32_t col = 0; col < columns; ++col)
    {
      if (!row.isNullValue(col))
        columnGroups[oids[col]][slot] = row.getIntField(col);
    }
  };

  for (uint32_t i = 0; i < rows; ++i, row.nextRow())
  {
    if (currentSampleSize < maxSampleSize)
    {
      // Still filling the buffer: take the next free slot.
      storeRow(currentSampleSize);
      ++currentSampleSize;
      continue;
    }

    // Buffer is full: replace a slot only if the draw lands inside it.
    const uint32_t slot = uniformDistribution(gen32);
    if (slot < maxSampleSize)
      storeRow(slot);
  }
}
void StatisticsManager::analyzeSample(bool traceOn)
{
if (traceOn)
std::cout << "Sample size: " << currentSampleSize << std::endl;
// PK_FK statistics.
for (const auto& [oid, sample] : columnGroups)
keyTypes[oid] = KeyType::PK;
for (const auto& [oid, sample] : columnGroups)
{
std::unordered_set<uint32_t> columnsCache;
std::unordered_map<uint64_t, uint32_t> columnMCV;
for (uint32_t i = 0; i < currentSampleSize; ++i)
{
const auto value = sample[i];
// PK_FK statistics.
if (columnsCache.count(value) && keyTypes[oid] == KeyType::PK)
keyTypes[oid] = KeyType::FK;
else
columnsCache.insert(value);
// MCV statistics.
if (columnMCV.count(value))
columnMCV[value]++;
else
columnMCV.insert({value, 1});
}
// MCV statistics.
std::vector<pair<uint64_t, uint32_t>> mcvList(columnMCV.begin(), columnMCV.end());
std::sort(mcvList.begin(), mcvList.end(),
[](const std::pair<uint64_t, uint32_t>& a, const std::pair<uint64_t, uint32_t>& b) {
return a.second > b.second;
});
// 200 buckets as Microsoft does.
const auto mcvSize = std::min(columnMCV.size(), static_cast<uint64_t>(200));
mcv[oid] = std::unordered_map<uint64_t, uint32_t>(mcvList.begin(), mcvList.begin() + mcvSize);
}
if (traceOn)
output();
// Clear sample.
columnGroups.clear();
currentSampleSize = 0;
}
// Prints the PK/FK classification and the MCV lists to stdout.
void StatisticsManager::output()
{
  std::cout << "Columns count: " << keyTypes.size() << std::endl;
  std::cout << "Statistics type [PK_FK]: " << std::endl;
  for (const auto& [oid, keyType] : keyTypes)
  {
    // The first enumerator (numeric value 0) is reported as PK, anything else as FK.
    const char* label = (static_cast<uint32_t>(keyType) == 0) ? "PK] " : "FK] ";
    std::cout << "[OID: " << oid << ": " << label;
  }

  std::cout << "\nStatistics type [MCV]: " << std::endl;
  for (const auto& column : mcv)
  {
    std::cout << "[OID: " << column.first << std::endl;
    for (const auto& valueAndCount : column.second)
      std::cout << valueAndCount.first << ": " << valueAndCount.second << ", ";
    std::cout << "]" << std::endl;
  }
}
// Someday it will be a virtual method, based on statistics type we processing.
std::unique_ptr<char[]> StatisticsManager::convertStatsToDataStream(uint64_t& dataStreamSize)
{
// Number of pairs.
uint64_t count = keyTypes.size();
// count, [[uid, keyType], ... ]
dataStreamSize = sizeof(uint64_t) + count * (sizeof(uint32_t) + sizeof(KeyType));
// Count the size of the MCV.
for (const auto& [oid, mcvColumn] : mcv)
{
// [oid, list size, list [value, count]]
dataStreamSize +=
(sizeof(uint32_t) + sizeof(uint32_t) + ((sizeof(uint64_t) + sizeof(uint32_t)) * mcvColumn.size()));
}
// Allocate memory for data stream.
std::unique_ptr<char[]> dataStreamSmartPtr(new char[dataStreamSize]);
auto* dataStream = dataStreamSmartPtr.get();
// Initialize the data stream.
uint64_t offset = 0;
std::memcpy(dataStream, reinterpret_cast<char*>(&count), sizeof(uint64_t));
offset += sizeof(uint64_t);
// For each pair [oid, key type].
for (const auto& p : keyTypes)
{
uint32_t oid = p.first;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&oid), sizeof(uint32_t));
offset += sizeof(uint32_t);
KeyType keyType = p.second;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&keyType), sizeof(KeyType));
offset += sizeof(KeyType);
}
// For each [oid, list size, list [value, count]].
for (const auto& p : mcv)
{
// [oid]
uint32_t oid = p.first;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&oid), sizeof(uint32_t));
offset += sizeof(uint32_t);
// [list size]
const auto& mcvColumn = p.second;
uint32_t size = mcvColumn.size();
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&size), sizeof(uint32_t));
offset += sizeof(uint32_t);
// [list [value, count]]
for (const auto& mcvPair : mcvColumn)
{
uint64_t value = mcvPair.first;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&value), sizeof(uint64_t));
offset += sizeof(uint64_t);
uint32_t count = mcvPair.second;
std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&count), sizeof(uint32_t));
offset += sizeof(uint32_t);
}
}
return dataStreamSmartPtr;
}
// Unpacks a buffer produced by convertStatsToDataStream() into keyTypes and mcv.
// The leading count is used both for the number of [oid, key type] pairs and
// for the number of MCV records that follow. No bounds checking is performed;
// the buffer is trusted to be well-formed.
void StatisticsManager::convertStatsFromDataStream(std::unique_ptr<char[]> dataStreamSmartPtr)
{
  // Read fields one after another from a moving read position.
  const char* cursor = dataStreamSmartPtr.get();
  const auto take = [&cursor](void* field, size_t bytes)
  {
    std::memcpy(field, cursor, bytes);
    cursor += bytes;
  };

  uint64_t count = 0;
  take(&count, sizeof(uint64_t));

  // [oid, key type] pairs.
  for (uint64_t pairIdx = 0; pairIdx < count; ++pairIdx)
  {
    uint32_t oid;
    KeyType keyType;
    take(&oid, sizeof(uint32_t));
    take(&keyType, sizeof(KeyType));
    keyTypes[oid] = keyType;
  }

  // [oid, list size, list [value, count]] records.
  for (uint64_t columnIdx = 0; columnIdx < count; ++columnIdx)
  {
    uint32_t oid;
    uint32_t listSize;
    take(&oid, sizeof(uint32_t));
    take(&listSize, sizeof(uint32_t));

    std::unordered_map<uint64_t, uint32_t> columnMCV;
    for (uint32_t item = 0; item < listSize; ++item)
    {
      uint64_t value;
      uint32_t occurrences;
      take(&value, sizeof(uint64_t));
      take(&occurrences, sizeof(uint32_t));
      columnMCV[value] = occurrences;
    }
    mcv[oid] = std::move(columnMCV);
  }
}
void StatisticsManager::saveToFile()
{
std::lock_guard<std::mutex> lock(mut);
const char* fileName = statsFile.c_str();
std::unique_ptr<IDBDataFile> out(
IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "wb", 1));
if (!out)
{
BRM::log_errno("StatisticsManager::saveToFile(): open");
throw ios_base::failure("StatisticsManager::saveToFile(): open failed.");
}
// Compute hash.
uint64_t dataStreamSize = 0;
std::unique_ptr<char[]> dataStreamSmartPtr = convertStatsToDataStream(dataStreamSize);
utils::Hasher128 hasher;
// Prepare a statistics file header.
const uint32_t headerSize = sizeof(StatisticsFileHeader);
StatisticsFileHeader fileHeader;
std::memset(&fileHeader, 0, headerSize);
fileHeader.version = version;
fileHeader.epoch = epoch;
fileHeader.dataSize = dataStreamSize;
// Compute hash from the data.
fileHeader.dataHash = hasher(dataStreamSmartPtr.get(), dataStreamSize);
// Write statistics file header.
uint64_t size = out->write(reinterpret_cast<char*>(&fileHeader), headerSize);
if (size != headerSize)
{
auto rc = IDBPolicy::remove(fileName);
if (rc == -1)
std::cerr << "Cannot remove file " << fileName << std::endl;
throw ios_base::failure("StatisticsManager::saveToFile(): write failed. ");
}
// Write data.
size = out->write(dataStreamSmartPtr.get(), dataStreamSize);
if (size != dataStreamSize)
{
auto rc = IDBPolicy::remove(fileName);
if (rc == -1)
std::cerr << "Cannot remove file " << fileName << std::endl;
throw ios_base::failure("StatisticsManager::saveToFile(): write failed. ");
}
}
// Loads statistics from `statsFile` if that file exists.
// The file must contain a StatisticsFileHeader followed by `dataSize` payload
// bytes whose hash matches `dataHash`. Throws ios_base::failure when the file
// cannot be opened, is truncated, a read fails, or the hash does not match.
// The manager's state (version, epoch, statistics) is only modified after the
// payload has been read completely and validated.
void StatisticsManager::loadFromFile()
{
  std::lock_guard<std::mutex> lock(mut);
  // Check that stats file does exist.
  if (!boost::filesystem::exists(statsFile))
    return;

  const char* fileName = statsFile.c_str();
  std::unique_ptr<IDBDataFile> in(
      IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "rb", 1));
  if (!in)
  {
    BRM::log_errno("StatisticsManager::loadFromFile(): open");
    throw ios_base::failure("StatisticsManager::loadFromFile(): open failed. Check the error log.");
  }

  // Read the file header.
  StatisticsFileHeader fileHeader;
  const uint32_t headerSize = sizeof(StatisticsFileHeader);
  int64_t size = in->read(reinterpret_cast<char*>(&fileHeader), headerSize);
  if (size != headerSize)
    throw ios_base::failure("StatisticsManager::loadFromFile(): read failed. ");

  const auto dataHash = fileHeader.dataHash;
  const auto dataStreamSize = fileHeader.dataSize;

  // Allocate the memory for the file data.
  std::unique_ptr<char[]> dataStreamSmartPtr(new char[dataStreamSize]);
  auto* dataStream = dataStreamSmartPtr.get();

  // Read the data. Every read is checked: a negative result is an error and a
  // zero result means the file ended early; either would otherwise corrupt the
  // remaining-bytes arithmetic or loop forever.
  uint64_t dataOffset = 0;
  while (dataOffset < dataStreamSize)
  {
    size = in->read(dataStream + dataOffset, dataStreamSize - dataOffset);
    if (size <= 0)
      throw ios_base::failure("StatisticsManager::loadFromFile(): read failed. ");
    dataOffset += size;
  }

  utils::Hasher128 hasher;
  auto computedDataHash = hasher(dataStream, dataStreamSize);
  if (dataHash != computedDataHash)
    throw ios_base::failure("StatisticsManager::loadFromFile(): invalid file hash. ");

  // The payload is intact: commit the header fields and decode the statistics.
  version = fileHeader.version;
  epoch = fileHeader.epoch;
  convertStatsFromDataStream(std::move(dataStreamSmartPtr));
}
// Packs the current statistics and returns the Hasher128 hash of the packed bytes.
uint64_t StatisticsManager::computeHashFromStats()
{
  utils::Hasher128 hasher;
  uint64_t packedSize = 0;
  const std::unique_ptr<char[]> packed = convertStatsToDataStream(packedSize);
  const char* bytes = packed.get();
  return hasher(bytes, packedSize);
}
// Appends the statistics to `bs` in this order: version, epoch, number of
// PK/FK entries, the [oid, key type] entries, then for every MCV column its
// oid, list size and [value, count] pairs.
void StatisticsManager::serialize(messageqcpp::ByteStream& bs)
{
  uint64_t pairCount = keyTypes.size();
  bs << version;
  bs << epoch;
  bs << pairCount;

  // PK_FK
  for (const auto& [oid, keyType] : keyTypes)
  {
    bs << oid;
    bs << static_cast<uint32_t>(keyType);
  }

  // MCV
  for (const auto& [oid, mcvColumn] : mcv)
  {
    bs << oid;
    bs << static_cast<uint32_t>(mcvColumn.size());
    for (const auto& [value, occurrences] : mcvColumn)
    {
      bs << value;
      bs << occurrences;
    }
  }
}
// Reads statistics from `bs` in the order written by serialize().
// The entry count read from the stream is used for both the PK/FK section and
// the MCV section. Entries are merged into the existing maps.
void StatisticsManager::unserialize(messageqcpp::ByteStream& bs)
{
  uint64_t pairCount;
  bs >> version;
  bs >> epoch;
  bs >> pairCount;

  // PK_FK
  for (uint32_t pairIdx = 0; pairIdx < pairCount; ++pairIdx)
  {
    uint32_t oid;
    uint32_t rawKeyType;
    bs >> oid;
    bs >> rawKeyType;
    keyTypes[oid] = static_cast<KeyType>(rawKeyType);
  }

  // MCV
  for (uint32_t columnIdx = 0; columnIdx < pairCount; ++columnIdx)
  {
    uint32_t oid;
    uint32_t listSize;
    bs >> oid;
    bs >> listSize;

    std::unordered_map<uint64_t, uint32_t> mcvColumn;
    for (uint32_t item = 0; item < listSize; ++item)
    {
      uint64_t value;
      uint32_t occurrences;
      bs >> value;
      bs >> occurrences;
      mcvColumn[value] = occurrences;
    }
    mcv[oid] = std::move(mcvColumn);
  }
}
// Reports whether a PK/FK entry is stored for `oid`.
bool StatisticsManager::hasKey(uint32_t oid)
{
  return keyTypes.find(oid) != keyTypes.end();
}
// Returns the key type stored for `oid`, or a value-initialized KeyType (the
// first enumerator, PK) when nothing is stored.
// The lookup deliberately avoids operator[]: inserting a default entry for an
// unknown OID would make hasKey() report it afterwards and would leave
// keyTypes with more entries than mcv, while the (un)serializers use a single
// count for both.
KeyType StatisticsManager::getKeyType(uint32_t oid)
{
  const auto it = keyTypes.find(oid);
  return it != keyTypes.end() ? it->second : KeyType{};
}
// Returns the shared StatisticsDistributor. The object is heap-allocated the
// first time this function runs and is never deleted.
StatisticsDistributor* StatisticsDistributor::instance()
{
  static StatisticsDistributor* const distributor = new StatisticsDistributor();
  return distributor;
}
// Sends the current statistics to every client ExeMgr counted by countClients().
// For each client the exchange is:
//   1. write the ANALYZE_TABLE_REC_STATS flag,
//   2. write the hash of the current statistics,
//   3. read the client's reply; ANALYZE_TABLE_SUCCESS means nothing more is sent,
//   4. otherwise write the serialized statistics and read the client's reply.
// A failure with one client is reported on stderr and the loop moves on to the next.
void StatisticsDistributor::distributeStatistics()
{
// Refresh clientsCount from the configuration before taking the lock.
countClients();
{
std::lock_guard<std::mutex> lock(mut);
// No clients.
if (clientsCount == 0)
return;
#ifdef DEBUG_STATISTICS
std::cout << "Distribute statistics from ExeMgr(Server) to ExeMgr(Clients) " << std::endl;
#endif
// `msg` is reused for every flag sent or received; the hash and the statistics
// are serialized once and written to each client.
messageqcpp::ByteStream msg, statsHash, statsBs;
// Current hash.
statsHash << statistics::StatisticsManager::instance()->computeHashFromStats();
// Statistics.
statistics::StatisticsManager::instance()->serialize(statsBs);
for (uint32_t i = 0; i < clientsCount; ++i)
{
try
{
messageqcpp::ByteStream::quadbyte qb = ANALYZE_TABLE_REC_STATS;
msg << qb;
// Client IDs start at "ExeMgr2", hence the offset of 2.
auto exeMgrID = "ExeMgr" + std::to_string(i + 2);
// Create a client.
std::unique_ptr<messageqcpp::MessageQueueClient> exemgrClient(
new messageqcpp::MessageQueueClient(exeMgrID));
#ifdef DEBUG_STATISTICS
std::cout << "Try to connect to " << exeMgrID << std::endl;
#endif
// Try to connect to the client.
if (!exemgrClient->connect())
{
// Reset the shared message buffer before moving on to the next client.
msg.restart();
#ifdef DEBUG_STATISTICS
std::cout << "Unable to connect to " << exeMgrID << std::endl;
#endif
continue;
}
#ifdef DEBUG_STATISTICS
std::cout << "Write flag ANALYZE_TABLE_REC_STATS from ExeMgr(Server) to ExeMgr(Clients) "
<< std::endl;
#endif
// Write a flag to client ExeMgr.
exemgrClient->write(msg);
#ifdef DEBUG_STATISTICS
std::cout << "Write statistics hash from ExeMgr(Server) to ExeMgr(Clients) " << std::endl;
#endif
// Write a hash of the stats.
exemgrClient->write(statsHash);
// Read the state from Client.
msg.restart();
msg = exemgrClient->read();
msg >> qb;
// Do not need a stats.
if (qb == ANALYZE_TABLE_SUCCESS)
{
msg.restart();
continue;
}
#ifdef DEBUG_STATISTICS
std::cout << "Write statistics bytestream from ExeMgr(Server) to ExeMgr(Clients) " << std::endl;
#endif
// Write a statistics to client ExeMgr.
exemgrClient->write(statsBs);
// Read the flag back from the client ExeMgr.
msg.restart();
msg = exemgrClient->read();
// An empty reply is treated as a lost connection.
if (msg.length() == 0)
throw runtime_error("Lost conection to ExeMgr.");
#ifdef DEBUG_STATISTICS
std::cout << "Read flag on ExeMgr(Server) from ExeMgr(Client) " << std::endl;
#endif
msg.restart();
}
// Errors are reported and swallowed so the remaining clients are still served.
catch (std::exception& e)
{
msg.restart();
std::cerr << "distributeStatistics() failed with error: " << e.what() << std::endl;
}
catch (...)
{
msg.restart();
std::cerr << "distributeStatistics() failed with unknown error." << std::endl;
}
}
}
}
void StatisticsDistributor::countClients()
{
#ifdef DEBUG_STATISTICS
std::cout << "count clients to distribute statistics " << std::endl;
#endif
auto* config = config::Config::makeConfig();
// Starting from the ExeMgr2, since the Server starts on the ExeMgr1.
std::atomic<uint32_t> exeMgrNumber(2);
try
{
while (true)
{
auto exeMgrID = "ExeMgr" + std::to_string(exeMgrNumber);
auto exeMgrIP = config->getConfig(exeMgrID, "IPAddr");
if (exeMgrIP == "")
break;
#ifdef DEBUG_STATISTICS
std::cout << "Client: " << exeMgrID << std::endl;
#endif
++exeMgrNumber;
}
}
catch (std::exception& e)
{
std::cerr << "countClients() failed with error: " << e.what() << std::endl;
}
catch (...)
{
std::cerr << "countClients() failed with unknown error: ";
}
clientsCount = exeMgrNumber - 2;
#ifdef DEBUG_STATISTICS
std::cout << "Number of clients: " << clientsCount << std::endl;
#endif
}
} // namespace statistics

View File

@ -1,161 +0,0 @@
/* Copyright (C) 2021 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#pragma once
#include "rowgroup.h"
#include "logger.h"
#include "hasher.h"
#include "IDBPolicy.h"
#include <map>
#include <unordered_set>
#include <mutex>
#include <random>
// Command codes exchanged with `ExeMgr`.
#define ANALYZE_TABLE_EXECUTE 6
#define ANALYZE_TABLE_REC_STATS 7
#define ANALYZE_TABLE_NEED_STATS 8
#define ANALYZE_TABLE_SUCCESS 9
// #define DEBUG_STATISTICS
using namespace idbdatafile;
namespace statistics
{
// Represents a column key type:
// PK - primary key.
// FK - foreign key.
// The underlying type is uint32_t; PK is the first enumerator and therefore
// the value a value-initialized KeyType takes.
enum class KeyType : uint32_t
{
PK,
FK
};
// Represents the types of statistics CS supports.
enum class StatisticsType : uint32_t
{
// A special statistics type, specifies whether a column is a primary key or a foreign key.
PK_FK,
// Most common values.
MCV
};
// Represents a header for the statistics file.
struct StatisticsFileHeader
{
// Format version of the statistics.
uint64_t version;
// Epoch of the statistics.
uint64_t epoch;
// Hash of the payload that follows the header.
uint64_t dataHash;
// Size in bytes of the payload that follows the header.
uint64_t dataSize;
// NOTE(review): presumably reserved padding for future header fields - confirm.
uint8_t offset[1024];
};
// Column OID -> set of 64-bit values.
using ColumnsCache = std::unordered_map<uint32_t, std::unordered_set<uint64_t>>;
// Column OID -> vector of sampled 64-bit values.
using ColumnGroup = std::unordered_map<uint32_t, std::vector<uint64_t>>;
// Column OID -> key type.
using KeyTypes = std::unordered_map<uint32_t, KeyType>;
// Column OID -> (value -> occurrence count).
using MCVList = std::unordered_map<uint32_t, std::unordered_map<uint64_t, uint32_t>>;
// This class is responsible for processing and storing statistics.
// On each `analyze table` iteration it increases an epoch and stores
// the updated statistics into the special file.
class StatisticsManager
{
public:
// Returns the instance of this class, static initialization happens only once.
static StatisticsManager* instance();
// Collect samples from the given `rowGroup`.
void collectSample(const rowgroup::RowGroup& rowGroup);
// Analyzes collected samples.
void analyzeSample(bool traceOn);
// Outputs stats to out stream.
void output();
// Saves stats to the file.
void saveToFile();
// Loads stats from the file.
void loadFromFile();
// Advances the epoch counter by one.
void incEpoch()
{
++epoch;
}
// Serialize stats to the given `bs`.
void serialize(messageqcpp::ByteStream& bs);
// Unserialize stats from the given `bs`.
void unserialize(messageqcpp::ByteStream& bs);
// Computes hash from the current statistics data.
uint64_t computeHashFromStats();
// Checks whether statistics is available for the given `oid`.
bool hasKey(uint32_t oid);
// Returns a KeyType for the given `oid`.
KeyType getKeyType(uint32_t oid);
private:
// Private: instances are obtained through instance().
StatisticsManager() : currentSampleSize(0), epoch(0), version(1)
{
// Initialize plugins.
IDBPolicy::configIDBPolicy();
// Generate distribution once in range [0, UINT_MAX].
gen32 = std::mt19937(randomDevice());
uniformDistribution = std::uniform_int_distribution<uint32_t>(0, UINT_MAX);
}
// Packs keyTypes and mcv into a byte buffer; the buffer size is returned in `dataStreamSize`.
std::unique_ptr<char[]> convertStatsToDataStream(uint64_t& dataStreamSize);
// Unpacks a buffer produced by convertStatsToDataStream() into keyTypes and mcv.
void convertStatsFromDataStream(std::unique_ptr<char[]> dataStreamSmartPtr);
// Seed source, generator and distribution used when sampling rows.
std::random_device randomDevice;
std::mt19937 gen32;
std::uniform_int_distribution<uint32_t> uniformDistribution;
// Internal data represents a sample [OID, vector of values].
ColumnGroup columnGroups;
// Internal data for the PK/FK statistics [OID, key type].
KeyTypes keyTypes;
// Internal data for MCV list [OID, list[value, count]]
MCVList mcv;
// TODO: Think about sample size.
const uint32_t maxSampleSize = 64000;
// Number of sample slots filled so far.
uint32_t currentSampleSize;
uint32_t epoch;
uint32_t version;
// Taken by collectSample(), saveToFile() and loadFromFile().
std::mutex mut;
// Location of the statistics file.
std::string statsFile = "/var/lib/columnstore/local/statistics";
};
// Spreads the statistics across all `ExeMgr` in a cluster.
class StatisticsDistributor
{
 public:
  // Accessor for the instance of this class; static initialization happens only once.
  static StatisticsDistributor* instance();

  // Sends the stats to every `ExeMgr` in the cluster, connecting to them using config file.
  void distributeStatistics();

 private:
  // The client counter starts at zero via its in-class initializer.
  StatisticsDistributor()
  {
  }

  // Determines the number of clients by reading config file and evaluating `ExeMgr` fields.
  void countClients();

  uint32_t clientsCount = 0;
  std::mutex mut;
};
} // namespace statistics

View File

@ -18,7 +18,7 @@
/* handling of the conversion of string prefixes to int64_t for quick range checking */
#include "collation.h"
#include "mariadb_charset/collation.h"
#include "joblisttypes.h"
#include "string_prefixes.h"

View File

@ -1,157 +0,0 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/** @file */
/*
* classes isyncstream and osyncstream provide a C++ iostream interface
* for C stdio FILE* streams. The current implementation does not provide
* the necessary methods to support seeking. The I/O buffering of the
* input FILE* is used. The C++ iostream library calls syncbuf::sync()
* for every line, so output buffering is line-by-line.
* */
/*
#include "syncstream.h"
void copyStream(istream& iss, ostream& oss)
{
string line;
getline(iss, line);
while (iss.good())
{
oss << line << endl;
getline(iss, line);
}
}
main()
{
FILE* ifp;
FILE* ofp;
...
isyncstream iss(ifp);
osyncstream oss(ofp);
copyStream(iss, oss);
...
}
*/
#pragma once
#include <iostream>
#include <cstdio>
namespace syncstream
{
/** A streambuf implementation for C stdio FILE* streams.
 *
 * Adapted from http://www.drdobbs.com/184401305
 *
 * The buffer keeps no put/get area of its own: every character goes straight
 * to the stdio call on the wrapped FILE*. The FILE* is never closed by this
 * class. A null FILE* is tolerated: every operation then reports failure
 * instead of handing the null pointer to stdio.
 */
class syncbuf : public std::streambuf
{
 public:
  /** ctor
   *
   * @param f stdio stream to wrap; may be null (all I/O then fails).
   */
  syncbuf(FILE* f) : std::streambuf(), fptr(f)
  {
  }

 protected:
  /** Write character in the case of overflow
   *
   * @return a value other than EOF on success, EOF on failure.
   */
  int overflow(int c = EOF) override
  {
    if (fptr == nullptr)
      return EOF;

    // Being asked to write "nothing" is a success, and success must be
    // reported with a value different from EOF.
    if (c == EOF)
      return traits_type::not_eof(c);

    return fputc(c, fptr);
  }

  /** Get character in the case of underflow (the character stays available) */
  int underflow() override
  {
    if (fptr == nullptr)
      return EOF;

    const int c = getc(fptr);

    // Push the character back so the next read sees it again.
    if (c != EOF)
      ungetc(c, fptr);

    return c;
  }

  /** Get character in the case of underflow and advance get pointer */
  int uflow() override
  {
    return fptr != nullptr ? getc(fptr) : EOF;
  }

  /** put character back in the case of backup underflow */
  int pbackfail(int c = EOF) override
  {
    if (fptr == nullptr || c == EOF)
      return EOF;

    return ungetc(c, fptr);
  }

  /** Synchronize stream buffer
   *
   * @return 0 on success, non-zero on failure.
   */
  int sync() override
  {
    // fflush(NULL) would flush every open output stream, so a missing FILE*
    // is reported as a failure instead.
    if (fptr == nullptr)
      return -1;

    return fflush(fptr);
  }

 private:
  FILE* fptr;
};
/** An istream adaptor for input FILE* streams
 *
 * The base class is named with its `std::` qualification in the constructors
 * so that this header does not depend on the including file having a
 * `using namespace std` in effect.
 */
class isyncstream : public std::istream
{
 public:
  /** ctor: wraps a null FILE* */
  isyncstream() : std::istream(&buf), buf(nullptr)
  {
  }

  /** ctor
   *
   * The base class is given the address of `buf` before `buf` itself is
   * constructed (bases are initialized first); only the address is taken here.
   */
  isyncstream(FILE* fptr) : std::istream(&buf), buf(fptr)
  {
  }

  /** const streambuf accessor
   *
   * Note: this declaration hides the `rdbuf` overloads of the base class.
   */
  const syncbuf* rdbuf() const
  {
    return &buf;
  }

 private:
  syncbuf buf;
};
/** An ostream adaptor for output FILE* streams
 *
 * The base class is named with its `std::` qualification in the constructors
 * so that this header does not depend on the including file having a
 * `using namespace std` in effect.
 */
class osyncstream : public std::ostream
{
 public:
  /** ctor: wraps a null FILE* */
  osyncstream() : std::ostream(&buf), buf(nullptr)
  {
  }

  /** ctor
   *
   * The base class is given the address of `buf` before `buf` itself is
   * constructed (bases are initialized first); only the address is taken here.
   */
  osyncstream(FILE* fptr) : std::ostream(&buf), buf(fptr)
  {
  }

  /** const streambuf accessor
   *
   * Note: this declaration hides the `rdbuf` overloads of the base class.
   */
  const syncbuf* rdbuf() const
  {
    return &buf;
  }

 private:
  syncbuf buf;
};
} // namespace syncstream

View File

@ -1,38 +0,0 @@
/* Copyright (C) 2020 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
#include "utils_utf8.h"
#include "mariadb_my_sys.h"
namespace datatypes
{
// Looks up the charset registered under `charsetNumber`; when the lookup
// yields nothing, the binary charset is handed back instead.
static inline CHARSET_INFO& get_charset_or_bin(int32_t charsetNumber)
{
  if (CHARSET_INFO* found = get_charset(charsetNumber, MYF(MY_WME)))
    return *found;

  return my_charset_bin;
}
// Builds a Charset for `charsetNumber`. The charset is resolved through
// get_charset_or_bin(), so a number with no matching charset ends up pointing
// at my_charset_bin rather than at null.
// NOTE(review): `charsetNumber` is uint32_t here but get_charset_or_bin() takes
// int32_t, so the value is implicitly converted at the call.
Charset::Charset(uint32_t charsetNumber) : mCharset(&get_charset_or_bin(charsetNumber))
{
}
void Charset::setCharset(uint32_t charsetNumber)
{
mCharset = &get_charset_or_bin(charsetNumber);
}
} // namespace datatypes

View File

@ -20,16 +20,10 @@
#pragma once
#include <cstdint>
#include <string>
#if defined(__FreeBSD__)
//#include <cstdlib>
#else
#include <alloca.h>
#endif
#include <cstdlib>
#include <clocale>
#include "liboamcpp.h"
// Change the name from utf8. Even change the file name to something resembling char helper
namespace utf8