You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
MCOL-5464: Fixes of bugs from ASAN warnings, part one (#2792)
* Fixes of bugs from ASAN warnings, part one * MQC as static library, with nifty counter for global map and mutex * Switch clang to 16 * link messageqcpp to execplan
This commit is contained in:
@ -21,8 +21,16 @@
|
||||
|
||||
namespace utils
|
||||
{
|
||||
const uint8_t MAXLEGACYWIDTH = 8ULL;
|
||||
const uint8_t MAXCOLUMNWIDTH = 16ULL;
|
||||
constexpr uint8_t MAXLEGACYWIDTH = 8ULL;
|
||||
constexpr uint8_t MAXCOLUMNWIDTH = 16ULL;
|
||||
|
||||
struct AlignedDeleter
|
||||
{
|
||||
void operator()(uint8_t* ptr)
|
||||
{
|
||||
operator delete[](ptr, std::align_val_t(utils::MAXCOLUMNWIDTH));
|
||||
};
|
||||
};
|
||||
|
||||
inline bool isWide(uint8_t width)
|
||||
{
|
||||
|
@ -69,15 +69,17 @@ const fs::path defaultConfigFilePath(configDefaultFileName);
|
||||
|
||||
namespace config
|
||||
{
|
||||
Config* globConfigInstancePtr = nullptr;
|
||||
Config::configMap_t Config::fInstanceMap;
|
||||
boost::mutex Config::fInstanceMapMutex;
|
||||
Config::configMap_t Config::fInstanceMap;
|
||||
// duplicate to that in the Config class
|
||||
boost::mutex Config::fXmlLock;
|
||||
// duplicate to that in the Config class
|
||||
boost::mutex Config::fWriteXmlLock;
|
||||
std::atomic_bool globHasConfig;
|
||||
|
||||
ConfigUniqPtr globConfigInstancePtr;
|
||||
|
||||
|
||||
void Config::checkAndReloadConfig()
|
||||
{
|
||||
struct stat statbuf;
|
||||
@ -105,20 +107,20 @@ Config* Config::makeConfig(const string& cf)
|
||||
if (globConfigInstancePtr)
|
||||
{
|
||||
globConfigInstancePtr->checkAndReloadConfig();
|
||||
return globConfigInstancePtr;
|
||||
return globConfigInstancePtr.get();
|
||||
}
|
||||
|
||||
// Make this configurable at least at compile-time.
|
||||
std::string configFilePath =
|
||||
std::string(MCSSYSCONFDIR) + std::string("/columnstore/") + configDefaultFileName;
|
||||
globConfigInstancePtr = new Config(configFilePath);
|
||||
globConfigInstancePtr.reset(new Config(configFilePath));
|
||||
globHasConfig.store(true, std::memory_order_relaxed);
|
||||
return globConfigInstancePtr;
|
||||
return globConfigInstancePtr.get();
|
||||
}
|
||||
|
||||
boost::mutex::scoped_lock lk(fInstanceMapMutex);
|
||||
globConfigInstancePtr->checkAndReloadConfig();
|
||||
return globConfigInstancePtr;
|
||||
return globConfigInstancePtr.get();
|
||||
}
|
||||
|
||||
boost::mutex::scoped_lock lk(fInstanceMapMutex);
|
||||
@ -526,21 +528,6 @@ void Config::writeConfigFile(messageqcpp::ByteStream msg) const
|
||||
/* static */
|
||||
void Config::deleteInstanceMap()
|
||||
{
|
||||
boost::mutex::scoped_lock lk(fInstanceMapMutex);
|
||||
|
||||
for (Config::configMap_t::iterator iter = fInstanceMap.begin(); iter != fInstanceMap.end(); ++iter)
|
||||
{
|
||||
Config* instance = iter->second;
|
||||
delete instance;
|
||||
}
|
||||
|
||||
fInstanceMap.clear();
|
||||
|
||||
if (globConfigInstancePtr)
|
||||
{
|
||||
delete globConfigInstancePtr;
|
||||
globConfigInstancePtr = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
/* static */
|
||||
@ -643,4 +630,18 @@ std::string Config::getTempFileDir(Config::TempDirPurpose what)
|
||||
return {};
|
||||
}
|
||||
|
||||
void Config::ConfigDeleter::operator()(Config* config)
|
||||
{
|
||||
boost::mutex::scoped_lock lk(fInstanceMapMutex);
|
||||
|
||||
for (Config::configMap_t::iterator iter = fInstanceMap.begin(); iter != fInstanceMap.end(); ++iter)
|
||||
{
|
||||
Config* instance = iter->second;
|
||||
delete instance;
|
||||
}
|
||||
|
||||
fInstanceMap.clear();
|
||||
delete config;
|
||||
}
|
||||
|
||||
} // namespace config
|
||||
|
@ -56,6 +56,11 @@ namespace config
|
||||
class Config
|
||||
{
|
||||
public:
|
||||
struct ConfigDeleter
|
||||
{
|
||||
void operator()(Config* config);
|
||||
};
|
||||
|
||||
/** @brief Config factory method
|
||||
*
|
||||
* Creates a singleton Config object
|
||||
@ -249,8 +254,14 @@ class Config
|
||||
*
|
||||
*/
|
||||
void checkAndReloadConfig();
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
||||
using ConfigUniqPtr = std::unique_ptr<Config, Config::ConfigDeleter>;
|
||||
|
||||
|
||||
} // namespace config
|
||||
|
||||
#undef EXPORT
|
||||
|
@ -1,102 +0,0 @@
|
||||
// Microsoft Visual C++ generated resource script.
|
||||
//
|
||||
#include "resource.h"
|
||||
|
||||
#define APSTUDIO_READONLY_SYMBOLS
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Generated from the TEXTINCLUDE 2 resource.
|
||||
//
|
||||
#include "afxres.h"
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
#undef APSTUDIO_READONLY_SYMBOLS
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
// English (U.S.) resources
|
||||
|
||||
#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENU)
|
||||
#ifdef _WIN32
|
||||
LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_US
|
||||
#pragma code_page(1252)
|
||||
#endif //_WIN32
|
||||
|
||||
#ifdef APSTUDIO_INVOKED
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// TEXTINCLUDE
|
||||
//
|
||||
|
||||
1 TEXTINCLUDE
|
||||
BEGIN
|
||||
"resource.h\0"
|
||||
END
|
||||
|
||||
2 TEXTINCLUDE
|
||||
BEGIN
|
||||
"#include ""afxres.h""\r\n"
|
||||
"\0"
|
||||
END
|
||||
|
||||
3 TEXTINCLUDE
|
||||
BEGIN
|
||||
"\r\n"
|
||||
"\0"
|
||||
END
|
||||
|
||||
#endif // APSTUDIO_INVOKED
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Version
|
||||
//
|
||||
|
||||
VS_VERSION_INFO VERSIONINFO
|
||||
FILEVERSION 4,6,0,0
|
||||
PRODUCTVERSION 4,6,0,0
|
||||
FILEFLAGSMASK 0x17L
|
||||
#ifdef _DEBUG
|
||||
FILEFLAGS 0x1L
|
||||
#else
|
||||
FILEFLAGS 0x0L
|
||||
#endif
|
||||
FILEOS 0x4L
|
||||
FILETYPE 0x2L
|
||||
FILESUBTYPE 0x0L
|
||||
BEGIN
|
||||
BLOCK "StringFileInfo"
|
||||
BEGIN
|
||||
BLOCK "040904b0"
|
||||
BEGIN
|
||||
VALUE "CompanyName", "InfiniDB, Inc."
|
||||
VALUE "FileDescription", "InfiniDB Config API"
|
||||
VALUE "FileVersion", "4.6.0-0"
|
||||
VALUE "InternalName", "libconfigcpp"
|
||||
VALUE "LegalCopyright", "Copyright (C) 2014"
|
||||
VALUE "OriginalFilename", "libconfigcpp.dll"
|
||||
VALUE "ProductName", "InfiniDB"
|
||||
VALUE "ProductVersion", "4.6.0.0 Beta"
|
||||
END
|
||||
END
|
||||
BLOCK "VarFileInfo"
|
||||
BEGIN
|
||||
VALUE "Translation", 0x409, 1200
|
||||
END
|
||||
END
|
||||
|
||||
#endif // English (U.S.) resources
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
|
||||
#ifndef APSTUDIO_INVOKED
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Generated from the TEXTINCLUDE 3 resource.
|
||||
//
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
#endif // not APSTUDIO_INVOKED
|
||||
|
@ -150,6 +150,7 @@ void XMLParser::setConfig(xmlDocPtr doc, const string& section, const string& na
|
||||
{
|
||||
xmlAddChild(cur2, xmlNewText((const xmlChar*)"\t"));
|
||||
cur3 = cur2->xmlChildrenNode;
|
||||
xmlFree(cur3->content);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -48,26 +48,6 @@ using namespace logging;
|
||||
|
||||
namespace
|
||||
{
|
||||
const int64_t columnstore_precision[19] = {0,
|
||||
9,
|
||||
99,
|
||||
999,
|
||||
9999,
|
||||
99999,
|
||||
999999,
|
||||
9999999,
|
||||
99999999,
|
||||
999999999,
|
||||
9999999999LL,
|
||||
99999999999LL,
|
||||
999999999999LL,
|
||||
9999999999999LL,
|
||||
99999999999999LL,
|
||||
999999999999999LL,
|
||||
9999999999999999LL,
|
||||
99999999999999999LL,
|
||||
999999999999999999LL};
|
||||
|
||||
template <class T>
|
||||
bool from_string(T& t, const std::string& s, std::ios_base& (*f)(std::ios_base&))
|
||||
{
|
||||
@ -475,25 +455,16 @@ void number_int_value(const string& data, cscDataType typeCode,
|
||||
if ((typeCode == datatypes::SystemCatalog::DECIMAL) || (typeCode == datatypes::SystemCatalog::UDECIMAL) ||
|
||||
(ct.scale > 0))
|
||||
{
|
||||
T rangeUp, rangeLow;
|
||||
|
||||
if (ct.precision < 19)
|
||||
auto precision =
|
||||
ct.precision == rowgroup::MagicPrecisionForCountAgg ? datatypes::INT128MAXPRECISION : ct.precision;
|
||||
if (precision > datatypes::INT128MAXPRECISION || precision < 0)
|
||||
{
|
||||
rangeUp = (T)columnstore_precision[ct.precision];
|
||||
}
|
||||
else
|
||||
{
|
||||
auto precision =
|
||||
ct.precision == rowgroup::MagicPrecisionForCountAgg ? datatypes::INT128MAXPRECISION : ct.precision;
|
||||
if (precision > datatypes::INT128MAXPRECISION || precision < 0)
|
||||
{
|
||||
throw QueryDataExcept("Unsupported precision " + std::to_string(precision) + " converting DECIMAL ",
|
||||
dataTypeErr);
|
||||
}
|
||||
rangeUp = datatypes::ConversionRangeMaxValue[ct.precision - 19];
|
||||
throw QueryDataExcept("Unsupported precision " + std::to_string(precision) + " converting DECIMAL ",
|
||||
dataTypeErr);
|
||||
}
|
||||
|
||||
rangeLow = -rangeUp;
|
||||
T rangeUp = dataconvert::decimalRangeUp<T>(precision);
|
||||
T rangeLow = -rangeUp;
|
||||
|
||||
if (intVal > rangeUp)
|
||||
{
|
||||
@ -2849,7 +2820,8 @@ int64_t DataConvert::stringToTime(const string& data)
|
||||
{
|
||||
if (!hasDate)
|
||||
{
|
||||
day = strtol(data.substr(0, pos).c_str(), &end, 10);
|
||||
std::string tmpDataSegment = data.substr(0, pos);
|
||||
day = strtol(tmpDataSegment.c_str(), &end, 10);
|
||||
|
||||
if (*end != '\0')
|
||||
return -1;
|
||||
|
@ -104,6 +104,7 @@ const int64_t IDB_pow[19] = {1,
|
||||
100000000000000000LL,
|
||||
1000000000000000000LL};
|
||||
|
||||
|
||||
const int32_t SECS_PER_MIN = 60;
|
||||
const int32_t MINS_PER_HOUR = 60;
|
||||
const int32_t HOURS_PER_DAY = 24;
|
||||
@ -1553,6 +1554,20 @@ inline int128_t strtoll128(const char* data, bool& saturate, char** ep)
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
template <class T>
|
||||
T decimalRangeUp(int32_t precision)
|
||||
{
|
||||
if (precision < 19)
|
||||
{
|
||||
return (T)datatypes::columnstore_precision[precision];
|
||||
}
|
||||
else
|
||||
{
|
||||
return datatypes::ConversionRangeMaxValue[precision - 19];
|
||||
}
|
||||
}
|
||||
|
||||
template <>
|
||||
inline int128_t string_to_ll<int128_t>(const std::string& data, bool& bSaturate)
|
||||
{
|
||||
|
@ -1072,7 +1072,6 @@ IDB_Decimal Func_cast_decimal::getDecimalVal(Row& row, FunctionParm& parm, bool&
|
||||
if (decimal.isTSInt128ByPrecision())
|
||||
{
|
||||
int128_t max_number_decimal = datatypes::ConversionRangeMaxValue[max_length - 19];
|
||||
|
||||
uint128_t uval = parm[0]->data()->getUintVal(row, isNull);
|
||||
|
||||
if (uval > (uint128_t)datatypes::Decimal::maxInt128)
|
||||
|
@ -591,6 +591,12 @@ uint64_t dateAdd(uint64_t time, const string& expr, IntervalColumn::interval_typ
|
||||
|
||||
if (-day < month_length[monthSave])
|
||||
{
|
||||
if (monthSave == 0)
|
||||
{
|
||||
monthSave = 12;
|
||||
tmpYear--;
|
||||
}
|
||||
|
||||
month--;
|
||||
monthSave--;
|
||||
|
||||
@ -613,6 +619,12 @@ uint64_t dateAdd(uint64_t time, const string& expr, IntervalColumn::interval_typ
|
||||
// BUG 5448 - changed from '==' to '<='
|
||||
if (day <= 0)
|
||||
{
|
||||
if (monthSave == 0)
|
||||
{
|
||||
monthSave = 12;
|
||||
tmpYear--;
|
||||
}
|
||||
|
||||
month--;
|
||||
monthSave--;
|
||||
|
||||
@ -635,6 +647,17 @@ uint64_t dateAdd(uint64_t time, const string& expr, IntervalColumn::interval_typ
|
||||
break;
|
||||
}
|
||||
|
||||
if (monthSave == 0)
|
||||
{
|
||||
monthSave = 12;
|
||||
tmpYear--;
|
||||
|
||||
if (isLeapYear(tmpYear))
|
||||
month_length[2] = 29;
|
||||
else
|
||||
month_length[2] = 28;
|
||||
}
|
||||
|
||||
month--;
|
||||
monthSave--;
|
||||
|
||||
|
@ -37,7 +37,7 @@ std::string Func_json_unquote::getStrVal(rowgroup::Row& row, FunctionParm& fp, b
|
||||
if (unlikely(jsEg.s.error) || jsEg.value_type != JSON_VALUE_STRING)
|
||||
return js.safeString();
|
||||
|
||||
char* buf = (char*)alloca(jsEg.value_len);
|
||||
char* buf = (char*)alloca(jsEg.value_len + 1);
|
||||
if ((strLen = json_unescape(cs, jsEg.value, jsEg.value + jsEg.value_len, &my_charset_utf8mb3_general_ci,
|
||||
(uchar*)buf, (uchar*)(buf + jsEg.value_len))) >= 0)
|
||||
{
|
||||
|
@ -47,15 +47,10 @@ bool IDBFactory::installDefaultPlugins()
|
||||
// protect these methods since we are changing our static data structure
|
||||
boost::mutex::scoped_lock lock(fac_guard);
|
||||
|
||||
s_plugins[IDBDataFile::BUFFERED] =
|
||||
FileFactoryEnt(IDBDataFile::BUFFERED, "buffered", new BufferedFileFactory(), new PosixFileSystem());
|
||||
s_plugins[IDBDataFile::UNBUFFERED] = FileFactoryEnt(IDBDataFile::UNBUFFERED, "unbuffered",
|
||||
new UnbufferedFileFactory(), new PosixFileSystem());
|
||||
|
||||
// TODO: use the installPlugin fcn below instead of declaring this statically, then remove the dependency
|
||||
// IDBDatafile -> cloudio
|
||||
// s_plugins[IDBDataFile::CLOUD] = FileFactoryEnt(IDBDataFile::CLOUD, "cloud", new SMFileFactory(), new
|
||||
// SMFileSystem());
|
||||
s_plugins.emplace(IDBDataFile::BUFFERED, FileFactoryEnt(IDBDataFile::BUFFERED, "buffered", new BufferedFileFactory(),
|
||||
new PosixFileSystem()));
|
||||
s_plugins.emplace(IDBDataFile::UNBUFFERED, FileFactoryEnt(IDBDataFile::UNBUFFERED, "unbuffered",
|
||||
new UnbufferedFileFactory(), new PosixFileSystem()));
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -86,7 +81,7 @@ bool IDBFactory::installPlugin(const std::string& plugin)
|
||||
}
|
||||
|
||||
FileFactoryEnt ent = (*(FileFactoryEntryFunc)functor)();
|
||||
s_plugins[ent.type] = ent;
|
||||
s_plugins.emplace(ent.type, std::move(ent));
|
||||
|
||||
std::ostringstream oss;
|
||||
oss << "IDBFactory::installPlugin: installed filesystem plugin " << plugin;
|
||||
@ -112,7 +107,7 @@ IDBDataFile* IDBFactory::open(IDBDataFile::Types type, const char* fname, const
|
||||
throw std::runtime_error(oss.str());
|
||||
}
|
||||
|
||||
return s_plugins[type].factory->open(fname, mode, opts, colWidth);
|
||||
return s_plugins.at(type).factory->open(fname, mode, opts, colWidth);
|
||||
}
|
||||
|
||||
IDBFileSystem& IDBFactory::getFs(IDBDataFile::Types type)
|
||||
@ -124,7 +119,14 @@ IDBFileSystem& IDBFactory::getFs(IDBDataFile::Types type)
|
||||
throw std::runtime_error(oss.str());
|
||||
}
|
||||
|
||||
return *(s_plugins[type].filesystem);
|
||||
return *(s_plugins.at(type).filesystem);
|
||||
}
|
||||
|
||||
FileFactoryEnt::~FileFactoryEnt()
|
||||
{
|
||||
delete filesystem;
|
||||
delete factory;
|
||||
}
|
||||
|
||||
|
||||
} // namespace idbdatafile
|
||||
|
@ -31,21 +31,31 @@ class IDBFileSystem;
|
||||
|
||||
struct FileFactoryEnt
|
||||
{
|
||||
FileFactoryEnt() : type(IDBDataFile::UNKNOWN), name("unknown"), factory(0), filesystem(0)
|
||||
{
|
||||
;
|
||||
}
|
||||
|
||||
FileFactoryEnt(IDBDataFile::Types t, const std::string& n, FileFactoryBase* f, IDBFileSystem* fs)
|
||||
: type(t), name(n), factory(f), filesystem(fs)
|
||||
{
|
||||
;
|
||||
}
|
||||
|
||||
IDBDataFile::Types type;
|
||||
std::string name;
|
||||
FileFactoryBase* factory;
|
||||
IDBFileSystem* filesystem;
|
||||
FileFactoryEnt(const FileFactoryEnt&) = delete;
|
||||
FileFactoryEnt& operator=(const FileFactoryEnt&) = delete;
|
||||
FileFactoryEnt& operator=(FileFactoryEnt&&) = delete;
|
||||
FileFactoryEnt(FileFactoryEnt&& temporary)
|
||||
: factory(temporary.factory)
|
||||
, filesystem(temporary.filesystem)
|
||||
{
|
||||
temporary.factory = nullptr;
|
||||
temporary.filesystem = nullptr;
|
||||
}
|
||||
|
||||
|
||||
|
||||
~FileFactoryEnt();
|
||||
|
||||
IDBDataFile::Types type = IDBDataFile::UNKNOWN;
|
||||
std::string name = "unknown";
|
||||
FileFactoryBase* factory = nullptr;
|
||||
IDBFileSystem* filesystem = nullptr;
|
||||
};
|
||||
|
||||
typedef FileFactoryEnt (*FileFactoryEntryFunc)();
|
||||
@ -101,8 +111,8 @@ class IDBFactory
|
||||
|
||||
static FactoryMap s_plugins;
|
||||
|
||||
IDBFactory();
|
||||
virtual ~IDBFactory();
|
||||
IDBFactory() = delete;
|
||||
~IDBFactory() = delete;
|
||||
};
|
||||
|
||||
inline const std::string& IDBFactory::name(IDBDataFile::Types type)
|
||||
@ -112,7 +122,7 @@ inline const std::string& IDBFactory::name(IDBDataFile::Types type)
|
||||
throw std::runtime_error("unknown plugin type in IDBFactory::name");
|
||||
}
|
||||
|
||||
return s_plugins[type].name;
|
||||
return s_plugins.at(type).name;
|
||||
}
|
||||
|
||||
} // namespace idbdatafile
|
||||
|
@ -14,7 +14,7 @@ set(messageqcpp_LIB_SRCS
|
||||
bytestreampool.cpp
|
||||
)
|
||||
|
||||
add_library(messageqcpp SHARED ${messageqcpp_LIB_SRCS})
|
||||
add_library(messageqcpp STATIC ${messageqcpp_LIB_SRCS})
|
||||
|
||||
add_dependencies(messageqcpp loggingcpp)
|
||||
|
||||
|
@ -15,23 +15,51 @@
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
||||
MA 02110-1301, USA. */
|
||||
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <map>
|
||||
#include <time.h>
|
||||
#include <mutex>
|
||||
#include "messagequeuepool.h"
|
||||
#include "messagequeue.h"
|
||||
|
||||
#include <new>
|
||||
#include <type_traits>
|
||||
|
||||
|
||||
namespace messageqcpp
|
||||
{
|
||||
|
||||
std::mutex& getQueueMutex()
|
||||
using ClientMapType = std::multimap<std::string, std::unique_ptr<ClientObject>>;
|
||||
|
||||
struct LockedClientMap
|
||||
{
|
||||
static std::mutex queueMutex;
|
||||
return queueMutex;
|
||||
LockedClientMap()
|
||||
{
|
||||
}
|
||||
~LockedClientMap()
|
||||
{
|
||||
}
|
||||
ClientMapType clientMap;
|
||||
std::mutex queueMutex;
|
||||
};
|
||||
|
||||
static int clientMapNiftyCounter;
|
||||
|
||||
static typename std::aligned_storage<sizeof(LockedClientMap), alignof(LockedClientMap)>::type clientMapBuf;
|
||||
|
||||
auto& lockedMap = reinterpret_cast<LockedClientMap&>(clientMapBuf);
|
||||
|
||||
|
||||
LockedClientMapInitilizer::LockedClientMapInitilizer ()
|
||||
{
|
||||
if (clientMapNiftyCounter++ == 0) new (&lockedMap) LockedClientMap (); // placement new
|
||||
}
|
||||
LockedClientMapInitilizer::~LockedClientMapInitilizer ()
|
||||
{
|
||||
if (--clientMapNiftyCounter == 0) (&lockedMap)->~LockedClientMap();
|
||||
}
|
||||
|
||||
// Make linker happy
|
||||
std::multimap<std::string, ClientObject*> MessageQueueClientPool::clientMap;
|
||||
|
||||
// 300 seconds idle until cleanup
|
||||
#define MAX_IDLE_TIME 300
|
||||
@ -43,7 +71,7 @@ static uint64_t TimeSpecToSeconds(struct timespec* ts)
|
||||
|
||||
MessageQueueClient* MessageQueueClientPool::getInstance(const std::string& dnOrIp, uint64_t port)
|
||||
{
|
||||
std::scoped_lock lock(getQueueMutex());
|
||||
auto lock = std::scoped_lock(lockedMap.queueMutex);
|
||||
|
||||
std::ostringstream oss;
|
||||
oss << dnOrIp << "_" << port;
|
||||
@ -63,16 +91,17 @@ MessageQueueClient* MessageQueueClientPool::getInstance(const std::string& dnOrI
|
||||
clock_gettime(CLOCK_MONOTONIC, &now);
|
||||
uint64_t nowSeconds = TimeSpecToSeconds(&now);
|
||||
|
||||
newClientObject->client = new MessageQueueClient(dnOrIp, port);
|
||||
newClientObject->client.reset(new MessageQueueClient(dnOrIp, port));
|
||||
newClientObject->inUse = true;
|
||||
newClientObject->lastUsed = nowSeconds;
|
||||
clientMap.insert(std::pair<std::string, ClientObject*>(searchString, newClientObject));
|
||||
return newClientObject->client;
|
||||
lockedMap.clientMap.emplace(std::move(searchString), std::move(newClientObject));
|
||||
return newClientObject->client.get();
|
||||
}
|
||||
|
||||
MessageQueueClient* MessageQueueClientPool::getInstance(const std::string& module)
|
||||
{
|
||||
std::scoped_lock lock(getQueueMutex());
|
||||
auto lock = std::scoped_lock(lockedMap.queueMutex);
|
||||
|
||||
|
||||
MessageQueueClient* returnClient = MessageQueueClientPool::findInPool(module);
|
||||
|
||||
@ -83,16 +112,19 @@ MessageQueueClient* MessageQueueClientPool::getInstance(const std::string& modul
|
||||
}
|
||||
|
||||
// We didn't find one, create new one
|
||||
ClientObject* newClientObject = new ClientObject();
|
||||
auto newClientObject = std::make_unique<ClientObject>();
|
||||
struct timespec now;
|
||||
clock_gettime(CLOCK_MONOTONIC, &now);
|
||||
uint64_t nowSeconds = TimeSpecToSeconds(&now);
|
||||
|
||||
newClientObject->client = new MessageQueueClient(module);
|
||||
|
||||
|
||||
newClientObject->client.reset(new MessageQueueClient(module));
|
||||
newClientObject->inUse = true;
|
||||
newClientObject->lastUsed = nowSeconds;
|
||||
clientMap.insert(std::pair<std::string, ClientObject*>(module, newClientObject));
|
||||
return newClientObject->client;
|
||||
auto result = newClientObject->client.get();
|
||||
lockedMap.clientMap.emplace(std::move(module), std::move(newClientObject));
|
||||
return result;
|
||||
}
|
||||
|
||||
MessageQueueClient* MessageQueueClientPool::findInPool(const std::string& search)
|
||||
@ -102,40 +134,37 @@ MessageQueueClient* MessageQueueClientPool::findInPool(const std::string& search
|
||||
uint64_t nowSeconds = TimeSpecToSeconds(&now);
|
||||
MessageQueueClient* returnClient = NULL;
|
||||
|
||||
std::multimap<std::string, ClientObject*>::iterator it = clientMap.begin();
|
||||
auto it = lockedMap.clientMap.begin();
|
||||
|
||||
|
||||
// Scan pool
|
||||
while (it != clientMap.end())
|
||||
while (it != lockedMap.clientMap.end())
|
||||
{
|
||||
ClientObject* clientObject = it->second;
|
||||
ClientObject* clientObject = it->second.get();
|
||||
uint64_t elapsedTime = nowSeconds - clientObject->lastUsed;
|
||||
|
||||
// If connection hasn't been used for MAX_IDLE_TIME we probably don't need it so drop it
|
||||
// Don't drop in use connections that have been in use a long time
|
||||
if ((elapsedTime >= MAX_IDLE_TIME) && (!clientObject->inUse))
|
||||
{
|
||||
delete clientObject->client;
|
||||
delete clientObject;
|
||||
// Do this so we don't invalidate current interator
|
||||
std::multimap<std::string, ClientObject*>::iterator toDelete = it;
|
||||
auto toDelete = it;
|
||||
it++;
|
||||
clientMap.erase(toDelete);
|
||||
lockedMap.clientMap.erase(toDelete);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!clientObject->inUse)
|
||||
{
|
||||
MessageQueueClient* client = clientObject->client;
|
||||
MessageQueueClient* client = clientObject->client.get();
|
||||
|
||||
// If the unused socket isn't connected or has data pending read, destroy it
|
||||
if (!client->isConnected() || client->hasData())
|
||||
{
|
||||
delete client;
|
||||
delete clientObject;
|
||||
// Do this so we don't invalidate current interator
|
||||
std::multimap<std::string, ClientObject*>::iterator toDelete = it;
|
||||
auto toDelete = it;
|
||||
it++;
|
||||
clientMap.erase(toDelete);
|
||||
lockedMap.clientMap.erase(toDelete);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@ -145,7 +174,7 @@ MessageQueueClient* MessageQueueClientPool::findInPool(const std::string& search
|
||||
{
|
||||
if ((returnClient == NULL) && (!clientObject->inUse))
|
||||
{
|
||||
returnClient = clientObject->client;
|
||||
returnClient = clientObject->client.get();
|
||||
clientObject->inUse = true;
|
||||
return returnClient;
|
||||
}
|
||||
@ -165,12 +194,12 @@ void MessageQueueClientPool::releaseInstance(MessageQueueClient* client)
|
||||
if (client == NULL)
|
||||
return;
|
||||
|
||||
std::scoped_lock lock(getQueueMutex());
|
||||
std::multimap<std::string, ClientObject*>::iterator it = clientMap.begin();
|
||||
auto lock = std::scoped_lock(lockedMap.queueMutex);
|
||||
auto it = lockedMap.clientMap.begin();
|
||||
|
||||
while (it != clientMap.end())
|
||||
while (it != lockedMap.clientMap.end())
|
||||
{
|
||||
if (it->second->client == client)
|
||||
if (it->second->client.get() == client)
|
||||
{
|
||||
struct timespec now;
|
||||
clock_gettime(CLOCK_MONOTONIC, &now);
|
||||
@ -193,16 +222,15 @@ void MessageQueueClientPool::deleteInstance(MessageQueueClient* client)
|
||||
if (client == NULL)
|
||||
return;
|
||||
|
||||
std::scoped_lock lock(getQueueMutex());
|
||||
std::multimap<std::string, ClientObject*>::iterator it = clientMap.begin();
|
||||
|
||||
while (it != clientMap.end())
|
||||
auto lock = std::scoped_lock(lockedMap.queueMutex);
|
||||
auto it = lockedMap.clientMap.begin();
|
||||
|
||||
while (it != lockedMap.clientMap.end())
|
||||
{
|
||||
if (it->second->client == client)
|
||||
if (it->second->client.get() == client)
|
||||
{
|
||||
delete it->second->client;
|
||||
delete it->second;
|
||||
clientMap.erase(it);
|
||||
lockedMap.clientMap.erase(it);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -19,18 +19,24 @@
|
||||
|
||||
#include <map>
|
||||
#include "messagequeue.h"
|
||||
#include <memory>
|
||||
|
||||
#include <boost/stacktrace.hpp>
|
||||
|
||||
namespace messageqcpp
|
||||
{
|
||||
|
||||
|
||||
static struct LockedClientMapInitilizer {
|
||||
LockedClientMapInitilizer ();
|
||||
~LockedClientMapInitilizer ();
|
||||
} clientMapInitilizer; // static initializer for every translation unit
|
||||
|
||||
struct ClientObject
|
||||
{
|
||||
MessageQueueClient* client;
|
||||
uint64_t lastUsed;
|
||||
bool inUse;
|
||||
|
||||
ClientObject() : client(NULL), lastUsed(0), inUse(false)
|
||||
{
|
||||
}
|
||||
std::unique_ptr<MessageQueueClient> client;
|
||||
uint64_t lastUsed = 0;
|
||||
bool inUse = false;
|
||||
};
|
||||
|
||||
class MessageQueueClientPool
|
||||
@ -45,8 +51,6 @@ class MessageQueueClientPool
|
||||
private:
|
||||
MessageQueueClientPool(){};
|
||||
~MessageQueueClientPool(){};
|
||||
|
||||
static std::multimap<std::string, ClientObject*> clientMap;
|
||||
};
|
||||
|
||||
} // namespace messageqcpp
|
||||
|
@ -50,19 +50,7 @@ namespace rowgroup
|
||||
{
|
||||
using cscType = execplan::CalpontSystemCatalog::ColDataType;
|
||||
|
||||
StringStore::StringStore() : empty(true), fUseStoreStringMutex(false)
|
||||
{
|
||||
}
|
||||
|
||||
StringStore::StringStore(const StringStore&)
|
||||
{
|
||||
throw logic_error("Don't call StringStore copy ctor");
|
||||
}
|
||||
|
||||
StringStore& StringStore::operator=(const StringStore&)
|
||||
{
|
||||
throw logic_error("Don't call StringStore operator=");
|
||||
}
|
||||
|
||||
StringStore::~StringStore()
|
||||
{
|
||||
@ -86,10 +74,10 @@ StringStore::~StringStore()
|
||||
|
||||
uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
|
||||
{
|
||||
MemChunk* lastMC = NULL;
|
||||
MemChunk* lastMC = nullptr;
|
||||
uint64_t ret = 0;
|
||||
|
||||
empty = false; // At least a NULL is being stored.
|
||||
empty = false; // At least a nullptr is being stored.
|
||||
|
||||
// Sometimes the caller actually wants "" to be returned....... argggghhhh......
|
||||
// if (len == 0)
|
||||
@ -121,7 +109,7 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
|
||||
}
|
||||
else
|
||||
{
|
||||
if ((lastMC == NULL) || (lastMC->capacity - lastMC->currentSize < (len + 4)))
|
||||
if ((lastMC == nullptr) || (lastMC->capacity - lastMC->currentSize < (len + 4)))
|
||||
{
|
||||
// mem usage debugging
|
||||
// if (lastMC)
|
||||
@ -215,20 +203,12 @@ void StringStore::clear()
|
||||
empty = true;
|
||||
}
|
||||
|
||||
UserDataStore::UserDataStore() : fUseUserDataMutex(false)
|
||||
{
|
||||
}
|
||||
|
||||
UserDataStore::~UserDataStore()
|
||||
{
|
||||
}
|
||||
|
||||
uint32_t UserDataStore::storeUserData(mcsv1sdk::mcsv1Context& context,
|
||||
boost::shared_ptr<mcsv1sdk::UserData> data, uint32_t len)
|
||||
{
|
||||
uint32_t ret = 0;
|
||||
|
||||
if (len == 0 || data == NULL)
|
||||
if (len == 0 || data == nullptr)
|
||||
{
|
||||
return numeric_limits<uint32_t>::max();
|
||||
}
|
||||
@ -305,7 +285,7 @@ void UserDataStore::deserialize(ByteStream& bs)
|
||||
}
|
||||
|
||||
mcsv1sdk::mcsv1_UDAF::ReturnCode rc;
|
||||
mcsv1sdk::UserData* userData = NULL;
|
||||
mcsv1sdk::UserData* userData = nullptr;
|
||||
rc = funcIter->second->createUserData(userData, vStoreData[i].length);
|
||||
|
||||
if (rc != mcsv1sdk::mcsv1_UDAF::SUCCESS)
|
||||
@ -323,10 +303,6 @@ void UserDataStore::deserialize(ByteStream& bs)
|
||||
return;
|
||||
}
|
||||
|
||||
RGData::RGData()
|
||||
{
|
||||
// cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl;
|
||||
}
|
||||
|
||||
RGData::RGData(const RowGroup& rg, uint32_t rowCount)
|
||||
{
|
||||
@ -336,6 +312,9 @@ RGData::RGData(const RowGroup& rg, uint32_t rowCount)
|
||||
if (rg.usesStringTable() && rowCount > 0)
|
||||
strings.reset(new StringStore());
|
||||
|
||||
userDataStore.reset();
|
||||
|
||||
|
||||
#ifdef VALGRIND
|
||||
/* In a PM-join, we can serialize entire tables; not every value has been
|
||||
* filled in yet. Need to look into that. Valgrind complains that
|
||||
@ -354,6 +333,8 @@ RGData::RGData(const RowGroup& rg)
|
||||
if (rg.usesStringTable())
|
||||
strings.reset(new StringStore());
|
||||
|
||||
userDataStore.reset();
|
||||
|
||||
#ifdef VALGRIND
|
||||
/* In a PM-join, we can serialize entire tables; not every value has been
|
||||
* filled in yet. Need to look into that. Valgrind complains that
|
||||
@ -366,6 +347,7 @@ RGData::RGData(const RowGroup& rg)
|
||||
void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
|
||||
{
|
||||
rowData.reset(new uint8_t[rg.getDataSize(rowCount)]);
|
||||
userDataStore.reset();
|
||||
|
||||
if (rg.usesStringTable())
|
||||
strings.reset(new StringStore());
|
||||
@ -386,16 +368,6 @@ void RGData::reinit(const RowGroup& rg)
|
||||
reinit(rg, 8192);
|
||||
}
|
||||
|
||||
RGData::RGData(const RGData& r) : rowData(r.rowData), strings(r.strings), userDataStore(r.userDataStore)
|
||||
{
|
||||
// cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl;
|
||||
}
|
||||
|
||||
RGData::~RGData()
|
||||
{
|
||||
// cout << "rgdata-- = " << __sync_sub_and_fetch(&rgDataCount, 1) << endl;
|
||||
}
|
||||
|
||||
void RGData::serialize(ByteStream& bs, uint32_t amount) const
|
||||
{
|
||||
// cout << "serializing!\n";
|
||||
@ -464,6 +436,7 @@ void RGData::clear()
|
||||
{
|
||||
rowData.reset();
|
||||
strings.reset();
|
||||
userDataStore.reset();
|
||||
}
|
||||
|
||||
// UserDataStore is only used for UDAF.
|
||||
@ -478,10 +451,6 @@ UserDataStore* RGData::getUserDataStore()
|
||||
return userDataStore.get();
|
||||
}
|
||||
|
||||
Row::Row() : data(NULL), strings(NULL), userDataStore(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
Row::Row(const Row& r)
|
||||
: columnCount(r.columnCount)
|
||||
, baseRid(r.baseRid)
|
||||
@ -501,11 +470,7 @@ Row::Row(const Row& r)
|
||||
, hasLongStringField(r.hasLongStringField)
|
||||
, sTableThreshold(r.sTableThreshold)
|
||||
, forceInline(r.forceInline)
|
||||
, userDataStore(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
Row::~Row()
|
||||
, userDataStore(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
@ -1023,7 +988,7 @@ bool Row::equals(const Row& r2, uint32_t lastCol) const
|
||||
|
||||
const CHARSET_INFO* Row::getCharset(uint32_t col) const
|
||||
{
|
||||
if (charsets[col] == NULL)
|
||||
if (charsets[col] == nullptr)
|
||||
{
|
||||
const_cast<CHARSET_INFO**>(charsets)[col] = &datatypes::Charset(charsetNumbers[col]).getCharset();
|
||||
}
|
||||
@ -1031,14 +996,6 @@ const CHARSET_INFO* Row::getCharset(uint32_t col) const
|
||||
}
|
||||
|
||||
RowGroup::RowGroup()
|
||||
: columnCount(0)
|
||||
, data(NULL)
|
||||
, rgData(NULL)
|
||||
, strings(NULL)
|
||||
, useStringTable(true)
|
||||
, hasCollation(false)
|
||||
, hasLongStringField(false)
|
||||
, sTableThreshold(20)
|
||||
{
|
||||
// 1024 is too generous to waste.
|
||||
oldOffsets.reserve(10);
|
||||
@ -1057,7 +1014,7 @@ RowGroup::RowGroup(uint32_t colCount, const vector<uint32_t>& positions, const v
|
||||
const vector<uint32_t>& cprecision, uint32_t stringTableThreshold, bool stringTable,
|
||||
const vector<bool>& forceInlineData)
|
||||
: columnCount(colCount)
|
||||
, data(NULL)
|
||||
, data(nullptr)
|
||||
, oldOffsets(positions)
|
||||
, oids(roids)
|
||||
, keys(tkeys)
|
||||
@ -1065,8 +1022,8 @@ RowGroup::RowGroup(uint32_t colCount, const vector<uint32_t>& positions, const v
|
||||
, charsetNumbers(csNumbers)
|
||||
, scale(cscale)
|
||||
, precision(cprecision)
|
||||
, rgData(NULL)
|
||||
, strings(NULL)
|
||||
, rgData(nullptr)
|
||||
, strings(nullptr)
|
||||
, sTableThreshold(stringTableThreshold)
|
||||
{
|
||||
uint32_t i;
|
||||
@ -1107,8 +1064,8 @@ RowGroup::RowGroup(uint32_t colCount, const vector<uint32_t>& positions, const v
|
||||
useStringTable = (stringTable && hasLongStringField);
|
||||
offsets = (useStringTable ? &stOffsets[0] : &oldOffsets[0]);
|
||||
|
||||
// Set all the charsets to NULL for jit initialization.
|
||||
charsets.insert(charsets.begin(), charsetNumbers.size(), NULL);
|
||||
// Set all the charsets to nullptr for jit initialization.
|
||||
charsets.insert(charsets.begin(), charsetNumbers.size(), nullptr);
|
||||
}
|
||||
|
||||
RowGroup::RowGroup(const RowGroup& r)
|
||||
@ -1176,14 +1133,6 @@ RowGroup& RowGroup::operator=(const RowGroup& r)
|
||||
}
|
||||
|
||||
RowGroup::RowGroup(ByteStream& bs)
|
||||
: columnCount(0)
|
||||
, data(nullptr)
|
||||
, rgData(nullptr)
|
||||
, strings(nullptr)
|
||||
, useStringTable(true)
|
||||
, hasCollation(false)
|
||||
, hasLongStringField(false)
|
||||
, sTableThreshold(20)
|
||||
{
|
||||
this->deserialize(bs);
|
||||
}
|
||||
@ -1254,22 +1203,13 @@ void RowGroup::deserialize(ByteStream& bs)
|
||||
else if (!useStringTable && !oldOffsets.empty())
|
||||
offsets = &oldOffsets[0];
|
||||
|
||||
// Set all the charsets to NULL for jit initialization.
|
||||
charsets.insert(charsets.begin(), charsetNumbers.size(), NULL);
|
||||
// Set all the charsets to nullptr for jit initialization.
|
||||
charsets.insert(charsets.begin(), charsetNumbers.size(), nullptr);
|
||||
}
|
||||
|
||||
void RowGroup::serializeRGData(ByteStream& bs) const
|
||||
{
|
||||
// cout << "****** serializing\n" << toString() << en
|
||||
// if (useStringTable || !hasLongStringField)
|
||||
rgData->serialize(bs, getDataSize());
|
||||
// else {
|
||||
// uint64_t size;
|
||||
// RGData *compressed = convertToStringTable(&size);
|
||||
// compressed->serialize(bs, size);
|
||||
// if (compressed != rgData)
|
||||
// delete compressed;
|
||||
// }
|
||||
}
|
||||
|
||||
uint32_t RowGroup::getDataSize() const
|
||||
@ -1354,7 +1294,7 @@ string RowGroup::toString(const std::vector<uint64_t>& used) const
|
||||
|
||||
// os << "strings = " << hex << (int64_t) strings << "\n";
|
||||
// os << "data = " << (int64_t) data << "\n" << dec;
|
||||
if (data != NULL)
|
||||
if (data != nullptr)
|
||||
{
|
||||
Row r;
|
||||
initRow(&r);
|
||||
@ -1580,7 +1520,7 @@ void RowGroup::addToSysDataList(execplan::CalpontSystemCatalog::NJLSysDataList&
|
||||
|
||||
const CHARSET_INFO* RowGroup::getCharset(uint32_t col)
|
||||
{
|
||||
if (charsets[col] == NULL)
|
||||
if (charsets[col] == nullptr)
|
||||
{
|
||||
charsets[col] = &datatypes::Charset(charsetNumbers[col]).getCharset();
|
||||
}
|
||||
|
@ -132,7 +132,11 @@ inline T derefFromTwoVectorPtrs(const std::vector<T>* outer, const std::vector<T
|
||||
class StringStore
|
||||
{
|
||||
public:
|
||||
StringStore();
|
||||
StringStore() = default;
|
||||
StringStore(const StringStore&) = delete;
|
||||
StringStore(StringStore&&) = delete;
|
||||
StringStore& operator=(const StringStore&) = delete;
|
||||
StringStore& operator=(StringStore&&) = delete;
|
||||
virtual ~StringStore();
|
||||
|
||||
inline utils::NullString getString(uint64_t offset) const;
|
||||
@ -177,17 +181,14 @@ class StringStore
|
||||
|
||||
private:
|
||||
std::string empty_str;
|
||||
|
||||
StringStore(const StringStore&);
|
||||
StringStore& operator=(const StringStore&);
|
||||
static constexpr const uint32_t CHUNK_SIZE = 64 * 1024; // allocators like powers of 2
|
||||
|
||||
std::vector<boost::shared_array<uint8_t>> mem;
|
||||
|
||||
// To store strings > 64KB (BLOB/TEXT)
|
||||
std::vector<boost::shared_array<uint8_t>> longStrings;
|
||||
bool empty;
|
||||
bool fUseStoreStringMutex; //@bug6065, make StringStore::storeString() thread safe
|
||||
bool empty = true;
|
||||
bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe
|
||||
boost::mutex fMutex;
|
||||
};
|
||||
|
||||
@ -214,8 +215,13 @@ class UserDataStore
|
||||
};
|
||||
|
||||
public:
|
||||
UserDataStore();
|
||||
virtual ~UserDataStore();
|
||||
UserDataStore() = default;
|
||||
virtual ~UserDataStore() = default;
|
||||
UserDataStore(const UserDataStore&) = delete;
|
||||
UserDataStore(UserDataStore&&) = delete;
|
||||
UserDataStore& operator=(const UserDataStore&) = delete;
|
||||
UserDataStore& operator=(UserDataStore&&) = delete;
|
||||
|
||||
|
||||
void serialize(messageqcpp::ByteStream&) const;
|
||||
void deserialize(messageqcpp::ByteStream&);
|
||||
@ -237,12 +243,10 @@ class UserDataStore
|
||||
boost::shared_ptr<mcsv1sdk::UserData> getUserData(uint32_t offset) const;
|
||||
|
||||
private:
|
||||
UserDataStore(const UserDataStore&);
|
||||
UserDataStore& operator=(const UserDataStore&);
|
||||
|
||||
std::vector<StoreData> vStoreData;
|
||||
|
||||
bool fUseUserDataMutex;
|
||||
bool fUseUserDataMutex = false;
|
||||
boost::mutex fMutex;
|
||||
};
|
||||
|
||||
@ -254,13 +258,16 @@ class Row;
|
||||
class RGData
|
||||
{
|
||||
public:
|
||||
RGData(); // useless unless followed by an = or a deserialize operation
|
||||
RGData() = default; // useless unless followed by an = or a deserialize operation
|
||||
RGData(const RowGroup& rg, uint32_t rowCount); // allocates memory for rowData
|
||||
explicit RGData(const RowGroup& rg);
|
||||
RGData(const RGData&);
|
||||
virtual ~RGData();
|
||||
RGData& operator=(const RGData&) = default;
|
||||
RGData& operator=(RGData&&) = default;
|
||||
RGData(const RGData&) = default;
|
||||
RGData(RGData&&) = default;
|
||||
virtual ~RGData() = default;
|
||||
|
||||
|
||||
inline RGData& operator=(const RGData&);
|
||||
|
||||
// amount should be the # returned by RowGroup::getDataSize()
|
||||
void serialize(messageqcpp::ByteStream&, uint32_t amount) const;
|
||||
@ -274,7 +281,7 @@ class RGData
|
||||
void clear();
|
||||
void reinit(const RowGroup& rg);
|
||||
void reinit(const RowGroup& rg, uint32_t rowCount);
|
||||
inline void setStringStore(boost::shared_ptr<StringStore>& ss)
|
||||
inline void setStringStore(std::shared_ptr<StringStore>& ss)
|
||||
{
|
||||
strings = ss;
|
||||
}
|
||||
@ -307,18 +314,21 @@ class RGData
|
||||
return (userDataStore ? (userDataStore->useUserDataMutex()) : false);
|
||||
}
|
||||
|
||||
boost::shared_array<uint8_t> rowData;
|
||||
boost::shared_ptr<StringStore> strings;
|
||||
boost::shared_ptr<UserDataStore> userDataStore;
|
||||
bool hasRowData() const
|
||||
{
|
||||
return !!rowData;
|
||||
}
|
||||
|
||||
private:
|
||||
// boost::shared_array<uint8_t> rowData;
|
||||
// boost::shared_ptr<StringStore> strings;
|
||||
std::shared_ptr<uint8_t[]> rowData;
|
||||
std::shared_ptr<StringStore> strings;
|
||||
std::shared_ptr<UserDataStore> userDataStore;
|
||||
|
||||
// Need sig to support backward compat. RGData can deserialize both forms.
|
||||
static const uint32_t RGDATA_SIG = 0xffffffff; // won't happen for 'old' Rowgroup data
|
||||
|
||||
friend class RowGroup;
|
||||
friend class RowGroupStorage;
|
||||
};
|
||||
|
||||
class Row
|
||||
@ -326,28 +336,26 @@ class Row
|
||||
public:
|
||||
struct Pointer
|
||||
{
|
||||
inline Pointer() : data(NULL), strings(NULL), userDataStore(NULL)
|
||||
{
|
||||
}
|
||||
inline Pointer() = default;
|
||||
|
||||
// Pointer(uint8_t*) implicitly makes old code compatible with the string table impl;
|
||||
inline Pointer(uint8_t* d) : data(d), strings(NULL), userDataStore(NULL)
|
||||
inline Pointer(uint8_t* d) : data(d)
|
||||
{
|
||||
}
|
||||
inline Pointer(uint8_t* d, StringStore* s) : data(d), strings(s), userDataStore(NULL)
|
||||
inline Pointer(uint8_t* d, StringStore* s) : data(d), strings(s)
|
||||
{
|
||||
}
|
||||
inline Pointer(uint8_t* d, StringStore* s, UserDataStore* u) : data(d), strings(s), userDataStore(u)
|
||||
{
|
||||
}
|
||||
uint8_t* data;
|
||||
StringStore* strings;
|
||||
UserDataStore* userDataStore;
|
||||
uint8_t* data = nullptr;
|
||||
StringStore* strings = nullptr;
|
||||
UserDataStore* userDataStore = nullptr;
|
||||
};
|
||||
|
||||
Row();
|
||||
Row() = default;
|
||||
Row(const Row&);
|
||||
~Row();
|
||||
~Row() = default;
|
||||
|
||||
Row& operator=(const Row&);
|
||||
bool operator==(const Row&) const;
|
||||
@ -500,7 +508,7 @@ class Row
|
||||
template <typename T>
|
||||
inline void setBinaryField_offset(const T* value, uint32_t width, uint32_t colIndex);
|
||||
// support VARBINARY
|
||||
// Add 2-byte length at the CHARSET_INFO*beginning of the field. NULL and zero length field are
|
||||
// Add 2-byte length at the CHARSET_INFO*beginning of the field. nullptr and zero length field are
|
||||
// treated the same, could use one of the length bit to distinguish these two cases.
|
||||
inline void setVarBinaryField(const utils::NullString& val, uint32_t colIndex);
|
||||
// No string construction is necessary for better performance.
|
||||
@ -598,31 +606,32 @@ class Row
|
||||
|
||||
const CHARSET_INFO* getCharset(uint32_t col) const;
|
||||
|
||||
private:
|
||||
uint32_t columnCount;
|
||||
uint64_t baseRid;
|
||||
private:
|
||||
inline bool inStringTable(uint32_t col) const;
|
||||
|
||||
private:
|
||||
uint32_t columnCount = 0;
|
||||
uint64_t baseRid = 0;
|
||||
|
||||
// Note, the mem behind these pointer fields is owned by RowGroup not Row
|
||||
uint32_t* oldOffsets;
|
||||
uint32_t* stOffsets;
|
||||
uint32_t* offsets;
|
||||
uint32_t* colWidths;
|
||||
execplan::CalpontSystemCatalog::ColDataType* types;
|
||||
uint32_t* charsetNumbers;
|
||||
CHARSET_INFO** charsets;
|
||||
uint8_t* data;
|
||||
uint32_t* scale;
|
||||
uint32_t* precision;
|
||||
uint32_t* oldOffsets = nullptr;
|
||||
uint32_t* stOffsets = nullptr;
|
||||
uint32_t* offsets = nullptr;
|
||||
uint32_t* colWidths = nullptr;
|
||||
execplan::CalpontSystemCatalog::ColDataType* types = nullptr;
|
||||
uint32_t* charsetNumbers = nullptr;
|
||||
CHARSET_INFO** charsets = nullptr;
|
||||
uint8_t* data = nullptr;
|
||||
uint32_t* scale = nullptr;
|
||||
uint32_t* precision = nullptr;
|
||||
|
||||
StringStore* strings;
|
||||
bool useStringTable;
|
||||
bool hasCollation;
|
||||
bool hasLongStringField;
|
||||
uint32_t sTableThreshold;
|
||||
StringStore* strings = nullptr;
|
||||
bool useStringTable = true;
|
||||
bool hasCollation = false;
|
||||
bool hasLongStringField = false;
|
||||
uint32_t sTableThreshold = 20;
|
||||
boost::shared_array<bool> forceInline;
|
||||
inline bool inStringTable(uint32_t col) const;
|
||||
|
||||
UserDataStore* userDataStore; // For UDAF
|
||||
UserDataStore* userDataStore = nullptr; // For UDAF
|
||||
|
||||
friend class RowGroup;
|
||||
};
|
||||
@ -1538,9 +1547,6 @@ class RowGroup : public messageqcpp::Serializeable
|
||||
inline bool usesStringTable() const;
|
||||
inline void setUseStringTable(bool);
|
||||
|
||||
// RGData *convertToInlineData(uint64_t *size = NULL) const; // caller manages the memory returned by
|
||||
// this void convertToInlineDataInPlace(); RGData *convertToStringTable(uint64_t *size = NULL)
|
||||
// const; void convertToStringTableInPlace();
|
||||
void serializeRGData(messageqcpp::ByteStream&) const;
|
||||
inline uint32_t getStringTableThreshold() const;
|
||||
|
||||
@ -1576,17 +1582,17 @@ class RowGroup : public messageqcpp::Serializeable
|
||||
const uint16_t& blockNum);
|
||||
inline void getLocation(uint32_t* partNum, uint16_t* segNum, uint8_t* extentNum, uint16_t* blockNum);
|
||||
|
||||
inline void setStringStore(boost::shared_ptr<StringStore>);
|
||||
inline void setStringStore(std::shared_ptr<StringStore>);
|
||||
|
||||
const CHARSET_INFO* getCharset(uint32_t col);
|
||||
|
||||
private:
|
||||
uint32_t columnCount;
|
||||
uint8_t* data;
|
||||
uint32_t columnCount = 0;
|
||||
uint8_t* data = nullptr;
|
||||
|
||||
std::vector<uint32_t> oldOffsets; // inline data offsets
|
||||
std::vector<uint32_t> stOffsets; // string table offsets
|
||||
uint32_t* offsets; // offsets either points to oldOffsets or stOffsets
|
||||
uint32_t* offsets = nullptr; // offsets either points to oldOffsets or stOffsets
|
||||
std::vector<uint32_t> colWidths;
|
||||
// oids: the real oid of the column, may have duplicates with alias.
|
||||
// This oid is necessary for front-end to decide the real column width.
|
||||
@ -1604,12 +1610,12 @@ class RowGroup : public messageqcpp::Serializeable
|
||||
std::vector<uint32_t> precision;
|
||||
|
||||
// string table impl
|
||||
RGData* rgData;
|
||||
StringStore* strings; // note, strings and data belong to rgData
|
||||
bool useStringTable;
|
||||
bool hasCollation;
|
||||
bool hasLongStringField;
|
||||
uint32_t sTableThreshold;
|
||||
RGData* rgData = nullptr;
|
||||
StringStore* strings = nullptr; // note, strings and data belong to rgData
|
||||
bool useStringTable = true;
|
||||
bool hasCollation = false;
|
||||
bool hasLongStringField = false;
|
||||
uint32_t sTableThreshold = 20;
|
||||
boost::shared_array<bool> forceInline;
|
||||
|
||||
static const uint32_t headerSize = 18;
|
||||
@ -1646,7 +1652,7 @@ every row, they're a measurable performance penalty */
|
||||
inline uint32_t RowGroup::getRowCount() const
|
||||
{
|
||||
// idbassert(data);
|
||||
// if (!data) throw std::logic_error("RowGroup::getRowCount(): data is NULL!");
|
||||
// if (!data) throw std::logic_error("RowGroup::getRowCount(): data is nullptr!");
|
||||
return *((uint32_t*)&data[rowCountOffset]);
|
||||
}
|
||||
|
||||
@ -1677,8 +1683,8 @@ inline void RowGroup::getRow(uint32_t rowNum, Row* r) const
|
||||
inline void RowGroup::setData(uint8_t* d)
|
||||
{
|
||||
data = d;
|
||||
strings = NULL;
|
||||
rgData = NULL;
|
||||
strings = nullptr;
|
||||
rgData = nullptr;
|
||||
setUseStringTable(false);
|
||||
}
|
||||
|
||||
@ -1712,7 +1718,7 @@ inline void RowGroup::setUseStringTable(bool b)
|
||||
offsets = &oldOffsets[0];
|
||||
|
||||
if (!useStringTable)
|
||||
strings = NULL;
|
||||
strings = nullptr;
|
||||
}
|
||||
|
||||
inline uint64_t RowGroup::getBaseRid() const
|
||||
@ -1772,7 +1778,7 @@ inline uint32_t RowGroup::getRowSizeWithStrings() const
|
||||
|
||||
inline uint64_t RowGroup::getSizeWithStrings(uint64_t n) const
|
||||
{
|
||||
if (strings == NULL)
|
||||
if (strings == nullptr)
|
||||
return getDataSize(n);
|
||||
else
|
||||
return getDataSize(n) + strings->getSize();
|
||||
@ -1896,7 +1902,7 @@ inline uint32_t RowGroup::getStringTableThreshold() const
|
||||
return sTableThreshold;
|
||||
}
|
||||
|
||||
inline void RowGroup::setStringStore(boost::shared_ptr<StringStore> ss)
|
||||
inline void RowGroup::setStringStore(std::shared_ptr<StringStore> ss)
|
||||
{
|
||||
if (useStringTable)
|
||||
{
|
||||
@ -2123,7 +2129,7 @@ inline const uint8_t* StringStore::getPointer(uint64_t off) const
|
||||
|
||||
inline bool StringStore::isNullValue(uint64_t off) const
|
||||
{
|
||||
if (off == std::numeric_limits<uint64_t>::max())
|
||||
if (off == std::numeric_limits<uint64_t>::max())
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
@ -2190,14 +2196,6 @@ inline uint64_t StringStore::getSize() const
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline RGData& RGData::operator=(const RGData& r)
|
||||
{
|
||||
rowData = r.rowData;
|
||||
strings = r.strings;
|
||||
userDataStore = r.userDataStore;
|
||||
return *this;
|
||||
}
|
||||
|
||||
inline void RGData::getRow(uint32_t num, Row* row)
|
||||
{
|
||||
uint32_t size = row->getSize();
|
||||
|
@ -1,102 +0,0 @@
|
||||
// Microsoft Visual C++ generated resource script.
|
||||
//
|
||||
#include "resource.h"
|
||||
|
||||
#define APSTUDIO_READONLY_SYMBOLS
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Generated from the TEXTINCLUDE 2 resource.
|
||||
//
|
||||
#include "afxres.h"
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
#undef APSTUDIO_READONLY_SYMBOLS
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
// English (U.S.) resources
|
||||
|
||||
#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENU)
|
||||
#ifdef _WIN32
|
||||
LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_US
|
||||
#pragma code_page(1252)
|
||||
#endif //_WIN32
|
||||
|
||||
#ifdef APSTUDIO_INVOKED
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// TEXTINCLUDE
|
||||
//
|
||||
|
||||
1 TEXTINCLUDE
|
||||
BEGIN
|
||||
"resource.h\0"
|
||||
END
|
||||
|
||||
2 TEXTINCLUDE
|
||||
BEGIN
|
||||
"#include ""afxres.h""\r\n"
|
||||
"\0"
|
||||
END
|
||||
|
||||
3 TEXTINCLUDE
|
||||
BEGIN
|
||||
"\r\n"
|
||||
"\0"
|
||||
END
|
||||
|
||||
#endif // APSTUDIO_INVOKED
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Version
|
||||
//
|
||||
|
||||
VS_VERSION_INFO VERSIONINFO
|
||||
FILEVERSION 4,6,0,0
|
||||
PRODUCTVERSION 4,6,0,0
|
||||
FILEFLAGSMASK 0x17L
|
||||
#ifdef _DEBUG
|
||||
FILEFLAGS 0x1L
|
||||
#else
|
||||
FILEFLAGS 0x0L
|
||||
#endif
|
||||
FILEOS 0x4L
|
||||
FILETYPE 0x1L
|
||||
FILESUBTYPE 0x0L
|
||||
BEGIN
|
||||
BLOCK "StringFileInfo"
|
||||
BEGIN
|
||||
BLOCK "040904b0"
|
||||
BEGIN
|
||||
VALUE "CompanyName", "InfiniDB, Inc."
|
||||
VALUE "FileDescription", "InfiniDB UDF API"
|
||||
VALUE "FileVersion", "4.6.0-0"
|
||||
VALUE "InternalName", "libudfsdk"
|
||||
VALUE "LegalCopyright", "Copyright (C) 2014"
|
||||
VALUE "OriginalFilename", "libudfsdk.dll"
|
||||
VALUE "ProductName", "InfiniDB"
|
||||
VALUE "ProductVersion", "4.6.0.0 Beta"
|
||||
END
|
||||
END
|
||||
BLOCK "VarFileInfo"
|
||||
BEGIN
|
||||
VALUE "Translation", 0x409, 1200
|
||||
END
|
||||
END
|
||||
|
||||
#endif // English (U.S.) resources
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
|
||||
#ifndef APSTUDIO_INVOKED
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Generated from the TEXTINCLUDE 3 resource.
|
||||
//
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
#endif // not APSTUDIO_INVOKED
|
||||
|
@ -1,102 +0,0 @@
|
||||
// Microsoft Visual C++ generated resource script.
|
||||
//
|
||||
#include "resource.h"
|
||||
|
||||
#define APSTUDIO_READONLY_SYMBOLS
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Generated from the TEXTINCLUDE 2 resource.
|
||||
//
|
||||
#include "afxres.h"
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
#undef APSTUDIO_READONLY_SYMBOLS
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
// English (U.S.) resources
|
||||
|
||||
#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENU)
|
||||
#ifdef _WIN32
|
||||
LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_US
|
||||
#pragma code_page(1252)
|
||||
#endif //_WIN32
|
||||
|
||||
#ifdef APSTUDIO_INVOKED
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// TEXTINCLUDE
|
||||
//
|
||||
|
||||
1 TEXTINCLUDE
|
||||
BEGIN
|
||||
"resource.h\0"
|
||||
END
|
||||
|
||||
2 TEXTINCLUDE
|
||||
BEGIN
|
||||
"#include ""afxres.h""\r\n"
|
||||
"\0"
|
||||
END
|
||||
|
||||
3 TEXTINCLUDE
|
||||
BEGIN
|
||||
"\r\n"
|
||||
"\0"
|
||||
END
|
||||
|
||||
#endif // APSTUDIO_INVOKED
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Version
|
||||
//
|
||||
|
||||
VS_VERSION_INFO VERSIONINFO
|
||||
FILEVERSION 4,6,0,0
|
||||
PRODUCTVERSION 4,6,0,0
|
||||
FILEFLAGSMASK 0x17L
|
||||
#ifdef _DEBUG
|
||||
FILEFLAGS 0x1L
|
||||
#else
|
||||
FILEFLAGS 0x0L
|
||||
#endif
|
||||
FILEOS 0x4L
|
||||
FILETYPE 0x1L
|
||||
FILESUBTYPE 0x0L
|
||||
BEGIN
|
||||
BLOCK "StringFileInfo"
|
||||
BEGIN
|
||||
BLOCK "040904b0"
|
||||
BEGIN
|
||||
VALUE "CompanyName", "InfiniDB, Inc."
|
||||
VALUE "FileDescription", "InfiniDB UDF API"
|
||||
VALUE "FileVersion", "4.6.0-0"
|
||||
VALUE "InternalName", "libudfsdk"
|
||||
VALUE "LegalCopyright", "Copyright (C) 2014"
|
||||
VALUE "OriginalFilename", "libudfsdk.dll"
|
||||
VALUE "ProductName", "InfiniDB"
|
||||
VALUE "ProductVersion", "4.6.0.0 Beta"
|
||||
END
|
||||
END
|
||||
BLOCK "VarFileInfo"
|
||||
BEGIN
|
||||
VALUE "Translation", 0x409, 1200
|
||||
END
|
||||
END
|
||||
|
||||
#endif // English (U.S.) resources
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
|
||||
#ifndef APSTUDIO_INVOKED
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Generated from the TEXTINCLUDE 3 resource.
|
||||
//
|
||||
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
#endif // not APSTUDIO_INVOKED
|
||||
|
Reference in New Issue
Block a user