1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-08 14:22:09 +03:00

feat(): propagate long strings SP type change

This commit is contained in:
drrtuy
2024-12-04 23:45:05 +00:00
parent 789a382be2
commit 937d09768b
7 changed files with 71 additions and 30 deletions

View File

@@ -19,6 +19,7 @@
#include <atomic>
#include <cstddef>
#include "countingallocator.h"
#include "rowgroup.h"
using namespace allocators;
@@ -108,6 +109,13 @@ TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator)
ptr.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
size_t allocSize = 16ULL * rowgroup::rgCommonSize;
auto buf = boost::allocate_shared<rowgroup::RGDataBufType>(allocator, allocSize);
EXPECT_LE(allocatedMemory.load(), MemoryAllowance - allocSize);
buf.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
}
// Test 5: Thread Safety - Concurrent Allocations and Deallocations

View File

@@ -180,17 +180,17 @@ void ByteStream::growBuf(uint32_t toSize)
}
}
std::vector<std::shared_ptr<uint8_t[]>>& ByteStream::getLongStrings()
std::vector<rowgroup::StringStoreBufSPType>& ByteStream::getLongStrings()
{
return longStrings;
}
const std::vector<std::shared_ptr<uint8_t[]>>& ByteStream::getLongStrings() const
const std::vector<rowgroup::StringStoreBufSPType>& ByteStream::getLongStrings() const
{
return longStrings;
}
void ByteStream::setLongStrings(const std::vector<std::shared_ptr<uint8_t[]>>& other)
void ByteStream::setLongStrings(const std::vector<rowgroup::StringStoreBufSPType>& other)
{
longStrings = other;
}

View File

@@ -37,6 +37,7 @@
#include "serializeable.h"
#include "any.hpp"
#include "countingallocator.h"
#include "buffertypes.h"
class ByteStreamTestSuite;
@@ -437,9 +438,9 @@ class ByteStream : public Serializeable
3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings.
// Methods to get and set `long strings`.
EXPORT std::vector<std::shared_ptr<uint8_t[]>>& getLongStrings();
EXPORT const std::vector<std::shared_ptr<uint8_t[]>>& getLongStrings() const;
EXPORT void setLongStrings(const std::vector<std::shared_ptr<uint8_t[]>>& other);
EXPORT std::vector<rowgroup::StringStoreBufSPType>& getLongStrings();
EXPORT const std::vector<rowgroup::StringStoreBufSPType>& getLongStrings() const;
EXPORT void setLongStrings(const std::vector<rowgroup::StringStoreBufSPType>& other);
friend class ::ByteStreamTestSuite;
@@ -474,7 +475,7 @@ class ByteStream : public Serializeable
BSBufType* fCurOutPtr; // the point in fBuf where data is extracted from next
uint32_t fMaxLen; // how big fBuf is currently
// Stores `long strings`.
std::vector<std::shared_ptr<uint8_t[]>> longStrings;
std::vector<rowgroup::StringStoreBufSPType> longStrings;
allocators::CountingAllocator<BSBufType>* allocator = nullptr;
};

View File

@@ -515,7 +515,7 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
return SBS(new ByteStream(0U));
res->advanceInputPtr(msglen);
std::vector<std::shared_ptr<uint8_t[]>> longStrings;
std::vector<rowgroup::StringStoreBufSPType> longStrings;
try
{
for (uint32_t i = 0; i < longStringSize; ++i)
@@ -527,7 +527,8 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
return SBS(new ByteStream(0U));
// Allocate new memory for the `long string`.
std::shared_ptr<uint8_t[]> longString(
// WIP must account this allocation also.
rowgroup::StringStoreBufSPType longString(
new uint8_t[sizeof(rowgroup::StringStore::MemChunk) + memChunk.currentSize]);
uint8_t* longStringData = longString.get();

View File

@@ -0,0 +1,30 @@
/*
Copyright (c) 2024 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA.
*/
#pragma once
#include <boost/smart_ptr/shared_ptr.hpp>
#include <cstdint>
namespace rowgroup
{
using RGDataBufType = uint8_t[];
using StringStoreBufType = uint8_t[];
using StringStoreBufSPType = boost::shared_ptr<uint8_t[]>;
} // namespace rowgroup

View File

@@ -96,19 +96,19 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
if (mem.size() > 0)
lastMC = (MemChunk*)mem.back().get();
std::cout << "StringStore::storeString len " << len << std::endl;
// std::cout << "StringStore::storeString len " << len << std::endl;
if ((len + 4) >= CHUNK_SIZE)
{
auto allocSize = len + sizeof(MemChunk) + 4;
// if (alloc)
// {
// cout << "StringStore::storeString longStrings with alloc " << std::endl;
// longStrings.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, allocSize));
// }
// else
if (alloc)
{
cout << "StringStore::storeString longStrings no alloc " << std::endl;
longStrings.emplace_back(std::make_shared<uint8_t[]>(allocSize));
// cout << "StringStore::storeString longStrings with alloc " << std::endl;
longStrings.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, allocSize));
}
else
{
// cout << "StringStore::storeString longStrings no alloc " << std::endl;
longStrings.emplace_back(boost::make_shared<uint8_t[]>(allocSize));
}
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[len + sizeof(MemChunk) + 4]);
lastMC = reinterpret_cast<MemChunk*>(longStrings.back().get());
@@ -127,13 +127,13 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
// if (lastMC)
if (alloc)
{
cout << "StringStore::storeString with alloc " << std::endl;
// cout << "StringStore::storeString with alloc " << std::endl;
mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
// boost::allocate_shared) newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
}
else
{
cout << "StringStore::storeString no alloc " << std::endl;
// cout << "StringStore::storeString no alloc " << std::endl;
mem.emplace_back(boost::make_shared<uint8_t[]>(CHUNK_SIZE + sizeof(MemChunk)));
// mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
@@ -196,7 +196,7 @@ void StringStore::deserialize(ByteStream& bs)
// mem.clear();
bs >> count;
// mem.resize(count);
mem.reserve(count);
bs >> tmp8;
empty = (bool)tmp8;
@@ -208,12 +208,12 @@ void StringStore::deserialize(ByteStream& bs)
if (alloc)
{
cout << "StringStore::deserialize with alloc " << std::endl;
// cout << "StringStore::deserialize with alloc " << std::endl;
mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, size + sizeof(MemChunk)));
}
else
{
cout << "StringStore::deserialize no alloc " << std::endl;
// cout << "StringStore::deserialize no alloc " << std::endl;
mem.emplace_back(boost::make_shared<uint8_t[]>(size + sizeof(MemChunk)));
}
// mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
@@ -231,7 +231,7 @@ void StringStore::deserialize(ByteStream& bs)
void StringStore::clear()
{
vector<boost::shared_ptr<uint8_t[]> > emptyv;
vector<std::shared_ptr<uint8_t[]> > emptyv2;
vector<StringStoreBufSPType> emptyv2;
mem.swap(emptyv);
longStrings.swap(emptyv2);
empty = true;
@@ -401,12 +401,12 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
{
if (alloc)
{
cout << "RGData::reinit with alloc " << std::endl;
// cout << "RGData::reinit with alloc " << std::endl;
rowData = boost::allocate_shared<RGDataBufType>(*alloc, rg.getDataSize(rowCount));
}
else
{
cout << "RGData::reinit no alloc " << std::endl;
// cout << "RGData::reinit no alloc " << std::endl;
rowData.reset(new uint8_t[rg.getDataSize(rowCount)]);
}

View File

@@ -54,6 +54,7 @@
#include "collation.h"
#include "common/hashfamily.h"
#include "buffertypes.h"
// Workaround for my_global.h #define of isnan(X) causing a std::std namespace
@@ -127,9 +128,9 @@ inline T derefFromTwoVectorPtrs(const std::vector<T>* outer, const std::vector<T
return outer->operator[](outerIdx);
}
using RGDataBufType = uint8_t[];
// using RGDataBufType = std::vector<uint8_t>;
using StringStoreBufType = uint8_t[];
// using RGDataBufType = uint8_t[];
// using StringStoreBufType = uint8_t[];
// using StringStoreBufSPType = boost::shared_ptr<uint8_t[]>;
class StringStore
{
@@ -186,7 +187,7 @@ class StringStore
std::vector<boost::shared_ptr<uint8_t[]>> mem;
// To store strings > 64KB (BLOB/TEXT)
std::vector<std::shared_ptr<uint8_t[]>> longStrings;
std::vector<StringStoreBufSPType> longStrings;
bool empty = true;
bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe
boost::mutex fMutex;