1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(): propagate long strings SP type change

This commit is contained in:
drrtuy
2024-12-04 23:45:05 +00:00
parent 4e86123a5a
commit 71ed9cabe0
7 changed files with 71 additions and 30 deletions

View File

@ -19,6 +19,7 @@
#include <atomic> #include <atomic>
#include <cstddef> #include <cstddef>
#include "countingallocator.h" #include "countingallocator.h"
#include "rowgroup.h"
using namespace allocators; using namespace allocators;
@ -108,6 +109,13 @@ TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator)
ptr.reset(); ptr.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
size_t allocSize = 16ULL * rowgroup::rgCommonSize;
auto buf = boost::allocate_shared<rowgroup::RGDataBufType>(allocator, allocSize);
EXPECT_LE(allocatedMemory.load(), MemoryAllowance - allocSize);
buf.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
} }
// Test 5: Thread Safety - Concurrent Allocations and Deallocations // Test 5: Thread Safety - Concurrent Allocations and Deallocations

View File

@ -179,17 +179,17 @@ void ByteStream::growBuf(BSSizeType toSize)
} }
} }
std::vector<std::shared_ptr<uint8_t[]>>& ByteStream::getLongStrings() std::vector<rowgroup::StringStoreBufSPType>& ByteStream::getLongStrings()
{ {
return longStrings; return longStrings;
} }
const std::vector<std::shared_ptr<uint8_t[]>>& ByteStream::getLongStrings() const const std::vector<rowgroup::StringStoreBufSPType>& ByteStream::getLongStrings() const
{ {
return longStrings; return longStrings;
} }
void ByteStream::setLongStrings(const std::vector<std::shared_ptr<uint8_t[]>>& other) void ByteStream::setLongStrings(const std::vector<rowgroup::StringStoreBufSPType>& other)
{ {
longStrings = other; longStrings = other;
} }

View File

@ -38,6 +38,7 @@
#include "any.hpp" #include "any.hpp"
#include "nullstring.h" #include "nullstring.h"
#include "countingallocator.h" #include "countingallocator.h"
#include "buffertypes.h"
class ByteStreamTestSuite; class ByteStreamTestSuite;
@ -447,9 +448,9 @@ class ByteStream : public Serializeable
3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings. 3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings.
// Methods to get and set `long strings`. // Methods to get and set `long strings`.
EXPORT std::vector<std::shared_ptr<uint8_t[]>>& getLongStrings(); EXPORT std::vector<rowgroup::StringStoreBufSPType>& getLongStrings();
EXPORT const std::vector<std::shared_ptr<uint8_t[]>>& getLongStrings() const; EXPORT const std::vector<rowgroup::StringStoreBufSPType>& getLongStrings() const;
EXPORT void setLongStrings(const std::vector<std::shared_ptr<uint8_t[]>>& other); EXPORT void setLongStrings(const std::vector<rowgroup::StringStoreBufSPType>& other);
friend class ::ByteStreamTestSuite; friend class ::ByteStreamTestSuite;
@ -484,7 +485,7 @@ class ByteStream : public Serializeable
BSBufType* fCurOutPtr; // the point in fBuf where data is extracted from next BSBufType* fCurOutPtr; // the point in fBuf where data is extracted from next
BSSizeType fMaxLen; // how big fBuf is currently BSSizeType fMaxLen; // how big fBuf is currently
// Stores `long strings`. // Stores `long strings`.
std::vector<std::shared_ptr<uint8_t[]>> longStrings; std::vector<rowgroup::StringStoreBufSPType> longStrings;
allocators::CountingAllocator<BSBufType>* allocator = nullptr; allocators::CountingAllocator<BSBufType>* allocator = nullptr;
}; };

View File

@ -508,7 +508,7 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
return SBS(new ByteStream(0U)); return SBS(new ByteStream(0U));
res->advanceInputPtr(msglen); res->advanceInputPtr(msglen);
std::vector<std::shared_ptr<uint8_t[]>> longStrings; std::vector<rowgroup::StringStoreBufSPType> longStrings;
try try
{ {
for (uint32_t i = 0; i < longStringSize; ++i) for (uint32_t i = 0; i < longStringSize; ++i)
@ -520,7 +520,8 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
return SBS(new ByteStream(0U)); return SBS(new ByteStream(0U));
// Allocate new memory for the `long string`. // Allocate new memory for the `long string`.
std::shared_ptr<uint8_t[]> longString( // WIP must account this allocation also.
rowgroup::StringStoreBufSPType longString(
new uint8_t[sizeof(rowgroup::StringStore::MemChunk) + memChunk.currentSize]); new uint8_t[sizeof(rowgroup::StringStore::MemChunk) + memChunk.currentSize]);
uint8_t* longStringData = longString.get(); uint8_t* longStringData = longString.get();

View File

@ -0,0 +1,30 @@
/*
Copyright (c) 2024 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA.
*/
#pragma once
#include <boost/smart_ptr/shared_ptr.hpp>
#include <cstdint>
namespace rowgroup
{
using RGDataBufType = uint8_t[];
using StringStoreBufType = uint8_t[];
using StringStoreBufSPType = boost::shared_ptr<uint8_t[]>;
} // namespace rowgroup

View File

@ -96,19 +96,19 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
if (mem.size() > 0) if (mem.size() > 0)
lastMC = (MemChunk*)mem.back().get(); lastMC = (MemChunk*)mem.back().get();
std::cout << "StringStore::storeString len " << len << std::endl; // std::cout << "StringStore::storeString len " << len << std::endl;
if ((len + 4) >= CHUNK_SIZE) if ((len + 4) >= CHUNK_SIZE)
{ {
auto allocSize = len + sizeof(MemChunk) + 4; auto allocSize = len + sizeof(MemChunk) + 4;
// if (alloc) if (alloc)
// {
// cout << "StringStore::storeString longStrings with alloc " << std::endl;
// longStrings.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, allocSize));
// }
// else
{ {
cout << "StringStore::storeString longStrings no alloc " << std::endl; // cout << "StringStore::storeString longStrings with alloc " << std::endl;
longStrings.emplace_back(std::make_shared<uint8_t[]>(allocSize)); longStrings.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, allocSize));
}
else
{
// cout << "StringStore::storeString longStrings no alloc " << std::endl;
longStrings.emplace_back(boost::make_shared<uint8_t[]>(allocSize));
} }
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[len + sizeof(MemChunk) + 4]); // std::shared_ptr<uint8_t[]> newOne(new uint8_t[len + sizeof(MemChunk) + 4]);
lastMC = reinterpret_cast<MemChunk*>(longStrings.back().get()); lastMC = reinterpret_cast<MemChunk*>(longStrings.back().get());
@ -127,13 +127,13 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
// if (lastMC) // if (lastMC)
if (alloc) if (alloc)
{ {
cout << "StringStore::storeString with alloc " << std::endl; // cout << "StringStore::storeString with alloc " << std::endl;
mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk))); mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
// boost::allocate_shared) newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]); // boost::allocate_shared) newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
} }
else else
{ {
cout << "StringStore::storeString no alloc " << std::endl; // cout << "StringStore::storeString no alloc " << std::endl;
mem.emplace_back(boost::make_shared<uint8_t[]>(CHUNK_SIZE + sizeof(MemChunk))); mem.emplace_back(boost::make_shared<uint8_t[]>(CHUNK_SIZE + sizeof(MemChunk)));
// mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk))); // mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]); // std::shared_ptr<uint8_t[]> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
@ -196,7 +196,7 @@ void StringStore::deserialize(ByteStream& bs)
// mem.clear(); // mem.clear();
bs >> count; bs >> count;
// mem.resize(count); mem.reserve(count);
bs >> tmp8; bs >> tmp8;
empty = (bool)tmp8; empty = (bool)tmp8;
@ -208,12 +208,12 @@ void StringStore::deserialize(ByteStream& bs)
if (alloc) if (alloc)
{ {
cout << "StringStore::deserialize with alloc " << std::endl; // cout << "StringStore::deserialize with alloc " << std::endl;
mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, size + sizeof(MemChunk))); mem.emplace_back(boost::allocate_shared<StringStoreBufType>(*alloc, size + sizeof(MemChunk)));
} }
else else
{ {
cout << "StringStore::deserialize no alloc " << std::endl; // cout << "StringStore::deserialize no alloc " << std::endl;
mem.emplace_back(boost::make_shared<uint8_t[]>(size + sizeof(MemChunk))); mem.emplace_back(boost::make_shared<uint8_t[]>(size + sizeof(MemChunk)));
} }
// mem[i].reset(new uint8_t[size + sizeof(MemChunk)]); // mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
@ -231,7 +231,7 @@ void StringStore::deserialize(ByteStream& bs)
void StringStore::clear() void StringStore::clear()
{ {
vector<boost::shared_ptr<uint8_t[]> > emptyv; vector<boost::shared_ptr<uint8_t[]> > emptyv;
vector<std::shared_ptr<uint8_t[]> > emptyv2; vector<StringStoreBufSPType> emptyv2;
mem.swap(emptyv); mem.swap(emptyv);
longStrings.swap(emptyv2); longStrings.swap(emptyv2);
empty = true; empty = true;
@ -380,12 +380,12 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
{ {
if (alloc) if (alloc)
{ {
cout << "RGData::reinit with alloc " << std::endl; // cout << "RGData::reinit with alloc " << std::endl;
rowData = boost::allocate_shared<RGDataBufType>(*alloc, rg.getDataSize(rowCount)); rowData = boost::allocate_shared<RGDataBufType>(*alloc, rg.getDataSize(rowCount));
} }
else else
{ {
cout << "RGData::reinit no alloc " << std::endl; // cout << "RGData::reinit no alloc " << std::endl;
rowData.reset(new uint8_t[rg.getDataSize(rowCount)]); rowData.reset(new uint8_t[rg.getDataSize(rowCount)]);
} }

View File

@ -54,6 +54,7 @@
#include "collation.h" #include "collation.h"
#include "common/hashfamily.h" #include "common/hashfamily.h"
#include "buffertypes.h"
#include <cstdlib> #include <cstdlib>
#include "execinfo.h" #include "execinfo.h"
@ -131,9 +132,9 @@ inline T derefFromTwoVectorPtrs(const std::vector<T>* outer, const std::vector<T
return outer->operator[](outerIdx); return outer->operator[](outerIdx);
} }
using RGDataBufType = uint8_t[]; // using RGDataBufType = uint8_t[];
// using RGDataBufType = std::vector<uint8_t>; // using StringStoreBufType = uint8_t[];
using StringStoreBufType = uint8_t[]; // using StringStoreBufSPType = boost::shared_ptr<uint8_t[]>;
class StringStore class StringStore
{ {
@ -193,7 +194,7 @@ class StringStore
std::vector<boost::shared_ptr<uint8_t[]>> mem; std::vector<boost::shared_ptr<uint8_t[]>> mem;
// To store strings > 64KB (BLOB/TEXT) // To store strings > 64KB (BLOB/TEXT)
std::vector<std::shared_ptr<uint8_t[]>> longStrings; std::vector<StringStoreBufSPType> longStrings;
bool empty = true; bool empty = true;
bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe
boost::mutex fMutex; boost::mutex fMutex;