diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.cpp b/dbcon/joblist/batchprimitiveprocessor-jl.cpp index 7d94daa8f..1163661e8 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.cpp +++ b/dbcon/joblist/batchprimitiveprocessor-jl.cpp @@ -72,7 +72,6 @@ BatchPrimitiveProcessorJL::BatchPrimitiveProcessorJL(const ResourceManager* rm) , LBIDTrace(false) , tupleLength(0) , status(0) - , sendRowGroups(false) , valueColumn(0) , sendTupleJoinRowGroupData(false) , bop(BOP_AND) @@ -147,7 +146,7 @@ void BatchPrimitiveProcessorJL::addFilterStep(const pDictionaryStep& step) tableOID = step.tableOid(); - if (filterCount == 0 && !sendRowGroups) + if (filterCount == 0) { sendAbsRids = true; sendValues = true; @@ -244,7 +243,7 @@ void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& step) if (utils::isWide(cc->getWidth())) wideColumnsWidths |= cc->getWidth(); - if (filterCount == 0 && !sendRowGroups) + if (filterCount == 0) sendValues = true; idbassert(sessionID == step.sessionId()); @@ -283,7 +282,7 @@ void BatchPrimitiveProcessorJL::addProjectStep(const PassThruStep& p, const pDic projectCount++; needStrValues = true; - if (filterCount == 0 && !sendRowGroups) + if (filterCount == 0) { sendValues = true; sendAbsRids = true; @@ -1054,9 +1053,6 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const if (tJoiners.size() > 0) flags |= HAS_JOINER; - if (sendRowGroups) - flags |= HAS_ROWGROUP; - if (sendTupleJoinRowGroupData) flags |= JOIN_ROWGROUP_DATA; @@ -1071,12 +1067,6 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const bs << bop; bs << (uint8_t)(forHJ ? 1 : 0); - if (sendRowGroups) - { - bs << valueColumn; - bs << inputRG; - } - if (ot == ROW_GROUP) { bs << projectionRG; @@ -1248,6 +1238,7 @@ void BatchPrimitiveProcessorJL::createBPP(ByteStream& bs) const * (projection count)x run msgs for projection Commands */ +// The deser counterpart function is BPP::resetBPP void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isExeMgrDEC) { ISMPacketHeader ism; @@ -1289,35 +1280,28 @@ void BatchPrimitiveProcessorJL::runBPP(ByteStream& bs, uint32_t pmNum, bool isEx bs << sentByEM; if (_hasScan) + { idbassert(ridCount == 0); - else if (!sendRowGroups) + } + else + { idbassert(ridCount > 0 && (ridMap != 0 || sendAbsRids)); - else - idbassert(inputRG.getRowCount() > 0); - - if (sendRowGroups) - { - uint32_t rgSize = inputRG.getDataSize(); - bs << rgSize; - bs.append(inputRG.getData(), rgSize); } + + bs << ridCount; + + if (sendAbsRids) + bs.append((uint8_t*)absRids.get(), ridCount << 3); else { - bs << ridCount; - - if (sendAbsRids) - bs.append((uint8_t*)absRids.get(), ridCount << 3); - else - { - bs << ridMap; - bs << baseRid; - bs.append((uint8_t*)relRids, ridCount << 1); - } - - if (sendValues) - bs.append((uint8_t*)values, ridCount << 3); + bs << ridMap; + bs << baseRid; + bs.append((uint8_t*)relRids, ridCount << 1); } + if (sendValues) + bs.append((uint8_t*)values, ridCount << 3); + for (i = 0; i < filterCount; i++) filterSteps[i]->runCommand(bs); @@ -1667,7 +1651,6 @@ void BatchPrimitiveProcessorJL::setJoinedRowGroup(const rowgroup::RowGroup& rg) void BatchPrimitiveProcessorJL::setInputRowGroup(const rowgroup::RowGroup& rg) { - sendRowGroups = true; sendAbsRids = false; sendValues = false; inputRG = rg; diff --git a/dbcon/joblist/batchprimitiveprocessor-jl.h b/dbcon/joblist/batchprimitiveprocessor-jl.h index 365ea4ad0..2b9e400b0 100644 --- a/dbcon/joblist/batchprimitiveprocessor-jl.h +++ b/dbcon/joblist/batchprimitiveprocessor-jl.h @@ -343,7 +343,6 @@ class BatchPrimitiveProcessorJL /* for RowGroup return type */ rowgroup::RowGroup inputRG, projectionRG; - bool sendRowGroups; uint32_t valueColumn; /* for PM Aggregation */ diff --git a/dbcon/joblist/lbidlist.cpp b/dbcon/joblist/lbidlist.cpp index 34a5f23d5..ac97e49af 100644 --- a/dbcon/joblist/lbidlist.cpp +++ b/dbcon/joblist/lbidlist.cpp @@ -20,6 +20,7 @@ * ******************************************************************************/ #include +#include "bytestream.h" #include "primitivemsg.h" #include "blocksize.h" #include "lbidlist.h" @@ -700,7 +701,7 @@ bool LBIDList::CasualPartitionPredicate(const BRM::EMCasualPartition_t& cpRange, const execplan::CalpontSystemCatalog::ColType& ct, const uint8_t BOP, bool isDict) { - int length = bs->length(), pos = 0; + messageqcpp::BSSizeType length = bs->length(), pos = 0; const char* MsgDataPtr = (const char*)bs->buf(); bool scan = true; int64_t value = 0; diff --git a/dbcon/joblist/primitivemsg.h b/dbcon/joblist/primitivemsg.h index 479e1535a..f06a7e99c 100644 --- a/dbcon/joblist/primitivemsg.h +++ b/dbcon/joblist/primitivemsg.h @@ -282,6 +282,8 @@ struct ISMPacketHeader uint32_t Interleave; uint16_t Flags; uint8_t Command; + // !!! This attribute is used to store a sum which arg type is potentially uint64_t. + // As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though. uint16_t Size; unsigned Type : 4; unsigned MsgCount : 4; diff --git a/dbcon/joblist/resourcemanager.cpp b/dbcon/joblist/resourcemanager.cpp index e162f1a1f..e61da3c4d 100644 --- a/dbcon/joblist/resourcemanager.cpp +++ b/dbcon/joblist/resourcemanager.cpp @@ -368,6 +368,7 @@ bool ResourceManager::getMemory(int64_t amount, boost::shared_ptr& sess return (ret1 && ret2); } // Don't care about session memory +// The amount type is unsafe if amount close to max that is unrealistic in 2024. bool ResourceManager::getMemory(int64_t amount, bool patience) { bool ret1 = (atomicops::atomicSub(&totalUmMemLimit, amount) >= 0); diff --git a/dbcon/joblist/rowestimator.cpp b/dbcon/joblist/rowestimator.cpp index fa7e8920c..1b2b2f18a 100644 --- a/dbcon/joblist/rowestimator.cpp +++ b/dbcon/joblist/rowestimator.cpp @@ -19,6 +19,7 @@ * $Id: rowestimator.cpp 5642 2009-08-10 21:04:59Z wweeks $ * ******************************************************************************/ +#include #include #include "primitivemsg.h" #include "blocksize.h" @@ -292,7 +293,7 @@ float RowEstimator::estimateRowReturnFactor(const BRM::EMEntry& emEntry, const m // For example, there are two operations for "col1 > 5 and col1 < 10": // 1) col1 > 5 // 2) col2 < 10 - int length = bs->length(), pos = 0; + messageqcpp::BSSizeType length = bs->length(), pos = 0; const char* msgDataPtr = (const char*)bs->buf(); int64_t value = 0; int128_t bigValue = 0; @@ -301,6 +302,7 @@ float RowEstimator::estimateRowReturnFactor(const BRM::EMEntry& emEntry, const m for (int i = 0; i < comparisonLimit; i++) { + assert(ct.colWidth >= 0); pos += ct.colWidth + 2; // predicate + op + lcf // TODO: Stole this condition from lbidlist. diff --git a/dbcon/joblist/tuplehashjoin.cpp b/dbcon/joblist/tuplehashjoin.cpp index 13059e8b2..6e6a60c3d 100644 --- a/dbcon/joblist/tuplehashjoin.cpp +++ b/dbcon/joblist/tuplehashjoin.cpp @@ -417,6 +417,7 @@ void TupleHashJoinStep::smallRunnerFcn(uint32_t index, uint threadID, uint64_t* smallRG.initRow(&r); try { + // Very unfortunate choice for the type b/c of RM::getMemory type. ssize_t rgSize; bool gotMem; goto next; diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 8616cd4d1..6ac70dc77 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -4300,7 +4300,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non if (from_tzinfo) { serializeTimezoneInfo(bs, from_tzinfo); - uint32_t length = bs.length(); + messageqcpp::BSSizeType length = bs.length(); uint8_t* buf = new uint8_t[length]; bs >> buf; tzinfo = string((char*)buf, length); @@ -4312,7 +4312,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non if (to_tzinfo) { serializeTimezoneInfo(bs, to_tzinfo); - uint32_t length = bs.length(); + messageqcpp::BSSizeType length = bs.length(); uint8_t* buf = new uint8_t[length]; bs >> buf; tzinfo = string((char*)buf, length); diff --git a/primitives/primproc/columncommand.h b/primitives/primproc/columncommand.h index cc08f6edb..be99261c4 100644 --- a/primitives/primproc/columncommand.h +++ b/primitives/primproc/columncommand.h @@ -172,6 +172,8 @@ class ColumnCommand : public Command // the length of base prim msg, which is everything up to the // rid array for the pCol message + // !!! This attribute is used to store a sum which arg type is potentially uint64_t. + // As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though. uint32_t baseMsgLength; uint64_t lbid; diff --git a/primitives/primproc/dictstep.h b/primitives/primproc/dictstep.h index 5200b80c6..9c5120b98 100644 --- a/primitives/primproc/dictstep.h +++ b/primitives/primproc/dictstep.h @@ -151,6 +151,8 @@ class DictStep : public Command int compressionType; messageqcpp::ByteStream filterString; uint32_t filterCount; + // !!! This attribute is used to store a sum which arg type is potentially uint64_t. + // As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though. uint32_t bufferSize; uint32_t charsetNumber; uint16_t inputRidCount; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 079757126..b7632f301 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -80,6 +80,10 @@ if (WITH_UNITTESTS) target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) add_test(NAME columnstore:comparators_tests COMMAND comparators_tests) + add_executable(bytestream bytestream.cpp) + target_link_libraries(bytestream ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) + add_test(NAME columnstore:bytestream COMMAND bytestream) + # standalone EM routines test add_executable(brm_em_standalone brm-em-standalone.cpp) target_link_libraries(brm_em_standalone ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) diff --git a/tests/bytestream.cpp b/tests/bytestream.cpp new file mode 100644 index 000000000..76e60826b --- /dev/null +++ b/tests/bytestream.cpp @@ -0,0 +1,922 @@ +/* Copyright (C) 2024 MariaDB Corporation. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include +#include + +using namespace std; +#include +#include +#include +#include +#include + +#include + +#include "bytestream.h" +using namespace messageqcpp; +#include "configcpp.h" +using namespace config; +#include "mcs_decimal.h" + +class ByteStreamTestSuite : public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE(ByteStreamTestSuite); + + CPPUNIT_TEST(bs_1); + CPPUNIT_TEST(bs_1_1); + CPPUNIT_TEST(bs_1_2); + CPPUNIT_TEST(bs_2); + CPPUNIT_TEST(bs_3); + CPPUNIT_TEST(bs_4); + CPPUNIT_TEST_EXCEPTION(bs_5_1, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_2, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_3, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_4, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_5, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_6, std::underflow_error); + CPPUNIT_TEST(bs_6); + CPPUNIT_TEST(bs_7); + CPPUNIT_TEST(bs_8); + CPPUNIT_TEST_EXCEPTION(bs_9, std::underflow_error); + CPPUNIT_TEST(bs_10); + CPPUNIT_TEST(bs_12); + CPPUNIT_TEST(bs_13); + CPPUNIT_TEST(bs_14); + CPPUNIT_TEST(bs_15); + CPPUNIT_TEST(bs_16); + CPPUNIT_TEST_SUITE_END(); + + private: + ByteStream::byte b; + ByteStream::doublebyte d; + ByteStream::quadbyte q; + ByteStream::octbyte o; + + uint8_t u8; + uint16_t u16; + uint32_t u32; + uint64_t u64; + uint128_t u128; + int8_t i8; + int16_t i16; + int32_t i32; + int64_t i64; + int128_t i128; + + ByteStream bs; + ByteStream bs1; + + ByteStream::byte* bap; + ByteStream::byte* bap1; + + int len; + + public: + void setUp() + { + bs.reset(); + bs1.reset(); + bap = 0; + bap1 = 0; + } + + void tearDown() + { + bs.reset(); + bs1.reset(); + delete[] bap; + bap = 0; + delete[] bap1; + bap1 = 0; + } + + void bs_1() + { + bs.reset(); + + o = 0xdeadbeefbadc0ffeLL; + bs << o; + CPPUNIT_ASSERT(bs.length() == 8); + o = 0; + bs >> o; + CPPUNIT_ASSERT(o == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 0); + + q = 0xdeadbeef; + bs << q; + CPPUNIT_ASSERT(bs.length() == 4); + q = 0; + bs >> q; + CPPUNIT_ASSERT(q == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 0); + + d = 0xf00f; + bs << d; + CPPUNIT_ASSERT(bs.length() == 2); + d = 0; + bs >> d; + CPPUNIT_ASSERT(d == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 0); + + b = 0x0f; + bs << b; + CPPUNIT_ASSERT(bs.length() == 1); + b = 0; + bs >> b; + CPPUNIT_ASSERT(b == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + + o = 0xdeadbeefbadc0ffeLL; + bs << o; + CPPUNIT_ASSERT(bs.length() == 8); + o = 0; + + q = 0xdeadbeef; + bs << q; + CPPUNIT_ASSERT(bs.length() == 12); + q = 0; + + d = 0xf00f; + bs << d; + CPPUNIT_ASSERT(bs.length() == 14); + d = 0; + + b = 0x0f; + bs << b; + CPPUNIT_ASSERT(bs.length() == 15); + b = 0; + + bs >> o; + CPPUNIT_ASSERT(o == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 7); + bs >> q; + CPPUNIT_ASSERT(q == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 3); + bs >> d; + CPPUNIT_ASSERT(d == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 1); + bs >> b; + CPPUNIT_ASSERT(b == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_1_1() + { + bs.reset(); + + o = 0xdeadbeefbadc0ffeLL; + bs << o; + CPPUNIT_ASSERT(bs.length() == 8); + o = 0; + + q = 0xdeadbeef; + bs << q; + CPPUNIT_ASSERT(bs.length() == 12); + q = 0; + + d = 0xf00f; + bs << d; + CPPUNIT_ASSERT(bs.length() == 14); + d = 0; + + b = 0x0f; + bs << b; + CPPUNIT_ASSERT(bs.length() == 15); + b = 0; + + ByteStream bbs1; + bbs1 << bs; + CPPUNIT_ASSERT(bbs1.length() == bs.length() + sizeof(messageqcpp::BSSizeType)); + bs.reset(); + bbs1 >> bs; + CPPUNIT_ASSERT(bbs1.length() == 0); + CPPUNIT_ASSERT(bs.length() == 15); + + bs >> o; + CPPUNIT_ASSERT(o == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 7); + bs >> q; + CPPUNIT_ASSERT(q == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 3); + bs >> d; + CPPUNIT_ASSERT(d == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 1); + bs >> b; + CPPUNIT_ASSERT(b == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_1_2() + { + bs.reset(); + + i64 = -2401053089477160962; + bs << i64; + CPPUNIT_ASSERT(bs.length() == 8); + i64 = 0; + + i32 = -559038737; + bs << i32; + CPPUNIT_ASSERT(bs.length() == 12); + i32 = 0; + + i16 = -4081; + bs << i16; + CPPUNIT_ASSERT(bs.length() == 14); + i16 = 0; + + i8 = 15; + bs << i8; + CPPUNIT_ASSERT(bs.length() == 15); + i8 = 0; + + bs >> i64; + CPPUNIT_ASSERT(i64 == -2401053089477160962); + CPPUNIT_ASSERT(bs.length() == 7); + + bs >> i32; + CPPUNIT_ASSERT(i32 == -559038737); + CPPUNIT_ASSERT(bs.length() == 3); + + bs >> i16; + CPPUNIT_ASSERT(i16 == -4081); + CPPUNIT_ASSERT(bs.length() == 1); + + bs >> i8; + CPPUNIT_ASSERT(i8 == 15); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_2() + { + int i; + + bs.reset(); + srand(time(0)); + + for (i = 0; i < 10240; i++) + { + bs << (uint32_t)rand(); + } + + bs1 = bs; + + uint32_t q1; + + for (i = 0; i < 10240; i++) + { + bs >> u32; + bs1 >> q1; + CPPUNIT_ASSERT(u32 == q1); + } + + bs.reset(); + bs1.reset(); + } + + void bs_3() + { + uint8_t ba[1024] = { + 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, + }; + + bs.load(ba, 8); + CPPUNIT_ASSERT(bs.length() == 8); + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x12); + bs >> u16; + CPPUNIT_ASSERT(u16 == 0x5634); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdebc9a78); + + CPPUNIT_ASSERT(bs.length() == 1); + + bs.reset(); + CPPUNIT_ASSERT(bs.length() == 0); + + bs.load(ba, 8); + len = bs.length(); + CPPUNIT_ASSERT(len == 8); + bap = new ByteStream::byte[len]; + // bs >> bap; + memcpy(bap, bs.buf(), len); + CPPUNIT_ASSERT(memcmp(ba, bap, len) == 0); + delete[] bap; + bap = 0; + + bs.reset(); + + for (u32 = 0; u32 < 20480; u32++) + { + bs << u32; + } + + len = bs.length(); + CPPUNIT_ASSERT(len == (20480 * sizeof(u32))); + bap = new ByteStream::byte[len]; + // bs >> bap; + memcpy(bap, bs.buf(), len); + + bs.reset(); + + for (u32 = 0; u32 < 20480; u32++) + { + bs << u32; + } + + len = bs.length(); + CPPUNIT_ASSERT(len == (20480 * sizeof(q))); + bap1 = new ByteStream::byte[len]; + // bs >> bap1; + memcpy(bap1, bs.buf(), len); + + CPPUNIT_ASSERT(memcmp(bap1, bap, len) == 0); + + delete[] bap; + bap = 0; + delete[] bap1; + bap1 = 0; + bs.reset(); + } + void bs_4() + { + for (i32 = 0; i32 < 20480; i32++) + { + bs << i32; + } + + ByteStream bs2(bs); + len = bs2.length(); + CPPUNIT_ASSERT(len == (20480 * sizeof(i32))); + bap = new ByteStream::byte[len]; + // bs2 >> bap; + memcpy(bap, bs2.buf(), len); + + bs1 = bs2; + len = bs1.length(); + CPPUNIT_ASSERT(len == (20480 * sizeof(i32))); + bap1 = new ByteStream::byte[len]; + // bs1 >> bap1; + memcpy(bap1, bs1.buf(), len); + + CPPUNIT_ASSERT(memcmp(bap1, bap, len) == 0); + delete[] bap; + bap = 0; + delete[] bap1; + bap1 = 0; + bs.reset(); + bs1.reset(); + bs2.reset(); + } + + void bs_5_1() + { + bs.reset(); + + u8 = 0x0f; + bs << u8; + + for (;;) + bs >> u32; + } + + void bs_5_2() + { + bs.reset(); + + u8 = 0x0f; + bs << u8; + + for (;;) + bs >> u16; + } + + void bs_5_3() + { + bs.reset(); + + u8 = 0x0f; + bs << u8; + + for (;;) + bs >> u8; + } + + void bs_5_4() + { + bs.reset(); + + i8 = 0x0f; + bs << i8; + + for (;;) + bs >> i32; + } + + void bs_5_5() + { + bs.reset(); + + i8 = 0x0f; + bs << i8; + + for (;;) + bs >> i16; + } + + void bs_5_6() + { + bs.reset(); + + i8 = 0x0f; + bs << i8; + + for (;;) + bs >> i8; + } + + void bs_6() + { + u8 = 0x1a; + bs << u8; + u8 = 0x2b; + bs << u8; + u8 = 0x3c; + bs << u8; + + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x1a); + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x2b); + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x3c); + + bs.reset(); + + u8 = 12; + bs << u8; + u8 = 3; + bs << u8; + u8 = 0; + bs << u8; + u8 = 2; + bs << u8; + + ByteStream bs3(bs); + + bs3 >> u8; + CPPUNIT_ASSERT(u8 == 12); + bs3 >> u8; + CPPUNIT_ASSERT(u8 == 3); + bs3 >> u8; + CPPUNIT_ASSERT(u8 == 0); + bs3 >> u8; + CPPUNIT_ASSERT(u8 == 2); + } + + void bs_7() + { + size_t i; + + bs.reset(); + bap = new ByteStream::byte[ByteStream::BlockSize * 2]; + ByteStream::byte* bapp; + + for (bapp = &bap[0], i = 0; i < ByteStream::BlockSize; bapp++, i++) + *bapp = 0xa5; + + bs.append(bap, ByteStream::BlockSize); + CPPUNIT_ASSERT(bs.length() == (ByteStream::BlockSize * 1)); + + for (bapp = &bap[0], i = 0; i < ByteStream::BlockSize; bapp++, i++) + *bapp = 0x5a; + + bs.append(bap, ByteStream::BlockSize); + CPPUNIT_ASSERT(bs.length() == (ByteStream::BlockSize * 2)); + + for (bapp = &bap[0], i = 0; i < ByteStream::BlockSize * 2; bapp++, i++) + *bapp = 0x55; + + bs.append(bap, ByteStream::BlockSize * 2); + CPPUNIT_ASSERT(bs.length() == (ByteStream::BlockSize * 4)); + delete[] bap; + bap = new ByteStream::byte[bs.length()]; + // bs >> bap; + memcpy(bap, bs.buf(), bs.length()); + bap1 = new ByteStream::byte[bs.length()]; + + for (bapp = &bap1[0], i = 0; i < ByteStream::BlockSize; bapp++, i++) + *bapp = 0xa5; + + for (i = 0; i < ByteStream::BlockSize; bapp++, i++) + *bapp = 0x5a; + + for (i = 0; i < ByteStream::BlockSize * 2; bapp++, i++) + *bapp = 0x55; + + CPPUNIT_ASSERT(memcmp(bap, bap1, bs.length()) == 0); + delete[] bap; + bap = 0; + delete[] bap1; + bap1 = 0; + } + + std::string getString() + { + static const std::string s( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore " + "et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut " + "aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse " + "cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in " + "culpa qui officia deserunt mollit anim id est laborum."); + return s; + } + + void bs_8() + { + bs.reset(); + string s; + s = "This is a test"; + bs << s; + string s1; + bs >> s1; + CPPUNIT_ASSERT(s == s1); + CPPUNIT_ASSERT(bs.length() == 0); + + bs.reset(); + s = getString(); + bs << s; + bs >> s1; + CPPUNIT_ASSERT(s == s1); + CPPUNIT_ASSERT(bs.length() == 0); + + u8 = 0xa5; + bs << u8; + u16 = 0x5aa5; + bs << u16; + u32 = 0xdeadbeef; + bs << u32; + bs << s; + s += s1; + bs << s; + s += s1; + bs << s; + bs << u32; + bs << u16; + bs << u8; + + bs >> u8; + CPPUNIT_ASSERT(u8 == 0xa5); + bs >> u16; + CPPUNIT_ASSERT(u16 == 0x5aa5); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + bs >> s; + CPPUNIT_ASSERT(s == s1); + CPPUNIT_ASSERT(s.length() == (s1.length() * 1)); + bs >> s; + CPPUNIT_ASSERT(s.length() == (s1.length() * 2)); + bs >> s; + CPPUNIT_ASSERT(s.length() == (s1.length() * 3)); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + bs >> u16; + CPPUNIT_ASSERT(u16 == 0x5aa5); + bs >> u8; + CPPUNIT_ASSERT(u8 == 0xa5); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_9() + { + bs.reset(); + // Load up a bogus string (too short) + u32 = 100; + bs << u32; + bs.append(reinterpret_cast("This is a test"), 14); + string s; + // Should throw underflow + bs >> s; + } + + void bs_10() + { + bs.reset(); + bs1.reset(); + u32 = 0xdeadbeef; + bs << u32; + CPPUNIT_ASSERT(bs.length() == 4); + CPPUNIT_ASSERT(bs1.length() == 0); + bs.swap(bs1); + CPPUNIT_ASSERT(bs1.length() == 4); + CPPUNIT_ASSERT(bs.length() == 0); + bs1 >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + + bs.reset(); + bs1.reset(); + u32 = 0xdeadbeef; + bs << u32; + bs1 << u32; + bs += bs1; + CPPUNIT_ASSERT(bs.length() == 8); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + + bs.reset(); + bs1.reset(); + ByteStream bs2; + u32 = 0xdeadbeef; + bs1 << u32; + bs2 << u32; + bs = bs1 + bs2; + CPPUNIT_ASSERT(bs.length() == 8); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + } + + void bs_12() + { + bs.reset(); + + i128 = 10 * 100000000000000000000000000000000000000_xxl; + bs << i128; + CPPUNIT_ASSERT(bs.length() == 16); + i128 = 0; + bs >> i128; + CPPUNIT_ASSERT(i128 == static_cast(10 * 100000000000000000000000000000000000000_xxl)); + CPPUNIT_ASSERT(bs.length() == 0); + + u128 = 10 * 100000000000000000000000000000000000000_xxl; + bs << u128; + CPPUNIT_ASSERT(bs.length() == 16); + u128 = 0; + bs >> u128; + CPPUNIT_ASSERT(u128 == 10 * 100000000000000000000000000000000000000_xxl); + CPPUNIT_ASSERT(bs.length() == 0); + + u64 = 0xdeadbeefbadc0ffeLL; + bs << u64; + CPPUNIT_ASSERT(bs.length() == 8); + u64 = 0; + bs.peek(u64); + CPPUNIT_ASSERT(u64 == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 8); + u64 = 0; + bs >> u64; + CPPUNIT_ASSERT(u64 == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 0); + + u16 = 0xf00f; + bs << u16; + CPPUNIT_ASSERT(bs.length() == 2); + u16 = 0; + bs.peek(u16); + CPPUNIT_ASSERT(u16 == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 2); + u16 = 0; + bs >> u16; + CPPUNIT_ASSERT(u16 == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 0); + + u8 = 0x0f; + bs << u8; + CPPUNIT_ASSERT(bs.length() == 1); + u8 = 0; + bs.peek(u8); + CPPUNIT_ASSERT(u8 == 0x0f); + CPPUNIT_ASSERT(bs.length() == 1); + u8 = 0; + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + + u32 = 0xdeadbeef; + bs << u32; + CPPUNIT_ASSERT(bs.length() == 4); + u32 = 0; + + u16 = 0xf00f; + bs << u16; + CPPUNIT_ASSERT(bs.length() == 6); + u16 = 0; + + u8 = 0x0f; + bs << u8; + CPPUNIT_ASSERT(bs.length() == 7); + u8 = 0; + + bs.peek(u32); + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 7); + u32 = 0; + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 3); + u16 = 0; + bs.peek(u16); + CPPUNIT_ASSERT(u16 == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 3); + u16 = 0; + bs >> u16; + CPPUNIT_ASSERT(u16 == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 1); + u8 = 0; + bs.peek(u8); + CPPUNIT_ASSERT(u8 == 0x0f); + CPPUNIT_ASSERT(bs.length() == 1); + u8 = 0; + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + + string s; + s = "This is a test"; + bs << s; + string s1; + bs.peek(s1); + CPPUNIT_ASSERT(s == s1); + CPPUNIT_ASSERT(bs.length() == s1.size() + 4); + CPPUNIT_ASSERT(!s1.empty()); + string s2; + bs >> s2; + CPPUNIT_ASSERT(s == s2); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_13() + { + bs.reset(); + std::string s = getString(); + bs << s; + ofstream of("bs_13.dat"); + of << bs; + of.close(); + + ifstream ifs; + ifs.open("./bs_13.dat"); + ifs.seekg(0, ios::end); + size_t ifs_len1 = ifs.tellg(); + // will be longer than orig file because string length is encoded into stream + CPPUNIT_ASSERT((s.size() + sizeof(ByteStream::quadbyte)) == ifs_len1); + ifs.seekg(0, ios::beg); + std::unique_ptr buf1(new char[ifs_len1]); + bs1.reset(); + ifs >> bs1; + ifs.close(); + CPPUNIT_ASSERT(bs.length() == bs1.length()); + string s1; + bs1 >> s1; + CPPUNIT_ASSERT(s == s1); + } + + void bs_14() + { + ByteStream bs1(0); + ByteStream bs2(bs1); + CPPUNIT_ASSERT(bs2.fBuf == 0); + ByteStream bs3(0); + bs3 = bs1; + CPPUNIT_ASSERT(bs3.fBuf == 0); + } + + void bs_15() + { + ByteStream b1, b2, empty; + uint8_t u8; + + CPPUNIT_ASSERT(b1 == b2); + CPPUNIT_ASSERT(b2 == b1); + CPPUNIT_ASSERT(b2 == empty); + CPPUNIT_ASSERT(b1 == empty); + + CPPUNIT_ASSERT(!(b1 != b2)); + CPPUNIT_ASSERT(!(b2 != b1)); + CPPUNIT_ASSERT(!(b2 != empty)); + CPPUNIT_ASSERT(!(b1 != empty)); + + b1 << "Woo hoo"; + + CPPUNIT_ASSERT(b1 != b2); + CPPUNIT_ASSERT(b2 != b1); + CPPUNIT_ASSERT(b1 != empty); + + CPPUNIT_ASSERT(!(b1 == b2)); + CPPUNIT_ASSERT(!(b2 == b1)); + CPPUNIT_ASSERT(!(b1 == empty)); + + b2 << "Woo hoo"; + + CPPUNIT_ASSERT(b1 == b2); + CPPUNIT_ASSERT(b2 == b1); + CPPUNIT_ASSERT(!(b1 != b2)); + CPPUNIT_ASSERT(!(b2 != b1)); + + b1 >> u8; + + CPPUNIT_ASSERT(b1 != b2); + CPPUNIT_ASSERT(b2 != b1); + CPPUNIT_ASSERT(!(b1 == b2)); + CPPUNIT_ASSERT(!(b2 == b1)); + + b1 << u8; + + CPPUNIT_ASSERT(b1 != b2); + CPPUNIT_ASSERT(b2 != b1); + CPPUNIT_ASSERT(!(b1 == b2)); + CPPUNIT_ASSERT(!(b2 == b1)); + + b2 >> u8; + b2 << u8; + + CPPUNIT_ASSERT(b1 == b2); + CPPUNIT_ASSERT(b2 == b1); + CPPUNIT_ASSERT(!(b1 != b2)); + CPPUNIT_ASSERT(!(b2 != b1)); + } + + void bs_16() + { + int i; + uint32_t len; + + bs.reset(); + srand(time(0)); + + for (i = 0; i < 10240; i++) + { + bs << (ByteStream::quadbyte)rand(); + } + + std::unique_ptr bp(new ByteStream::byte[bs.length()]); + ByteStream::byte* bpp = bp.get(); + std::unique_ptr bp1(new ByteStream::byte[bs.length()]); + ByteStream::byte* bpp1 = bp1.get(); + + len = bs.length(); + CPPUNIT_ASSERT(len == 10240 * 4); + bs.peek(bpp); + CPPUNIT_ASSERT(bs.length() == len); + CPPUNIT_ASSERT(memcmp(bpp, bs.buf(), len) == 0); + + bs >> bpp1; + CPPUNIT_ASSERT(bs.length() == 0); + CPPUNIT_ASSERT(memcmp(bpp, bpp1, len) == 0); + + bs.reset(); + } +}; + +#define TS_NS(x) (x) +#define TS_US(x) ((x) * 1000) +#define TS_MS(x) ((x) * 1000000) + +CPPUNIT_TEST_SUITE_REGISTRATION(ByteStreamTestSuite); + +#include +#include + +#include + +void setupSignalHandlers() +{ + struct sigaction ign; + + memset(&ign, 0, sizeof(ign)); + ign.sa_handler = SIG_IGN; + + sigaction(SIGPIPE, &ign, 0); +} + +int main(int argc, char** argv) +{ + setupSignalHandlers(); + + CppUnit::TextUi::TestRunner runner; + CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); + runner.addTest(registry.makeTest()); + bool wasSuccessful = runner.run("", false); + return (wasSuccessful ? 0 : 1); +} diff --git a/utils/cloudio/SocketPool.cpp b/utils/cloudio/SocketPool.cpp index f461e39f7..bf0c22fbd 100644 --- a/utils/cloudio/SocketPool.cpp +++ b/utils/cloudio/SocketPool.cpp @@ -16,6 +16,7 @@ MA 02110-1301, USA. */ #include "SocketPool.h" +#include "bytestream.h" #include "configcpp.h" #include "logger.h" #include "messageFormat.h" @@ -87,8 +88,8 @@ SocketPool::~SocketPool() int SocketPool::send_recv(messageqcpp::ByteStream& in, messageqcpp::ByteStream* out) { - uint count = 0; - uint length = in.length(); + messageqcpp::BSSizeType count = 0; + messageqcpp::BSSizeType length = in.length(); int sock = -1; const uint8_t* inbuf = in.buf(); ssize_t err = 0; diff --git a/utils/joiner/joinpartition.cpp b/utils/joiner/joinpartition.cpp index bccf89b38..362842b55 100644 --- a/utils/joiner/joinpartition.cpp +++ b/utils/joiner/joinpartition.cpp @@ -16,6 +16,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +#include "bytestream.h" #define _CRT_RAND_S // for win rand_s #include #include @@ -804,7 +805,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs) } uint64_t ret = 0; - size_t len = bs.length(); + BSSizeType len = bs.length(); idbassert(len != 0); fs.seekp(offset); diff --git a/utils/messageqcpp/bytestream.cpp b/utils/messageqcpp/bytestream.cpp index 6fad9767f..565b6eb1c 100644 --- a/utils/messageqcpp/bytestream.cpp +++ b/utils/messageqcpp/bytestream.cpp @@ -1,6 +1,6 @@ /* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2019 MariaDB Corporation + Copyright (C) 2019-2024 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -46,7 +46,7 @@ namespace messageqcpp /* Copies only the data left to be read */ void ByteStream::doCopy(const ByteStream& rhs) { - uint32_t rlen = rhs.length(); + BSSizeType rlen = rhs.length(); if (fMaxLen < rlen) { @@ -94,7 +94,7 @@ ByteStream& ByteStream::operator=(const ByteStream& rhs) return *this; } -ByteStream::ByteStream(uint32_t initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0) +ByteStream::ByteStream(BSSizeType initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0) { if (initSize > 0) growBuf(initSize); @@ -102,13 +102,13 @@ ByteStream::ByteStream(uint32_t initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0) void ByteStream::add(const uint8_t b) { - if (fBuf == 0 || (static_cast(fCurInPtr - fBuf) == fMaxLen + ISSOverhead)) + if (fBuf == 0 || (static_cast(fCurInPtr - fBuf) == fMaxLen + ISSOverhead)) growBuf(); *fCurInPtr++ = b; } -void ByteStream::growBuf(uint32_t toSize) +void ByteStream::growBuf(BSSizeType toSize) { if (fBuf == 0) { @@ -138,8 +138,8 @@ void ByteStream::growBuf(uint32_t toSize) toSize = std::max(toSize, fMaxLen * 2); uint8_t* t = new uint8_t[toSize + ISSOverhead]; - uint32_t curOutOff = fCurOutPtr - fBuf; - uint32_t curInOff = fCurInPtr - fBuf; + BSSizeType curOutOff = fCurOutPtr - fBuf; + BSSizeType curInOff = fCurInPtr - fBuf; memcpy(t, fBuf, fCurInPtr - fBuf); #ifdef ZERO_ON_NEW memset(t + (fCurInPtr - fBuf), 0, (toSize + ISSOverhead) - (fCurInPtr - fBuf)); @@ -169,7 +169,7 @@ void ByteStream::setLongStrings(const std::vector>& o ByteStream& ByteStream::operator<<(const int8_t b) { - if (fBuf == 0 || (fCurInPtr - fBuf + 1U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(b) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((int8_t*)fCurInPtr) = b; @@ -187,7 +187,7 @@ ByteStream& ByteStream::operator<<(const uint8_t b) ByteStream& ByteStream::operator<<(const int16_t d) { - if (fBuf == 0 || (fCurInPtr - fBuf + 2U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(d) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((int16_t*)fCurInPtr) = d; @@ -198,7 +198,7 @@ ByteStream& ByteStream::operator<<(const int16_t d) ByteStream& ByteStream::operator<<(const uint16_t d) { - if (fBuf == 0 || (fCurInPtr - fBuf + 2U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(d) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((uint16_t*)fCurInPtr) = d; @@ -209,7 +209,7 @@ ByteStream& ByteStream::operator<<(const uint16_t d) ByteStream& ByteStream::operator<<(const int32_t q) { - if (fBuf == 0 || (fCurInPtr - fBuf + 4U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(q) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((int32_t*)fCurInPtr) = q; @@ -220,7 +220,7 @@ ByteStream& ByteStream::operator<<(const int32_t q) ByteStream& ByteStream::operator<<(const uint32_t q) { - if (fBuf == 0 || (fCurInPtr - fBuf + 4U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(q) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((uint32_t*)fCurInPtr) = q; @@ -231,7 +231,7 @@ ByteStream& ByteStream::operator<<(const uint32_t q) ByteStream& ByteStream::operator<<(const int64_t o) { - if (fBuf == 0 || (fCurInPtr - fBuf + 8U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(o) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((int64_t*)fCurInPtr) = o; @@ -242,7 +242,7 @@ ByteStream& ByteStream::operator<<(const int64_t o) ByteStream& ByteStream::operator<<(const uint64_t o) { - if (fBuf == 0 || (fCurInPtr - fBuf + 8U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(o) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((uint64_t*)fCurInPtr) = o; @@ -251,20 +251,20 @@ ByteStream& ByteStream::operator<<(const uint64_t o) return *this; } -ByteStream& ByteStream::operator<<(const uint128_t& o) +ByteStream& ByteStream::operator<<(const uint128_t& h) { - if (fBuf == 0 || (fCurInPtr - fBuf + 16U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(h) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); - datatypes::TSInt128::storeUnaligned(fCurInPtr, o); + datatypes::TSInt128::storeUnaligned(fCurInPtr, h); fCurInPtr += 16; return *this; } -ByteStream& ByteStream::operator<<(const int128_t& o) +ByteStream& ByteStream::operator<<(const int128_t& h) { - if (fBuf == 0 || (fCurInPtr - fBuf + 16U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(h) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); - datatypes::TSInt128::storeUnaligned(fCurInPtr, o); + datatypes::TSInt128::storeUnaligned(fCurInPtr, h); fCurInPtr += 16; return *this; } @@ -475,18 +475,18 @@ void ByteStream::peek(uint64_t& o) const o = *((uint64_t*)fCurOutPtr); } -void ByteStream::peek(uint128_t& o) const +void ByteStream::peek(uint128_t& h) const { if (length() < 16) throw underflow_error("ByteStream>uint128_t: not enough data in stream to fill datatype"); - datatypes::TSInt128::assignPtrPtr(&o, fCurOutPtr); + datatypes::TSInt128::assignPtrPtr(&h, fCurOutPtr); } -void ByteStream::peek(int128_t& o) const +void ByteStream::peek(int128_t& h) const { if (length() < 16) throw underflow_error("ByteStream>int128_t: not enough data in stream to fill datatype"); - datatypes::TSInt128::assignPtrPtr(&o, fCurOutPtr); + datatypes::TSInt128::assignPtrPtr(&h, fCurOutPtr); } void ByteStream::peek(string& s) const @@ -519,7 +519,7 @@ void ByteStream::peek(string& s) const throw logging::ProtocolError("expected a string"); // we know len >= 0 by now... - if (length() < static_cast(len + 4)) + if (length() < static_cast(len + 4)) { #if DEBUG_DUMP_STRINGS_LESS_THAN > 0 cerr << "bs: wanted " << len + 4 << " bytes, but there are only " << length() << " remaining" << endl; @@ -531,13 +531,13 @@ void ByteStream::peek(string& s) const s.assign((char*)&fCurOutPtr[4], len); } -void ByteStream::load(const uint8_t* bp, uint32_t len) +void ByteStream::load(const uint8_t* bp, BSSizeType len) { // Do all the stuff that could throw an exception first if (bp == 0 && len != 0) throw invalid_argument("ByteStream::load: bp cannot equal 0 when len is not equal to 0"); - uint32_t newMaxLen = (len + BlockSize - 1) / BlockSize * BlockSize; + BSSizeType newMaxLen = (len + BlockSize - 1) / BlockSize * BlockSize; if (len > fMaxLen) { @@ -551,7 +551,7 @@ void ByteStream::load(const uint8_t* bp, uint32_t len) fCurInPtr = fBuf + len + ISSOverhead; } -void ByteStream::append(const uint8_t* bp, uint32_t len) +void ByteStream::append(const uint8_t* bp, BSSizeType len) { if (len == 0) return; @@ -559,7 +559,7 @@ void ByteStream::append(const uint8_t* bp, uint32_t len) if (bp == 0) throw invalid_argument("ByteStream::append: bp cannot equal 0 when len is not equal to 0"); - uint32_t newSize = static_cast(fCurInPtr - fBuf + len); + BSSizeType newSize = static_cast(fCurInPtr - fBuf + len); if (fBuf == 0 || (newSize > fMaxLen)) growBuf(newSize); @@ -635,7 +635,7 @@ void ByteStream::serialize(ByteStream& bs) const void ByteStream::deserialize(ByteStream& bs) { - uint32_t len; + BSSizeType len; restart(); bs >> len; @@ -643,9 +643,9 @@ void ByteStream::deserialize(ByteStream& bs) bs.advance(len); } -void ByteStream::needAtLeast(size_t amount) +void ByteStream::needAtLeast(BSSizeType amount) { - size_t currentSpace; + BSSizeType currentSpace; currentSpace = fMaxLen - (fCurInPtr - (fBuf + ISSOverhead)); @@ -656,7 +656,7 @@ void ByteStream::needAtLeast(size_t amount) ByteStream& ByteStream::operator<<(const ByteStream& bs) { - uint32_t len = bs.length(); + BSSizeType len = bs.length(); *this << len; @@ -668,20 +668,20 @@ ByteStream& ByteStream::operator<<(const ByteStream& bs) ByteStream& ByteStream::operator>>(ByteStream& bs) { peek(bs); - fCurOutPtr += 4 + bs.length(); + fCurOutPtr += sizeof(BSSizeType) + bs.length(); return *this; } void ByteStream::peek(ByteStream& bs) const { - uint32_t len; + BSSizeType len; peek(len); if (length() < len) throw underflow_error("ByteStream>ByteStream: not enough data in stream to fill datatype"); - bs.load(&fCurOutPtr[4], len); + bs.load(&fCurOutPtr[sizeof(len)], len); } ByteStream& ByteStream::operator<<(const uuid& u) @@ -707,7 +707,7 @@ void ByteStream::peek(uuid& u) const ByteStream& ByteStream::operator<<(const float f) { - int sz = sizeof(float); + const constexpr BSSizeType sz = sizeof(float); if (fBuf == 0 || (fCurInPtr - fBuf + sz > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); @@ -719,7 +719,7 @@ ByteStream& ByteStream::operator<<(const float f) } ByteStream& ByteStream::operator<<(const double d) { - int sz = sizeof(double); + const constexpr BSSizeType sz = sizeof(double); if (fBuf == 0 || (fCurInPtr - fBuf + sz > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); @@ -731,7 +731,7 @@ ByteStream& ByteStream::operator<<(const double d) } ByteStream& ByteStream::operator<<(const long double d) { - int sz = sizeof(long double); + const constexpr BSSizeType sz = sizeof(long double); if (fBuf == 0 || (fCurInPtr - fBuf + sz > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); diff --git a/utils/messageqcpp/bytestream.h b/utils/messageqcpp/bytestream.h index 872956167..ff9606298 100644 --- a/utils/messageqcpp/bytestream.h +++ b/utils/messageqcpp/bytestream.h @@ -45,6 +45,7 @@ class ByteStreamTestSuite; namespace messageqcpp { typedef boost::shared_ptr SBS; +using BSSizeType = uint64_t; /** * @brief A class to marshall bytes as a stream @@ -76,11 +77,11 @@ class ByteStream : public Serializeable /** * default ctor */ - EXPORT explicit ByteStream(uint32_t initSize = 8192); // multiples of pagesize are best + EXPORT explicit ByteStream(BSSizeType initSize = 8192); // multiples of pagesize are best /** * ctor with a uint8_t array and len initializer */ - inline ByteStream(const uint8_t* bp, const uint32_t len); + inline ByteStream(const uint8_t* bp, const BSSizeType len); /** * copy ctor */ @@ -337,12 +338,12 @@ class ByteStream : public Serializeable /** * load the stream from an array. Clears out any previous data. */ - EXPORT void load(const uint8_t* bp, uint32_t len); + EXPORT void load(const uint8_t* bp, BSSizeType len); /** * append bytes to the end of the stream. */ - EXPORT void append(const uint8_t* bp, uint32_t len); + EXPORT void append(const uint8_t* bp, BSSizeType len); /** * equality check on buffer contents. @@ -378,19 +379,19 @@ class ByteStream : public Serializeable * advance the output ptr without having to extract bytes * @warning be careful advancing near 4GB! */ - inline void advance(uint32_t amt); + inline void advance(BSSizeType amt); /** * returns the length of the queue (in bytes) * @warning do not attempt to make a ByteStream bigger than 4GB! */ - inline uint32_t length() const; + inline BSSizeType length() const; inline bool empty() const; /** * returns the length of the queue, including header overhead (in bytes) */ - inline uint32_t lengthWithHdrOverhead() const; + inline BSSizeType lengthWithHdrOverhead() const; /** * clears the stream. Releases any current stream and sets all pointers to 0. The state of the object @@ -422,7 +423,7 @@ class ByteStream : public Serializeable /** * Get the allocated size of the buffer. */ - inline uint32_t getBufferSize() const; + inline BSSizeType getBufferSize() const; /** * Serializeable interface @@ -437,10 +438,10 @@ class ByteStream : public Serializeable /** * memory allocation chunk size */ - EXPORT static const uint32_t BlockSize = 4096; + EXPORT static const BSSizeType BlockSize = 4096; /** size of the space we want in front of the data */ - EXPORT static const uint32_t ISSOverhead = + EXPORT static const BSSizeType ISSOverhead = 3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings. // Methods to get and set `long strings`. @@ -458,7 +459,7 @@ class ByteStream : public Serializeable /** * adds another BlockSize bytes to the internal buffer */ - void growBuf(uint32_t toSize = 0); + void growBuf(BSSizeType toSize = 0); /** * handles member copying from one ByteStream to another */ @@ -476,9 +477,8 @@ class ByteStream : public Serializeable uint8_t* fBuf; /// the start of the allocated buffer uint8_t* fCurInPtr; // the point in fBuf where data is inserted next uint8_t* fCurOutPtr; // the point in fBuf where data is extracted from next - uint32_t fMaxLen; // how big fBuf is currently - // Stores `long strings`. - std::vector> longStrings; + BSSizeType fMaxLen; // how big fBuf is currently + std::vector> longStrings; // Stores `long strings`. }; template @@ -527,7 +527,7 @@ static const uint8_t BS_BLOB = 9; static const uint8_t BS_SERIALIZABLE = 10; static const uint8_t BS_UUID = 11; -inline ByteStream::ByteStream(const uint8_t* bp, const uint32_t len) : fBuf(0), fMaxLen(0) +inline ByteStream::ByteStream(const uint8_t* bp, const BSSizeType len) : fBuf(0), fMaxLen(0) { load(bp, len); } @@ -544,15 +544,15 @@ inline uint8_t* ByteStream::buf() { return fCurOutPtr; } -inline uint32_t ByteStream::length() const +inline BSSizeType ByteStream::length() const { - return (uint32_t)(fCurInPtr - fCurOutPtr); + return static_cast(fCurInPtr - fCurOutPtr); } inline bool ByteStream::empty() const { return (length() == 0); } -inline uint32_t ByteStream::lengthWithHdrOverhead() const +inline BSSizeType ByteStream::lengthWithHdrOverhead() const { return (length() + ISSOverhead); } @@ -570,7 +570,7 @@ inline void ByteStream::rewind() { fCurOutPtr = fBuf + ISSOverhead; } -inline void ByteStream::advance(uint32_t adv) +inline void ByteStream::advance(BSSizeType adv) { // fCurOutPtr is always >= fBuf, so fCurOutPtr - fBuf is >= 0, and this difference is always <= 32 bits // there is an edge condition not detected here: if fCurOutPtr - fBuf is nearly 4GB and you try to @@ -619,7 +619,7 @@ inline ByteStream& ByteStream::operator=(const SBS& rhs) return *this; } -inline uint32_t ByteStream::getBufferSize() const +inline BSSizeType ByteStream::getBufferSize() const { return fMaxLen; } @@ -738,12 +738,6 @@ void deserializeSet(ByteStream& bs, std::set& s) s.insert(tmp); } } -/* -template<> -struct ByteStream::_ByteStreamType<1, ByteStream::byte>> -{ - typedef ByteStream::byte type; -}*/ } // namespace messageqcpp diff --git a/utils/messageqcpp/compressed_iss.cpp b/utils/messageqcpp/compressed_iss.cpp index 82fdf0514..c194a41ad 100644 --- a/utils/messageqcpp/compressed_iss.cpp +++ b/utils/messageqcpp/compressed_iss.cpp @@ -20,6 +20,7 @@ * * ***********************************************************************/ +#include "bytestream.h" #include "mcsconfig.h" #include @@ -117,7 +118,7 @@ const SBS CompressedInetStreamSocket::read(const struct timespec* timeout, bool* void CompressedInetStreamSocket::write(const ByteStream& msg, Stats* stats) { - size_t len = msg.length(); + BSSizeType len = msg.length(); if (useCompression && (len > 512)) { @@ -126,6 +127,9 @@ void CompressedInetStreamSocket::write(const ByteStream& msg, Stats* stats) alg->compress((char*)msg.buf(), len, (char*)smsg.getInputPtr() + HEADER_SIZE, &outLen); // Save original len. + // !!! + // !!! Reducing BS size type from 64bit down to 32 and potentially loosing data. + // !!! *(uint32_t*)smsg.getInputPtr() = len; smsg.advanceInputPtr(outLen + HEADER_SIZE); diff --git a/utils/messageqcpp/inetstreamsocket.cpp b/utils/messageqcpp/inetstreamsocket.cpp index 1e94e0b66..7df141192 100644 --- a/utils/messageqcpp/inetstreamsocket.cpp +++ b/utils/messageqcpp/inetstreamsocket.cpp @@ -572,6 +572,9 @@ void InetStreamSocket::write(SBS msg, Stats* stats) void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stats* stats) const { + // !!! + // !!! Reducing BS size type from 64bit down to 32 and potentially loosing data. + // !!! uint32_t msglen = msg.length(); uint32_t magic = whichMagic; uint32_t* realBuf; diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index b8f835d7f..69049542a 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -31,7 +31,6 @@ #include using namespace std; - #include #include "bytestream.h" @@ -49,8 +48,6 @@ namespace rowgroup { using cscType = execplan::CalpontSystemCatalog::ColDataType; - - StringStore::~StringStore() { #if 0 @@ -302,47 +299,27 @@ void UserDataStore::deserialize(ByteStream& bs) return; } - RGData::RGData(const RowGroup& rg, uint32_t rowCount) { - // cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl; - rowData.reset(new uint8_t[rg.getDataSize(rowCount)]); + RGDataSizeType s = rg.getDataSize(rowCount); + rowData.reset(new uint8_t[s]); if (rg.usesStringTable() && rowCount > 0) strings.reset(new StringStore()); userDataStore.reset(); - - -#ifdef VALGRIND - /* In a PM-join, we can serialize entire tables; not every value has been - * filled in yet. Need to look into that. Valgrind complains that - * those bytes are uninitialized, this suppresses that error. - */ - memset(rowData.get(), 0, rg.getDataSize(rowCount)); // XXXPAT: make valgrind happy temporarily -#endif - memset(rowData.get(), 0, rg.getDataSize(rowCount)); // XXXPAT: make valgrind happy temporarily columnCount = rg.getColumnCount(); rowSize = rg.getRowSize(); } RGData::RGData(const RowGroup& rg) { - // cout << "rgdata++ = " << __sync_add_and_fetch(&rgDataCount, 1) << endl; rowData.reset(new uint8_t[rg.getMaxDataSize()]); if (rg.usesStringTable()) strings.reset(new StringStore()); userDataStore.reset(); - -#ifdef VALGRIND - /* In a PM-join, we can serialize entire tables; not every value has been - * filled in yet. Need to look into that. Valgrind complains that - * those bytes are uninitialized, this suppresses that error. - */ - memset(rowData.get(), 0, rg.getMaxDataSize()); -#endif columnCount = rg.getColumnCount(); rowSize = rg.getRowSize(); } @@ -356,14 +333,6 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount) strings.reset(new StringStore()); else strings.reset(); - -#ifdef VALGRIND - /* In a PM-join, we can serialize entire tables; not every value has been - * filled in yet. Need to look into that. Valgrind complains that - * those bytes are uninitialized, this suppresses that error. - */ - memset(rowData.get(), 0, rg.getDataSize(rowCount)); -#endif columnCount = rg.getColumnCount(); rowSize = rg.getRowSize(); } @@ -373,11 +342,11 @@ void RGData::reinit(const RowGroup& rg) reinit(rg, 8192); } -void RGData::serialize(ByteStream& bs, uint32_t amount) const +void RGData::serialize(ByteStream& bs, RGDataSizeType amount) const { // cout << "serializing!\n"; bs << (uint32_t)RGDATA_SIG; - bs << (uint32_t)amount; + bs << amount; bs << columnCount; bs << rowSize; bs.append(rowData.get(), amount); @@ -399,9 +368,10 @@ void RGData::serialize(ByteStream& bs, uint32_t amount) const bs << (uint8_t)0; } -void RGData::deserialize(ByteStream& bs, uint32_t defAmount) +void RGData::deserialize(ByteStream& bs, RGDataSizeType defAmount) { - uint32_t amount, sig; + uint32_t sig; + RGDataSizeType amount; uint8_t* buf; uint8_t tmp8; @@ -642,7 +612,7 @@ string Row::toCSV() const void Row::setToNull(uint32_t colIndex) { - setNullMark(colIndex, true); // mark as null. + setNullMark(colIndex, true); // mark as null. switch (types[colIndex]) { case CalpontSystemCatalog::TINYINT: data[offsets[colIndex]] = joblist::TINYINTNULL; break; @@ -665,11 +635,11 @@ void Row::setToNull(uint32_t colIndex) *((int32_t*)&data[offsets[colIndex]]) = static_cast(joblist::DATENULL); break; - case CalpontSystemCatalog::BIGINT: - if (precision[colIndex] != MagicPrecisionForCountAgg) - *((uint64_t*)&data[offsets[colIndex]]) = joblist::BIGINTNULL; - else // work around for count() in outer join result. - *((uint64_t*)&data[offsets[colIndex]]) = 0; + case CalpontSystemCatalog::BIGINT: + if (precision[colIndex] != MagicPrecisionForCountAgg) + *((uint64_t*)&data[offsets[colIndex]]) = joblist::BIGINTNULL; + else // work around for count() in outer join result. + *((uint64_t*)&data[offsets[colIndex]]) = 0; break; @@ -680,9 +650,13 @@ void Row::setToNull(uint32_t colIndex) *((long double*)&data[offsets[colIndex]]) = joblist::LONGDOUBLENULL; break; - case CalpontSystemCatalog::DATETIME: *((uint64_t*)&data[offsets[colIndex]]) = joblist::DATETIMENULL; break; + case CalpontSystemCatalog::DATETIME: + *((uint64_t*)&data[offsets[colIndex]]) = joblist::DATETIMENULL; + break; - case CalpontSystemCatalog::TIMESTAMP: *((uint64_t*)&data[offsets[colIndex]]) = joblist::TIMESTAMPNULL; break; + case CalpontSystemCatalog::TIMESTAMP: + *((uint64_t*)&data[offsets[colIndex]]) = joblist::TIMESTAMPNULL; + break; case CalpontSystemCatalog::TIME: *((uint64_t*)&data[offsets[colIndex]]) = joblist::TIMENULL; break; @@ -716,9 +690,7 @@ void Row::setToNull(uint32_t colIndex) case 7: case 8: *((uint64_t*)&data[offsets[colIndex]]) = joblist::CHAR8NULL; break; - default: - setNullMark(colIndex, true); - break; + default: setNullMark(colIndex, true); break; } break; @@ -751,7 +723,9 @@ void Row::setToNull(uint32_t colIndex) case CalpontSystemCatalog::UTINYINT: data[offsets[colIndex]] = joblist::UTINYINTNULL; break; - case CalpontSystemCatalog::USMALLINT: *((uint16_t*)&data[offsets[colIndex]]) = joblist::USMALLINTNULL; break; + case CalpontSystemCatalog::USMALLINT: + *((uint16_t*)&data[offsets[colIndex]]) = joblist::USMALLINTNULL; + break; case CalpontSystemCatalog::UMEDINT: case CalpontSystemCatalog::UINT: *((uint32_t*)&data[offsets[colIndex]]) = joblist::UINTNULL; break; @@ -760,8 +734,8 @@ void Row::setToNull(uint32_t colIndex) default: ostringstream os; - os << "Row::initToNull(): got bad column type (" << types[colIndex] << "). Width=" << getColumnWidth(colIndex) - << endl; + os << "Row::initToNull(): got bad column type (" << types[colIndex] + << "). Width=" << getColumnWidth(colIndex) << endl; os << toString(); throw logic_error(os.str()); } @@ -870,8 +844,8 @@ bool Row::isNullValue(uint32_t colIndex) const return strings->isNullValue(offset); } -// if (data[offsets[colIndex]] == 0) // empty string -// return true; + // if (data[offsets[colIndex]] == 0) // empty string + // return true; switch (len) { @@ -1126,7 +1100,6 @@ RowGroup::RowGroup(const RowGroup& r) offsets = &stOffsets[0]; else if (!useStringTable && !oldOffsets.empty()) offsets = &oldOffsets[0]; - } RowGroup& RowGroup::operator=(const RowGroup& r) @@ -1241,27 +1214,28 @@ void RowGroup::serializeRGData(ByteStream& bs) const rgData->serialize(bs, getDataSize()); } -uint32_t RowGroup::getDataSize() const +RGDataSizeType RowGroup::getDataSize() const { return getDataSize(getRowCount()); } -uint32_t RowGroup::getDataSize(uint64_t n) const +RGDataSizeType RowGroup::getDataSize(uint64_t n) const { - return headerSize + (n * getRowSize()); + return headerSize + (n * static_cast(getRowSize())); } -uint32_t RowGroup::getMaxDataSize() const +RGDataSizeType RowGroup::getMaxDataSize() const { - return headerSize + (8192 * getRowSize()); + return headerSize + (static_cast(rgCommonSize) * static_cast(getRowSize())); } -uint32_t RowGroup::getMaxDataSizeWithStrings() const +RGDataSizeType RowGroup::getMaxDataSizeWithStrings() const { - return headerSize + (8192 * (oldOffsets[columnCount] + columnCount)); + return headerSize + + (static_cast(rgCommonSize) * static_cast(getRowSizeWithStrings())); } -uint32_t RowGroup::getEmptySize() const +RGDataSizeType RowGroup::getEmptySize() const { return headerSize; } @@ -1331,9 +1305,8 @@ string RowGroup::toString(const std::vector& used) const os << "rowcount = " << getRowCount() << endl; if (!used.empty()) { - uint64_t cnt = - std::accumulate(used.begin(), used.end(), 0ULL, - [](uint64_t a, uint64_t bits) { return a + __builtin_popcountll(bits); }); + uint64_t cnt = std::accumulate(used.begin(), used.end(), 0ULL, [](uint64_t a, uint64_t bits) + { return a + __builtin_popcountll(bits); }); os << "sparse row count = " << cnt << endl; } os << "base rid = " << getBaseRid() << endl; diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index 113f0ae6e..62fd3a306 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -62,6 +62,7 @@ namespace rowgroup { const int16_t rgCommonSize = 8192; +using RGDataSizeType = uint64_t; /* The RowGroup family of classes encapsulate the data moved through the @@ -270,14 +271,14 @@ class RGData // amount should be the # returned by RowGroup::getDataSize() - void serialize(messageqcpp::ByteStream&, uint32_t amount) const; + void serialize(messageqcpp::ByteStream&, RGDataSizeType amount) const; // the 'hasLengthField' is there b/c PM aggregation (and possibly others) currently sends // inline data with a length field. Once that's converted to string table format, that // option can go away. - void deserialize(messageqcpp::ByteStream&, uint32_t amount = 0); // returns the # of bytes read + void deserialize(messageqcpp::ByteStream&, RGDataSizeType amount = 0); // returns the # of bytes read - inline uint64_t getStringTableMemUsage(); + inline RGDataSizeType getStringTableMemUsage(); void clear(); void reinit(const RowGroup& rg); void reinit(const RowGroup& rg, uint32_t rowCount); @@ -1499,15 +1500,15 @@ class RowGroup : public messageqcpp::Serializeable uint32_t getDBRoot() const; void setDBRoot(uint32_t); - uint32_t getDataSize() const; - uint32_t getDataSize(uint64_t n) const; - uint32_t getMaxDataSize() const; - uint32_t getMaxDataSizeWithStrings() const; - uint32_t getEmptySize() const; + RGDataSizeType getDataSize() const; + RGDataSizeType getDataSize(uint64_t n) const; + RGDataSizeType getMaxDataSize() const; + RGDataSizeType getMaxDataSizeWithStrings() const; + RGDataSizeType getEmptySize() const; // this returns the size of the row data with the string table - inline uint64_t getSizeWithStrings() const; - inline uint64_t getSizeWithStrings(uint64_t n) const; + inline RGDataSizeType getSizeWithStrings() const; + inline RGDataSizeType getSizeWithStrings(uint64_t n) const; // sets the row count to 0 and the baseRid to something // effectively initializing whatever chunk of memory @@ -1628,11 +1629,11 @@ class RowGroup : public messageqcpp::Serializeable uint32_t sTableThreshold = 20; std::shared_ptr forceInline; - static const uint32_t headerSize = 18; - static const uint32_t rowCountOffset = 0; - static const uint32_t baseRidOffset = 4; - static const uint32_t statusOffset = 12; - static const uint32_t dbRootOffset = 14; + static const uint64_t headerSize = 18; + static const uint64_t rowCountOffset = 0; + static const uint64_t baseRidOffset = 4; + static const uint64_t statusOffset = 12; + static const uint64_t dbRootOffset = 14; }; inline uint64_t convertToRid(const uint32_t& partNum, const uint16_t& segNum, const uint8_t& extentNum, @@ -1778,7 +1779,7 @@ inline uint32_t RowGroup::getRowSizeWithStrings() const return oldOffsets[columnCount] + columnCount; } -inline uint64_t RowGroup::getSizeWithStrings(uint64_t n) const +inline RGDataSizeType RowGroup::getSizeWithStrings(uint64_t n) const { if (strings == nullptr) return getDataSize(n); diff --git a/utils/rowgroup/rowstorage.cpp b/utils/rowgroup/rowstorage.cpp index fe07fe8b7..3c9a3aee0 100644 --- a/utils/rowgroup/rowstorage.cpp +++ b/utils/rowgroup/rowstorage.cpp @@ -689,6 +689,7 @@ class RowGroupStorage if (fRGDatas[rgid]) { fRowGroupOut->setData(fRGDatas[rgid].get()); + // An implicit s2u type cast. int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows); if (!fMM->acquire(memSz)) { @@ -965,7 +966,7 @@ class RowGroupStorage while (rgid >= fRGDatas.size()) { - int64_t memSz = fRowGroupOut->getSizeWithStrings(fMaxRows); + auto memSz = fRowGroupOut->getSizeWithStrings(fMaxRows); if (!fMM->acquire(memSz)) { throw logging::IDBExcept( diff --git a/utils/windowfunction/idborderby.cpp b/utils/windowfunction/idborderby.cpp index ec4008062..1c6fd9528 100644 --- a/utils/windowfunction/idborderby.cpp +++ b/utils/windowfunction/idborderby.cpp @@ -753,7 +753,7 @@ void IdbOrderBy::initialize(const RowGroup& rg) // initialize rows IdbCompare::initialize(rg); - uint64_t newSize = rg.getSizeWithStrings(fRowsPerRG); + auto newSize = rg.getSizeWithStrings(fRowsPerRG); if (fRm && !fRm->getMemory(newSize, fSessionMemLimit)) { cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__; diff --git a/versioning/BRM/slavecomm.cpp b/versioning/BRM/slavecomm.cpp index b4f648eb7..27e0eece0 100644 --- a/versioning/BRM/slavecomm.cpp +++ b/versioning/BRM/slavecomm.cpp @@ -400,8 +400,7 @@ void SlaveComm::do_createStripeColumnExtents(ByteStream& msg) if (printOnly) { - cout << "createStripeColumnExtents(). " - << "DBRoot=" << dbRoot << "; Part#=" << partitionNum << endl; + cout << "createStripeColumnExtents(). " << "DBRoot=" << dbRoot << "; Part#=" << partitionNum << endl; for (uint32_t i = 0; i < cols.size(); i++) cout << "StripeColExt arg " << i + 1 << ": oid=" << cols[i].oid << " width=" << cols[i].width << endl; @@ -2183,6 +2182,9 @@ void SlaveComm::saveDelta() { try { + // !!! + // !!! Reducing BS size type from 64bit down to 32 and potentially loosing data. + // !!! const uint32_t deltaLen = delta.length(); const uint32_t bufferSize = sizeof(deltaLen) + deltaLen; std::unique_ptr buffer(new char[bufferSize]); diff --git a/writeengine/server/we_dataloader.cpp b/writeengine/server/we_dataloader.cpp index 25ae1fbe8..ce96f28ee 100644 --- a/writeengine/server/we_dataloader.cpp +++ b/writeengine/server/we_dataloader.cpp @@ -437,7 +437,7 @@ void WEDataLoader::pushData2Cpimport(ByteStream& Ibs) { if (Ibs.length() > 0) { - int aLen = Ibs.length(); + messageqcpp::BSSizeType aLen = Ibs.length(); char* pStart = reinterpret_cast(Ibs.buf()); char* pEnd = pStart + aLen; char* pPtr = pStart; diff --git a/writeengine/server/we_dataloader.h b/writeengine/server/we_dataloader.h index 86ce2904d..ad43609c7 100644 --- a/writeengine/server/we_dataloader.h +++ b/writeengine/server/we_dataloader.h @@ -30,6 +30,7 @@ #pragma once +#include "bytestream.h" #include "rwlock_local.h" #include "resourcemanager.h" @@ -58,32 +59,32 @@ class WEDataLoader : public Observer public: bool setupCpimport(); // fork the cpimport void teardownCpimport(bool useStoredWaitPidStatus); // @bug 4267 - void pushData2Cpimport(ByteStream& Ibs); // push data to cpimport from the queue + void pushData2Cpimport(messageqcpp::ByteStream& Ibs); // push data to cpimport from the queue void closeWritePipe(); void str2Argv(std::string CmdLine, std::vector& V); public: - void onReceiveKeepAlive(ByteStream& Ibs); - void onReceiveData(ByteStream& Ibs); - void onReceiveEod(ByteStream& Ibs); // end of data - void onReceiveMode(ByteStream& Ibs); + void onReceiveKeepAlive(messageqcpp::ByteStream& Ibs); + void onReceiveData(messageqcpp::ByteStream& Ibs); + void onReceiveEod(messageqcpp::ByteStream& Ibs); // end of data + void onReceiveMode(messageqcpp::ByteStream& Ibs); // void onReceiveCmd(messageqcpp::SBS bs);// {(ByteStream& Ibs); - void onReceiveCmd(ByteStream& bs); // {(ByteStream& Ibs); - void onReceiveAck(ByteStream& Ibs); - void onReceiveNak(ByteStream& Ibs); - void onReceiveError(ByteStream& Ibs); + void onReceiveCmd(messageqcpp::ByteStream& bs); // {(ByteStream& Ibs); + void onReceiveAck(messageqcpp::ByteStream& Ibs); + void onReceiveNak(messageqcpp::ByteStream& Ibs); + void onReceiveError(messageqcpp::ByteStream& Ibs); - void onReceiveJobId(ByteStream& Ibs); - void onReceiveJobData(ByteStream& Ibs); - void onReceiveImportFileName(ByteStream& Ibs); - void onReceiveCmdLineArgs(ByteStream& Ibs); + void onReceiveJobId(messageqcpp::ByteStream& Ibs); + void onReceiveJobData(messageqcpp::ByteStream& Ibs); + void onReceiveImportFileName(messageqcpp::ByteStream& Ibs); + void onReceiveCmdLineArgs(messageqcpp::ByteStream& Ibs); void onReceiveStartCpimport(); - void onReceiveBrmRptFileName(ByteStream& Ibs); - void onReceiveCleanup(ByteStream& Ibs); - void onReceiveRollback(ByteStream& Ibs); + void onReceiveBrmRptFileName(messageqcpp::ByteStream& Ibs); + void onReceiveCleanup(messageqcpp::ByteStream& Ibs); + void onReceiveRollback(messageqcpp::ByteStream& Ibs); - void onReceiveErrFileRqst(ByteStream& Ibs); - void onReceiveBadFileRqst(ByteStream& Ibs); + void onReceiveErrFileRqst(messageqcpp::ByteStream& Ibs); + void onReceiveBadFileRqst(messageqcpp::ByteStream& Ibs); void onCpimportSuccess(); void onCpimportFailure(); @@ -103,11 +104,11 @@ class WEDataLoader : public Observer { fMode = Mode; } - void updateTxBytes(unsigned int Tx) + void updateTxBytes(messageqcpp::BSSizeType Tx) { fTxBytes += Tx; } - void updateRxBytes(unsigned int Rx) + void updateRxBytes(messageqcpp::BSSizeType Rx) { fRxBytes += Rx; } @@ -132,11 +133,11 @@ class WEDataLoader : public Observer fObjId = ObjId; } - unsigned int getTxBytes() + messageqcpp::BSSizeType getTxBytes() { return fTxBytes; } - unsigned int getRxBytes() + messageqcpp::BSSizeType getRxBytes() { return fRxBytes; } @@ -172,8 +173,8 @@ class WEDataLoader : public Observer int fMode; std::ofstream fDataDumpFile; std::ofstream fJobFile; - unsigned int fTxBytes; - unsigned int fRxBytes; + messageqcpp::BSSizeType fTxBytes; + messageqcpp::BSSizeType fRxBytes; char fPmId; int fObjId; // Object Identifier for logging diff --git a/writeengine/splitter/we_sdhandler.cpp b/writeengine/splitter/we_sdhandler.cpp index b076c91c2..d4eb6af6e 100644 --- a/writeengine/splitter/we_sdhandler.cpp +++ b/writeengine/splitter/we_sdhandler.cpp @@ -2289,7 +2289,7 @@ int WESDHandler::getNextDbrPm2Send() int WESDHandler::leastDataSendPm() { - unsigned int aTx = 0; + messageqcpp::BSSizeType aTx = 0; int aPmId = 0; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) diff --git a/writeengine/splitter/we_splclient.cpp b/writeengine/splitter/we_splclient.cpp index 8bcbdcd56..b4eee7e16 100644 --- a/writeengine/splitter/we_splclient.cpp +++ b/writeengine/splitter/we_splclient.cpp @@ -208,7 +208,7 @@ void WESplClient::send() messageqcpp::SBS aSbs = fSendQueue.front(); fSendQueue.pop(); aLock.unlock(); - int aLen = (*aSbs).length(); + messageqcpp::BSSizeType aLen = (*aSbs).length(); if (aLen > 0) { @@ -241,7 +241,7 @@ void WESplClient::recv() rm_ts.tv_sec = fRdSecTo; // 0 when data sending otherwise 1- second rm_ts.tv_nsec = 20000000; // 20 milliSec bool isTimeOut = false; - int aLen = 0; + messageqcpp::BSSizeType aLen = 0; try { diff --git a/writeengine/splitter/we_splclient.h b/writeengine/splitter/we_splclient.h index 51db86399..e38ee0d25 100644 --- a/writeengine/splitter/we_splclient.h +++ b/writeengine/splitter/we_splclient.h @@ -29,7 +29,6 @@ #pragma once -#include "threadsafequeue.h" #include "resourcemanager.h" #include "we_messages.h" @@ -122,11 +121,11 @@ class WESplClient { return fRowTx; } - uint32_t getBytesRcv() const + messageqcpp::BSSizeType getBytesRcv() const { return fBytesRcv; } - uint32_t getBytesTx() + messageqcpp::BSSizeType getBytesTx() { boost::mutex::scoped_lock aLock(fTxMutex); return fBytesTx; @@ -214,17 +213,17 @@ class WESplClient { return fIpAddress; } - void setBytesRcv(uint32_t BytesRcv) + void setBytesRcv(messageqcpp::BSSizeType BytesRcv) { fBytesRcv = BytesRcv; } - void setBytesTx(uint32_t BytesTx) + void setBytesTx(messageqcpp::BSSizeType BytesTx) { boost::mutex::scoped_lock aLock(fTxMutex); fBytesTx = BytesTx; aLock.unlock(); } - void updateBytesTx(uint32_t fBytes) + void updateBytesTx(messageqcpp::BSSizeType fBytes) { boost::mutex::scoped_lock aLock(fTxMutex); fBytesTx += fBytes; @@ -358,8 +357,8 @@ class WESplClient int fDataRqstCnt; // Data request count long fRdSecTo; // read timeout sec unsigned int fRowTx; // No. Of Rows Transmitted - uint32_t fBytesTx; - uint32_t fBytesRcv; + messageqcpp::BSSizeType fBytesTx; + messageqcpp::BSSizeType fBytesRcv; time_t fLastInTime; time_t fStartTime; bool fSend;