1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(RGData,StringStore): add counting allocator capabilities to those ctors used in BPP::execute()

This commit is contained in:
drrtuy
2024-11-30 18:51:29 +00:00
parent 51374aef4d
commit 5383e7c5a2
14 changed files with 305 additions and 132 deletions

View File

@ -791,7 +791,7 @@ void BatchPrimitiveProcessorJL::getRowGroupData(ByteStream& in, vector<RGData>*
if (in.length() == 0) if (in.length() == 0)
{ {
// done, return an empty RG // done, return an empty RG
rgData = RGData(org, 0); rgData = RGData(org, 0U);
org.setData(&rgData); org.setData(&rgData);
org.resetRowGroup(0); org.resetRowGroup(0);
out->push_back(rgData); out->push_back(rgData);
@ -926,7 +926,7 @@ RGData BatchPrimitiveProcessorJL::getErrorRowGroupData(uint16_t error) const
RGData ret; RGData ret;
rowgroup::RowGroup rg(projectionRG); rowgroup::RowGroup rg(projectionRG);
ret = RGData(rg, 0); ret = RGData(rg, 0U);
rg.setData(&ret); rg.setData(&ret);
// rg.convertToInlineDataInPlace(); // rg.convertToInlineDataInPlace();
rg.resetRowGroup(0); rg.resetRowGroup(0);

View File

@ -239,7 +239,7 @@ uint32_t SubAdapterStep::nextBand(messageqcpp::ByteStream& bs)
if (fEndOfResult) if (fEndOfResult)
{ {
// send an empty / error band // send an empty / error band
RGData rgData(fRowGroupDeliver, 0); RGData rgData(fRowGroupDeliver, 0U);
fRowGroupDeliver.setData(&rgData); fRowGroupDeliver.setData(&rgData);
fRowGroupDeliver.resetRowGroup(0); fRowGroupDeliver.resetRowGroup(0);
fRowGroupDeliver.setStatus(status()); fRowGroupDeliver.setStatus(status());

View File

@ -611,7 +611,7 @@ uint32_t TupleAggregateStep::nextBand_singleThread(messageqcpp::ByteStream& bs)
postStepSummaryTele(sts); postStepSummaryTele(sts);
// send an empty / error band // send an empty / error band
RGData rgData(fRowGroupOut, 0); RGData rgData(fRowGroupOut, 0U);
fRowGroupOut.setData(&rgData); fRowGroupOut.setData(&rgData);
fRowGroupOut.resetRowGroup(0); fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setStatus(status()); fRowGroupOut.setStatus(status());
@ -5857,7 +5857,7 @@ uint64_t TupleAggregateStep::doThreadedAggregate(ByteStream& bs, RowGroupDL* dlp
else else
{ {
// send an empty / error band // send an empty / error band
RGData rgData(fRowGroupOut, 0); RGData rgData(fRowGroupOut, 0U);
fRowGroupOut.setData(&rgData); fRowGroupOut.setData(&rgData);
fRowGroupOut.resetRowGroup(0); fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setStatus(status()); fRowGroupOut.setStatus(status());

View File

@ -362,7 +362,7 @@ uint32_t TupleConstantStep::nextBand(messageqcpp::ByteStream& bs)
if (fEndOfResult) if (fEndOfResult)
{ {
// send an empty / error band // send an empty / error band
RGData rgData(fRowGroupOut, 0); RGData rgData(fRowGroupOut, 0U);
fRowGroupOut.setData(&rgData); fRowGroupOut.setData(&rgData);
fRowGroupOut.resetRowGroup(0); fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setStatus(status()); fRowGroupOut.setStatus(status());
@ -720,7 +720,7 @@ uint32_t TupleConstantOnlyStep::nextBand(messageqcpp::ByteStream& bs)
else else
{ {
// send an empty / error band // send an empty / error band
RGData rgData(fRowGroupOut, 0); RGData rgData(fRowGroupOut, 0U);
fRowGroupOut.setData(&rgData); fRowGroupOut.setData(&rgData);
fRowGroupOut.resetRowGroup(0); fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setStatus(status()); fRowGroupOut.setStatus(status());
@ -809,7 +809,7 @@ void TupleConstantBooleanStep::run()
uint32_t TupleConstantBooleanStep::nextBand(messageqcpp::ByteStream& bs) uint32_t TupleConstantBooleanStep::nextBand(messageqcpp::ByteStream& bs)
{ {
// send an empty band // send an empty band
RGData rgData(fRowGroupOut, 0); RGData rgData(fRowGroupOut, 0U);
fRowGroupOut.setData(&rgData); fRowGroupOut.setData(&rgData);
fRowGroupOut.resetRowGroup(0); fRowGroupOut.resetRowGroup(0);
fRowGroupOut.setStatus(status()); fRowGroupOut.setStatus(status());

View File

@ -1590,7 +1590,7 @@ uint32_t TupleUnion::nextBand(messageqcpp::ByteStream& bs)
outputRG.setData(&mem); outputRG.setData(&mem);
else else
{ {
mem = RGData(outputRG, 0); mem = RGData(outputRG, 0U);
outputRG.setData(&mem); outputRG.setData(&mem);
outputRG.resetRowGroup(0); outputRG.resetRowGroup(0);
outputRG.setStatus(status()); outputRG.setStatus(status());

View File

@ -38,6 +38,7 @@
#include <string> #include <string>
#include <sstream> #include <sstream>
#include <set> #include <set>
#include "rowgroup.h"
#include "serviceexemgr.h" #include "serviceexemgr.h"
#include <cstdlib> #include <cstdlib>
using namespace std; using namespace std;
@ -327,9 +328,14 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
typelessJoin.reset(new bool[joinerCount]); typelessJoin.reset(new bool[joinerCount]);
tlSmallSideKeyLengths.reset(new uint32_t[joinerCount]); tlSmallSideKeyLengths.reset(new uint32_t[joinerCount]);
storedKeyAllocators.reset(new PoolAllocator[joinerCount]); // storedKeyAllocators.reset(new PoolAllocator[joinerCount]);
for (uint j = 0; j < joinerCount; ++j) for (uint j = 0; j < joinerCount; ++j)
storedKeyAllocators[j].setUseLock(true); {
// storedKeyAllocators[j].setUseLock(true);
// WIP use one copy of the allocator
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<utils::PoolAllocatorBufType>();
storedKeyAllocators.emplace_back(PoolAllocator(&allocator, PoolAllocator::DEFAULT_WINDOW_SIZE, false, true));
}
joinNullValues.reset(new uint64_t[joinerCount]); joinNullValues.reset(new uint64_t[joinerCount]);
doMatchNulls.reset(new bool[joinerCount]); doMatchNulls.reset(new bool[joinerCount]);
@ -360,16 +366,16 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
if (!typelessJoin[i]) if (!typelessJoin[i])
{ {
auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator<TJoiner::value_type>(); auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<TJoiner::value_type>();
bs >> joinNullValues[i]; bs >> joinNullValues[i];
bs >> largeSideKeyColumns[i]; bs >> largeSideKeyColumns[i];
for (uint j = 0; j < processorThreads; ++j) for (uint j = 0; j < processorThreads; ++j)
tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), alloc)); tJoiners[i][j].reset(new TJoiner(10, TupleJoiner::hasher(), allocator));
} }
else else
{ {
auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator<TLJoiner::value_type>(); auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<TLJoiner::value_type>();
deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]); deserializeVector<uint32_t>(bs, tlLargeSideKeyColumns[i]);
bs >> tlSmallSideKeyLengths[i]; bs >> tlSmallSideKeyLengths[i];
@ -392,7 +398,7 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
mSmallSideKeyColumnsPtr, mSmallSideRGPtr); mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG, &tlLargeSideKeyColumns[i], auto tlComparator = TupleJoiner::TypelessDataComparator(&outputRG, &tlLargeSideKeyColumns[i],
mSmallSideKeyColumnsPtr, mSmallSideRGPtr); mSmallSideKeyColumnsPtr, mSmallSideRGPtr);
tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, alloc)); tlJoiners[i][j].reset(new TLJoiner(10, tlHasher, tlComparator, allocator));
} }
} }
} }
@ -1375,7 +1381,7 @@ uint32_t BatchPrimitiveProcessor::executeTupleJoin(uint32_t startRid, RowGroup&
} }
#ifdef PRIMPROC_STOPWATCH #ifdef PRIMPROC_STOPWATCH
void BatchPrimitiveProcessor::execute(StopWatch* stopwatch) void BatchPrimitiveProcessor::execute(StopWatch* stopwatch, messageqcpp::SBS& bs)
#else #else
void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs) void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs)
#endif #endif
@ -1599,7 +1605,6 @@ void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs)
{ {
for (j = 0; j < projectCount; ++j) for (j = 0; j < projectCount; ++j)
{ {
// cout << "projectionMap[" << j << "] = " << projectionMap[j] << endl;
if (projectionMap[j] != -1) if (projectionMap[j] != -1)
{ {
#ifdef PRIMPROC_STOPWATCH #ifdef PRIMPROC_STOPWATCH
@ -1610,11 +1615,6 @@ void BatchPrimitiveProcessor::execute(messageqcpp::SBS& bs)
projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]); projectSteps[j]->projectIntoRowGroup(outputRG, projectionMap[j]);
#endif #endif
} }
// else
// cout << " no target found for OID " <<
// projectSteps[j]->getOID()
//<< endl;
} }
if (fe2) if (fe2)
{ {
@ -2216,8 +2216,8 @@ int BatchPrimitiveProcessor::operator()()
validCPData = false; validCPData = false;
cpDataFromDictScan = false; cpDataFromDictScan = false;
auto alloc = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>(); auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>();
messageqcpp::SBS bs(new ByteStream(&alloc)); messageqcpp::SBS bs(new ByteStream(&allocator));
#ifdef PRIMPROC_STOPWATCH #ifdef PRIMPROC_STOPWATCH
stopwatch->start("BPP() execute"); stopwatch->start("BPP() execute");
@ -2278,17 +2278,19 @@ int BatchPrimitiveProcessor::operator()()
void BatchPrimitiveProcessor::allocLargeBuffers() void BatchPrimitiveProcessor::allocLargeBuffers()
{ {
auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<rowgroup::RGDataBufType>();
if (ot == ROW_GROUP && !outRowGroupData) if (ot == ROW_GROUP && !outRowGroupData)
{ {
// outputRG.setUseStringTable(true); // outputRG.setUseStringTable(true);
outRowGroupData.reset(new RGData(outputRG)); outRowGroupData.reset(new RGData(outputRG, &allocator));
outputRG.setData(outRowGroupData.get()); outputRG.setData(outRowGroupData.get());
} }
if (fe1 && !fe1Data) if (fe1 && !fe1Data)
{ {
// fe1Input.setUseStringTable(true); // fe1Input.setUseStringTable(true);
fe1Data.reset(new RGData(fe1Input)); fe1Data.reset(new RGData(fe1Input, &allocator));
// fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]); // fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]);
fe1Input.setData(fe1Data.get()); fe1Input.setData(fe1Data.get());
} }
@ -2296,14 +2298,14 @@ void BatchPrimitiveProcessor::allocLargeBuffers()
if (fe2 && !fe2Data) if (fe2 && !fe2Data)
{ {
// fe2Output.setUseStringTable(true); // fe2Output.setUseStringTable(true);
fe2Data.reset(new RGData(fe2Output)); fe2Data.reset(new RGData(fe2Output, &allocator));
fe2Output.setData(fe2Data.get()); fe2Output.setData(fe2Data.get());
} }
if (getTupleJoinRowGroupData && !joinedRGMem) if (getTupleJoinRowGroupData && !joinedRGMem)
{ {
// joinedRG.setUseStringTable(true); // joinedRG.setUseStringTable(true);
joinedRGMem.reset(new RGData(joinedRG)); joinedRGMem.reset(new RGData(joinedRG, &allocator));
joinedRG.setData(joinedRGMem.get()); joinedRG.setData(joinedRGMem.get());
} }
} }
@ -2471,73 +2473,6 @@ SBPP BatchPrimitiveProcessor::duplicate()
return bpp; return bpp;
} }
#if 0
bool BatchPrimitiveProcessor::operator==(const BatchPrimitiveProcessor& bpp) const
{
uint32_t i;
if (ot != bpp.ot)
return false;
if (versionInfo != bpp.versionInfo)
return false;
if (txnID != bpp.txnID)
return false;
if (sessionID != bpp.sessionID)
return false;
if (stepID != bpp.stepID)
return false;
if (uniqueID != bpp.uniqueID)
return false;
if (gotValues != bpp.gotValues)
return false;
if (gotAbsRids != bpp.gotAbsRids)
return false;
if (needStrValues != bpp.needStrValues)
return false;
if (filterCount != bpp.filterCount)
return false;
if (projectCount != bpp.projectCount)
return false;
if (sendRidsAtDelivery != bpp.sendRidsAtDelivery)
return false;
if (hasScan != bpp.hasScan)
return false;
if (hasFilterStep != bpp.hasFilterStep)
return false;
if (filtOnString != bpp.filtOnString)
return false;
if (doJoin != bpp.doJoin)
return false;
if (doJoin)
/* Join equality test is a bit out of date */
if (joiner != bpp.joiner || joinerSize != bpp.joinerSize)
return false;
for (i = 0; i < filterCount; i++)
if (*filterSteps[i] != *bpp.filterSteps[i])
return false;
return true;
}
#endif
void BatchPrimitiveProcessor::asyncLoadProjectColumns() void BatchPrimitiveProcessor::asyncLoadProjectColumns()
{ {
// relLBID is the LBID related to the primMsg->LBID, // relLBID is the LBID related to the primMsg->LBID,

View File

@ -188,7 +188,7 @@ class BatchPrimitiveProcessor
void initProcessor(); void initProcessor();
#ifdef PRIMPROC_STOPWATCH #ifdef PRIMPROC_STOPWATCH
void execute(logging::StopWatch* stopwatch); void execute(logging::StopWatch* stopwatch, messageqcpp::SBS& bs);
#else #else
void execute(messageqcpp::SBS& bs); void execute(messageqcpp::SBS& bs);
#endif #endif
@ -379,14 +379,15 @@ class BatchPrimitiveProcessor
inline void getJoinResults(const rowgroup::Row& r, uint32_t jIndex, std::vector<uint32_t>& v); inline void getJoinResults(const rowgroup::Row& r, uint32_t jIndex, std::vector<uint32_t>& v);
// these allocators hold the memory for the keys stored in tlJoiners // these allocators hold the memory for the keys stored in tlJoiners
std::shared_ptr<utils::PoolAllocator[]> storedKeyAllocators; // WIP This was a shared vec of allocators originally but it might not be necessary.
// This might give far memory allocations for keys used by JOIN hashmap.
std::vector<utils::PoolAllocator> storedKeyAllocators;
/* PM Aggregation */ /* PM Aggregation */
rowgroup::RowGroup joinedRG; // if there's a join, the rows are formatted with this rowgroup::RowGroup joinedRG; // if there's a join, the rows are formatted with this
rowgroup::SP_ROWAGG_PM_t fAggregator; rowgroup::SP_ROWAGG_PM_t fAggregator;
rowgroup::RowGroup fAggregateRG; rowgroup::RowGroup fAggregateRG;
rowgroup::RGData fAggRowGroupData; rowgroup::RGData fAggRowGroupData;
// boost::scoped_array<uint8_t> fAggRowGroupData;
/* OR hacks */ /* OR hacks */
uint8_t bop; // BOP_AND or BOP_OR uint8_t bop; // BOP_AND or BOP_OR

View File

@ -73,6 +73,7 @@ if (WITH_UNITTESTS)
target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc) target_link_libraries(fair_threadpool_test ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES} processor dbbc)
gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:) gtest_add_tests(TARGET fair_threadpool_test TEST_PREFIX columnstore:)
set_source_files_properties(counting_allocator.cpp PROPERTIES COMPILE_FLAGS "-Wno-sign-compare")
add_executable(counting_allocator counting_allocator.cpp) add_executable(counting_allocator counting_allocator.cpp)
add_dependencies(counting_allocator googletest) add_dependencies(counting_allocator googletest)
target_link_libraries(counting_allocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES}) target_link_libraries(counting_allocator ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${GTEST_LIBRARIES})
@ -88,7 +89,7 @@ if (WITH_UNITTESTS)
add_test(NAME columnstore:brm_em_standalone COMMAND brm_em_standalone) add_test(NAME columnstore:brm_em_standalone COMMAND brm_em_standalone)
set_tests_properties(columnstore:brm_em_standalone PROPERTIES DISABLED True) set_tests_properties(columnstore:brm_em_standalone PROPERTIES DISABLED True)
endif() endif()
# -Werror=sign-compare
if (WITH_MICROBENCHMARKS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")) if (WITH_MICROBENCHMARKS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Debug"))
find_package(benchmark REQUIRED) find_package(benchmark REQUIRED)
add_executable(primitives_scan_bench primitives_scan_bench.cpp) add_executable(primitives_scan_bench primitives_scan_bench.cpp)

View File

@ -16,8 +16,11 @@
MA 02110-1301, USA. */ MA 02110-1301, USA. */
#include <gtest/gtest.h> // googletest header file #include <gtest/gtest.h> // googletest header file
#include <cstring>
#include <iostream> #include <iostream>
#include <numeric>
#include "countingallocator.h"
#include "rowgroup.h" #include "rowgroup.h"
#include "columnwidth.h" #include "columnwidth.h"
#include "joblisttypes.h" #include "joblisttypes.h"
@ -28,13 +31,44 @@
using CSCDataType = execplan::CalpontSystemCatalog::ColDataType; using CSCDataType = execplan::CalpontSystemCatalog::ColDataType;
using datatypes::TSInt128; using datatypes::TSInt128;
using RGFieldsType = std::vector<uint32_t>;
rowgroup::RowGroup setupRG(const std::vector<execplan::CalpontSystemCatalog::ColDataType>& cts,
const RGFieldsType& widths, const RGFieldsType& charsets)
{
std::vector<execplan::CalpontSystemCatalog::ColDataType> types = cts;
RGFieldsType offsets{2};
for (auto w : widths)
{
offsets.push_back(offsets.back() + w);
}
RGFieldsType roids(widths.size());
std::iota(roids.begin(), roids.end(), 3000);
RGFieldsType tkeys(widths.size());
std::fill(tkeys.begin(), tkeys.end(), 1);
RGFieldsType cscale(widths.size());
std::fill(cscale.begin(), cscale.end(), 0);
RGFieldsType precision(widths.size());
std::fill(precision.begin(), precision.end(), 20);
return rowgroup::RowGroup(roids.size(), // column count
offsets, // oldOffset
roids, // column oids
tkeys, // keys
types, // types
charsets, // charset numbers
cscale, // scale
precision, // precision
20, // sTableThreshold
true // useStringTable
);
}
class RowDecimalTest : public ::testing::Test class RowDecimalTest : public ::testing::Test
{ {
protected: protected:
void SetUp() override void SetUp() override
{ {
uint32_t precision = WIDE_DEC_PRECISION; uint32_t precision = WIDE_DEC_PRECISION;
uint32_t oid = 3001; uint32_t oid = 3001;
std::vector<CSCDataType> types; std::vector<CSCDataType> types;
@ -81,7 +115,6 @@ class RowDecimalTest : public ::testing::Test
20, // sTableThreshold 20, // sTableThreshold
false // useStringTable false // useStringTable
); );
rg = rgOut = inRG; rg = rgOut = inRG;
rgD.reinit(rg); rgD.reinit(rg);
rgDOut.reinit(rgOut); rgDOut.reinit(rgOut);
@ -317,3 +350,90 @@ TEST_F(RowDecimalTest, RowEqualsCheck)
} }
} }
} }
static const constexpr int64_t MemoryAllowance = 10 * 1024 * 1024;
class RGDataTest : public ::testing::Test
{
protected:
RGDataTest()
: allocatedMemory(MemoryAllowance), alloc(allocatedMemory, MemoryAllowance / 100) {}
void SetUp() override
{
rg = setupRG({execplan::CalpontSystemCatalog::VARCHAR, execplan::CalpontSystemCatalog::UDECIMAL,
execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL,
execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL},
{65536, 16, 8, 4, 2, 1}, {8, 8, 8, 8, 8, 8});
// rgD = rowgroup::RGData(rg, &alloc);
// rg.setData(&rgD);
// rg.initRow(&r);
// rg.getRow(0, &r);
// for (size_t i = 0; i < sValueVector.size(); i++)
// {
// // setStringField
// r.setBinaryField_offset(&sValueVector[i], sizeof(sValueVector[0]), offsets[0]);
// r.setBinaryField_offset(&anotherValueVector[i], sizeof(anotherValueVector[0]), offsets[1]);
// r.setIntField(s64ValueVector[i], 2);
// r.setIntField(s32ValueVector[i], 3);
// r.setIntField(s16ValueVector[i], 4);
// r.setIntField(s8ValueVector[i], 5);
// r.nextRow(rowSize);
// }
// rowCount = sValueVector.size();
}
// void TearDown() override {}
rowgroup::Row r;
rowgroup::RowGroup rg;
rowgroup::RGData rgD;
std::atomic<int64_t> allocatedMemory{MemoryAllowance};
allocators::CountingAllocator<rowgroup::RGDataBufType> alloc;
};
// bool useStringTable = true;
TEST_F(RGDataTest, AllocData)
{
std::cout << " test allocatedMemery " << allocatedMemory.load() << " rowsize " << rg.getRowSize() << " " << rg.getMaxDataSize() << std::endl;
rgD = rowgroup::RGData(rg, &alloc);
rg.setData(&rgD);
rg.initRow(&r);
rg.getRow(0, &r);
std::cout << " test inStringTable(colIndex) " << r.inStringTable(0) << std::endl;
std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl;
auto currentAllocation = allocatedMemory.load();
EXPECT_LE(currentAllocation, MemoryAllowance - rg.getMaxDataSize());
r.setStringField(utils::ConstString{"testaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, 0);
std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl;
std::cout << " test inStringTable " << r.getColumnWidth(0) << std::endl;
EXPECT_LE(allocatedMemory.load(), currentAllocation);
currentAllocation = allocatedMemory.load();
r.nextRow();
r.setStringField(utils::ConstString{"testaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}, 0);
std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl;
std::cout << " test inStringTable " << r.getColumnWidth(0) << std::endl;
EXPECT_EQ(allocatedMemory.load(), currentAllocation);
currentAllocation = allocatedMemory.load();
r.nextRow();
std::string longString(64 * 1024 + 1000, 'a');
auto cs = utils::ConstString(longString);
std::cout << "test longString " << longString.size() << " cs len " << cs.length()<< std::endl;
r.setStringField(cs, 0);
std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl;
std::cout << " test inStringTable " << r.getColumnWidth(0) << std::endl;
EXPECT_LE(allocatedMemory.load(), currentAllocation);
rgD = rowgroup::RGData(rg);
std::cout << " test allocatedMemery " << allocatedMemory.load() << std::endl;
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
// reinit
}

View File

@ -46,8 +46,12 @@ public:
CountingAllocator(const CountingAllocator<U>& other) noexcept CountingAllocator(const CountingAllocator<U>& other) noexcept
: memoryLimitRef_(other.memoryLimitRef_) {} : memoryLimitRef_(other.memoryLimitRef_) {}
// Allocate memory for n objects of type T // Allocate memory for n objects of type T
T* allocate(std::size_t n) { template <typename U = T>
typename std::enable_if<!std::is_array<U>::value, U*>::type
allocate(std::size_t n)
{
auto memCounted = memoryLimitRef_.fetch_sub(n * sizeof(T), std::memory_order_relaxed); auto memCounted = memoryLimitRef_.fetch_sub(n * sizeof(T), std::memory_order_relaxed);
if (memCounted < memoryLimitLowerBound) { if (memCounted < memoryLimitLowerBound) {
memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed); memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed);
@ -60,8 +64,25 @@ public:
return ptr; return ptr;
} }
template <typename U = T>
typename std::enable_if<std::is_array<U>::value, typename std::remove_extent<U>::type*>::type
allocate(std::size_t n)
{
auto memCounted = memoryLimitRef_.fetch_sub(n * sizeof(T), std::memory_order_relaxed);
if (memCounted < memoryLimitLowerBound) {
memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed);
throw std::bad_alloc();
}
T ptr = static_cast<T>(::operator new[](n));
// std::cout << "[Allocate] " << n * sizeof(T) << " bytes at " << static_cast<void*>(ptr)
// << ". current timit: " << std::dec << memoryLimitRef_.load() << std::hex << " bytes.\n";
return ptr;
}
// Deallocate memory for n objects of type T // Deallocate memory for n objects of type T
void deallocate(T* ptr, std::size_t n) noexcept { void deallocate(T* ptr, std::size_t n) noexcept
{
::operator delete(ptr); ::operator delete(ptr);
memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed); memoryLimitRef_.fetch_add(n * sizeof(T), std::memory_order_relaxed);
// std::cout << "[Deallocate] " << n * sizeof(T) << " bytes from " << static_cast<void*>(ptr) // std::cout << "[Deallocate] " << n * sizeof(T) << " bytes from " << static_cast<void*>(ptr)
@ -70,12 +91,14 @@ public:
// Equality operators (allocators are equal if they share the same counter) // Equality operators (allocators are equal if they share the same counter)
template <typename U> template <typename U>
bool operator==(const CountingAllocator<U>& other) const noexcept { bool operator==(const CountingAllocator<U>& other) const noexcept
{
return &memoryLimitRef_ == &other.memoryLimitRef_; return &memoryLimitRef_ == &other.memoryLimitRef_;
} }
template <typename U> template <typename U>
bool operator!=(const CountingAllocator<U>& other) const noexcept { bool operator!=(const CountingAllocator<U>& other) const noexcept
{
return !(*this == other); return !(*this == other);
} }

View File

@ -52,13 +52,13 @@ void PoolAllocator::deallocateAll()
void PoolAllocator::newBlock() void PoolAllocator::newBlock()
{ {
std::shared_ptr<uint8_t[]> next; std::shared_ptr<PoolAllocatorBufType[]> next;
capacityRemaining = allocSize; capacityRemaining = allocSize;
if (!tmpSpace || mem.size() == 0) if (!tmpSpace || mem.size() == 0)
{ {
next.reset(new uint8_t[allocSize]); next.reset(new PoolAllocatorBufType[allocSize]);
mem.push_back(next); mem.push_back(next);
nextAlloc = next.get(); nextAlloc = next.get();
} }
@ -71,7 +71,7 @@ void* PoolAllocator::allocOOB(uint64_t size)
OOBMemInfo memInfo; OOBMemInfo memInfo;
memUsage += size; memUsage += size;
memInfo.mem.reset(new uint8_t[size]); memInfo.mem.reset(new PoolAllocatorBufType[size]);
memInfo.size = size; memInfo.size = size;
void* ret = (void*)memInfo.mem.get(); void* ret = (void*)memInfo.mem.get();
oob[ret] = memInfo; oob[ret] = memInfo;

View File

@ -33,8 +33,11 @@
#include <atomic> #include <atomic>
#include "countingallocator.h"
namespace utils namespace utils
{ {
using PoolAllocatorBufType = uint8_t;
class PoolAllocator class PoolAllocator
{ {
public: public:
@ -51,6 +54,18 @@ class PoolAllocator
, lock(false) , lock(false)
{ {
} }
PoolAllocator(allocators::CountingAllocator<PoolAllocatorBufType>* allocator, unsigned windowSize = DEFAULT_WINDOW_SIZE,
bool isTmpSpace = false, bool _useLock = false)
: allocSize(windowSize)
, tmpSpace(isTmpSpace)
, capacityRemaining(0)
, memUsage(0)
, nextAlloc(0)
, useLock(_useLock)
, lock(false)
, allocator(allocator)
{
}
PoolAllocator(const PoolAllocator& p) PoolAllocator(const PoolAllocator& p)
: allocSize(p.allocSize) : allocSize(p.allocSize)
, tmpSpace(p.tmpSpace) , tmpSpace(p.tmpSpace)
@ -59,6 +74,7 @@ class PoolAllocator
, nextAlloc(0) , nextAlloc(0)
, useLock(p.useLock) , useLock(p.useLock)
, lock(false) , lock(false)
, allocator(p.allocator)
{ {
} }
virtual ~PoolAllocator() virtual ~PoolAllocator()
@ -90,21 +106,23 @@ class PoolAllocator
void* allocOOB(uint64_t size); void* allocOOB(uint64_t size);
unsigned allocSize; unsigned allocSize;
std::vector<std::shared_ptr<uint8_t[]>> mem; std::vector<std::shared_ptr<PoolAllocatorBufType[]>> mem;
bool tmpSpace; bool tmpSpace;
unsigned capacityRemaining; unsigned capacityRemaining;
uint64_t memUsage; uint64_t memUsage;
uint8_t* nextAlloc; PoolAllocatorBufType* nextAlloc;
bool useLock; bool useLock;
std::atomic<bool> lock; std::atomic<bool> lock;
struct OOBMemInfo struct OOBMemInfo
{ {
std::shared_ptr<uint8_t[]> mem; std::shared_ptr<PoolAllocatorBufType[]> mem;
uint64_t size; uint64_t size;
}; };
typedef std::map<void*, OOBMemInfo> OutOfBandMap; typedef std::map<void*, OOBMemInfo> OutOfBandMap;
OutOfBandMap oob; // for mem chunks bigger than the window size; these can be dealloc'd OutOfBandMap oob; // for mem chunks bigger than the window size; these can be dealloc'd
// WIP rename to allocator
allocators::CountingAllocator<PoolAllocatorBufType>* allocator = nullptr;
}; };
inline void* PoolAllocator::allocate(uint64_t size) inline void* PoolAllocator::allocate(uint64_t size)
@ -136,4 +154,4 @@ inline void* PoolAllocator::allocate(uint64_t size)
return ret; return ret;
} }
} // namespace utils } // namespace allocators

View File

@ -31,7 +31,6 @@
#include <iterator> #include <iterator>
using namespace std; using namespace std;
#include <numeric> #include <numeric>
#include "bytestream.h" #include "bytestream.h"
@ -49,7 +48,10 @@ namespace rowgroup
{ {
using cscType = execplan::CalpontSystemCatalog::ColDataType; using cscType = execplan::CalpontSystemCatalog::ColDataType;
StringStore::StringStore(allocators::CountingAllocator<StringStoreBufType>* alloc) : StringStore()
{
this->alloc = alloc;
}
StringStore::~StringStore() StringStore::~StringStore()
{ {
@ -94,11 +96,22 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
if (mem.size() > 0) if (mem.size() > 0)
lastMC = (MemChunk*)mem.back().get(); lastMC = (MemChunk*)mem.back().get();
std::cout << "StringStore::storeString len " << len << std::endl;
if ((len + 4) >= CHUNK_SIZE) if ((len + 4) >= CHUNK_SIZE)
{ {
std::shared_ptr<uint8_t[]> newOne(new uint8_t[len + sizeof(MemChunk) + 4]); auto allocSize = len + sizeof(MemChunk) + 4;
longStrings.push_back(newOne); if (alloc)
lastMC = (MemChunk*)longStrings.back().get(); {
cout << "StringStore::storeString longStrings with alloc " << std::endl;
longStrings.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, allocSize));
}
else
{
cout << "StringStore::storeString longStrings no alloc " << std::endl;
longStrings.emplace_back(std::make_shared<uint8_t[]>(allocSize));
}
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[len + sizeof(MemChunk) + 4]);
lastMC = reinterpret_cast<MemChunk*>(longStrings.back().get());
lastMC->capacity = lastMC->currentSize = len + 4; lastMC->capacity = lastMC->currentSize = len + 4;
memcpy(lastMC->data, &len, 4); memcpy(lastMC->data, &len, 4);
memcpy(lastMC->data + 4, data, len); memcpy(lastMC->data + 4, data, len);
@ -112,10 +125,21 @@ uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
{ {
// mem usage debugging // mem usage debugging
// if (lastMC) // if (lastMC)
// cout << "Memchunk efficiency = " << lastMC->currentSize << "/" << lastMC->capacity << endl; if (alloc)
std::shared_ptr<uint8_t[]> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]); {
mem.push_back(newOne); cout << "StringStore::storeString with alloc " << std::endl;
lastMC = (MemChunk*)mem.back().get(); mem.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
// std::allocate_shared) newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
}
else
{
cout << "StringStore::storeString no alloc " << std::endl;
mem.emplace_back(std::make_shared<uint8_t[]>(CHUNK_SIZE + sizeof(MemChunk)));
// mem.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, CHUNK_SIZE + sizeof(MemChunk)));
// std::shared_ptr<uint8_t[]> newOne(new uint8_t[CHUNK_SIZE + sizeof(MemChunk)]);
}
// mem.push_back(newOne);
lastMC = reinterpret_cast<MemChunk*>(mem.back().get());
lastMC->currentSize = 0; lastMC->currentSize = 0;
lastMC->capacity = CHUNK_SIZE; lastMC->capacity = CHUNK_SIZE;
memset(lastMC->data, 0, CHUNK_SIZE); memset(lastMC->data, 0, CHUNK_SIZE);
@ -172,7 +196,7 @@ void StringStore::deserialize(ByteStream& bs)
// mem.clear(); // mem.clear();
bs >> count; bs >> count;
mem.resize(count); // mem.resize(count);
bs >> tmp8; bs >> tmp8;
empty = (bool)tmp8; empty = (bool)tmp8;
@ -181,7 +205,18 @@ void StringStore::deserialize(ByteStream& bs)
bs >> size; bs >> size;
// cout << "deserializing " << size << " bytes\n"; // cout << "deserializing " << size << " bytes\n";
buf = bs.buf(); buf = bs.buf();
mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
if (alloc)
{
cout << "StringStore::deserialize with alloc " << std::endl;
mem.emplace_back(std::allocate_shared<StringStoreBufType>(*alloc, size + sizeof(MemChunk)));
}
else
{
cout << "StringStore::deserialize no alloc " << std::endl;
mem.emplace_back(std::make_shared<uint8_t[]>(size + sizeof(MemChunk)));
}
// mem[i].reset(new uint8_t[size + sizeof(MemChunk)]);
mc = (MemChunk*)mem[i].get(); mc = (MemChunk*)mem[i].get();
mc->currentSize = size; mc->currentSize = size;
mc->capacity = size; mc->capacity = size;
@ -302,7 +337,6 @@ void UserDataStore::deserialize(ByteStream& bs)
return; return;
} }
RGData::RGData(const RowGroup& rg, uint32_t rowCount) RGData::RGData(const RowGroup& rg, uint32_t rowCount)
{ {
// cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl; // cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl;
@ -313,7 +347,6 @@ RGData::RGData(const RowGroup& rg, uint32_t rowCount)
userDataStore.reset(); userDataStore.reset();
#ifdef VALGRIND #ifdef VALGRIND
/* In a PM-join, we can serialize entire tables; not every value has been /* In a PM-join, we can serialize entire tables; not every value has been
* filled in yet. Need to look into that. Valgrind complains that * filled in yet. Need to look into that. Valgrind complains that
@ -343,13 +376,44 @@ RGData::RGData(const RowGroup& rg)
#endif #endif
} }
RGData::RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>* alloc) : alloc(alloc)
{
// rowData = shared_ptr<uint8_t[]>(buf, [alloc, allocSize](uint8_t* p) { alloc->deallocate(p, allocSize);
// });
rowData = std::allocate_shared<RGDataBufType>(*alloc, rg.getMaxDataSize());
// rowData = std::make_shared(uint8_t[rg.getMaxDataSize()]);
if (rg.usesStringTable())
strings.reset(new StringStore(alloc));
userDataStore.reset();
#ifdef VALGRIND
/* In a PM-join, we can serialize entire tables; not every value has been
* filled in yet. Need to look into that. Valgrind complains that
* those bytes are uninitialized, this suppresses that error.
*/
memset(rowData.get(), 0, rg.getMaxDataSize());
#endif
}
void RGData::reinit(const RowGroup& rg, uint32_t rowCount) void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
{ {
rowData.reset(new uint8_t[rg.getDataSize(rowCount)]); if (alloc)
{
cout << "RGData::reinit with alloc " << std::endl;
rowData = std::allocate_shared<RGDataBufType>(*alloc, rg.getDataSize(rowCount));
}
else
{
cout << "RGData::reinit no alloc " << std::endl;
rowData.reset(new uint8_t[rg.getDataSize(rowCount)]);
}
userDataStore.reset(); userDataStore.reset();
if (rg.usesStringTable()) if (rg.usesStringTable())
strings.reset(new StringStore()); strings.reset(new StringStore(alloc));
else else
strings.reset(); strings.reset();
@ -1314,9 +1378,8 @@ string RowGroup::toString(const std::vector<uint64_t>& used) const
os << "rowcount = " << getRowCount() << endl; os << "rowcount = " << getRowCount() << endl;
if (!used.empty()) if (!used.empty())
{ {
uint64_t cnt = uint64_t cnt = std::accumulate(used.begin(), used.end(), 0ULL, [](uint64_t a, uint64_t bits)
std::accumulate(used.begin(), used.end(), 0ULL, { return a + __builtin_popcountll(bits); });
[](uint64_t a, uint64_t bits) { return a + __builtin_popcountll(bits); });
os << "sparse row count = " << cnt << endl; os << "sparse row count = " << cnt << endl;
} }
os << "base rid = " << getBaseRid() << endl; os << "base rid = " << getBaseRid() << endl;

View File

@ -40,6 +40,7 @@
#include <execinfo.h> #include <execinfo.h>
#include "countingallocator.h"
#include "hasher.h" #include "hasher.h"
#include "joblisttypes.h" #include "joblisttypes.h"
@ -126,10 +127,15 @@ inline T derefFromTwoVectorPtrs(const std::vector<T>* outer, const std::vector<T
return outer->operator[](outerIdx); return outer->operator[](outerIdx);
} }
using RGDataBufType = uint8_t[];
// using RGDataBufType = std::vector<uint8_t>;
using StringStoreBufType = uint8_t[];
class StringStore class StringStore
{ {
public: public:
StringStore() = default; StringStore() = default;
StringStore(allocators::CountingAllocator<StringStoreBufType>* alloc);
StringStore(const StringStore&) = delete; StringStore(const StringStore&) = delete;
StringStore(StringStore&&) = delete; StringStore(StringStore&&) = delete;
StringStore& operator=(const StringStore&) = delete; StringStore& operator=(const StringStore&) = delete;
@ -184,6 +190,7 @@ class StringStore
bool empty = true; bool empty = true;
bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe
boost::mutex fMutex; boost::mutex fMutex;
allocators::CountingAllocator<StringStoreBufType>* alloc = nullptr;
}; };
// Where we store user data for UDA(n)F // Where we store user data for UDA(n)F
@ -248,6 +255,7 @@ class UserDataStore
class RowGroup; class RowGroup;
class Row; class Row;
/* TODO: OO the rowgroup data to the extent there's no measurable performance hit. */ /* TODO: OO the rowgroup data to the extent there's no measurable performance hit. */
class RGData class RGData
{ {
@ -255,6 +263,7 @@ class RGData
RGData() = default; // useless unless followed by an = or a deserialize operation RGData() = default; // useless unless followed by an = or a deserialize operation
RGData(const RowGroup& rg, uint32_t rowCount); // allocates memory for rowData RGData(const RowGroup& rg, uint32_t rowCount); // allocates memory for rowData
explicit RGData(const RowGroup& rg); explicit RGData(const RowGroup& rg);
explicit RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>* alloc);
RGData& operator=(const RGData&) = default; RGData& operator=(const RGData&) = default;
RGData& operator=(RGData&&) = default; RGData& operator=(RGData&&) = default;
RGData(const RGData&) = default; RGData(const RGData&) = default;
@ -314,9 +323,10 @@ class RGData
} }
private: private:
std::shared_ptr<uint8_t[]> rowData; std::shared_ptr<RGDataBufType> rowData;
std::shared_ptr<StringStore> strings; std::shared_ptr<StringStore> strings;
std::shared_ptr<UserDataStore> userDataStore; std::shared_ptr<UserDataStore> userDataStore;
allocators::CountingAllocator<RGDataBufType>* alloc = nullptr;
// Need sig to support backward compat. RGData can deserialize both forms. // Need sig to support backward compat. RGData can deserialize both forms.
static const uint32_t RGDATA_SIG = 0xffffffff; // won't happen for 'old' Rowgroup data static const uint32_t RGDATA_SIG = 0xffffffff; // won't happen for 'old' Rowgroup data
@ -584,9 +594,9 @@ class Row
} }
const CHARSET_INFO* getCharset(uint32_t col) const; const CHARSET_INFO* getCharset(uint32_t col) const;
inline bool inStringTable(uint32_t col) const;
private: private:
inline bool inStringTable(uint32_t col) const;
private: private:
uint32_t columnCount = 0; uint32_t columnCount = 0;
@ -987,6 +997,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex
if (inStringTable(colIndex)) if (inStringTable(colIndex))
{ {
std::cout << "setStringField storeString len " << length << std::endl;
offset = strings->storeString((const uint8_t*)str.str(), length); offset = strings->storeString((const uint8_t*)str.str(), length);
*((uint64_t*)&data[offsets[colIndex]]) = offset; *((uint64_t*)&data[offsets[colIndex]]) = offset;
// cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]]) // cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]])
@ -995,6 +1006,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex
} }
else else
{ {
std::cout << "setStringField memcpy " << std::endl;
memcpy(&data[offsets[colIndex]], str.str(), length); memcpy(&data[offsets[colIndex]], str.str(), length);
memset(&data[offsets[colIndex] + length], 0, offsets[colIndex + 1] - (offsets[colIndex] + length)); memset(&data[offsets[colIndex] + length], 0, offsets[colIndex + 1] - (offsets[colIndex] + length));
} }