diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 079757126..b7632f301 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -80,6 +80,10 @@ if (WITH_UNITTESTS) target_link_libraries(comparators_tests ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) add_test(NAME columnstore:comparators_tests COMMAND comparators_tests) + add_executable(bytestream bytestream.cpp) + target_link_libraries(bytestream ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) + add_test(NAME columnstore:bytestream COMMAND bytestream) + # standalone EM routines test add_executable(brm_em_standalone brm-em-standalone.cpp) target_link_libraries(brm_em_standalone ${ENGINE_LDFLAGS} ${ENGINE_WRITE_LIBS} ${CPPUNIT_LIBRARIES} cppunit) diff --git a/tests/bytestream.cpp b/tests/bytestream.cpp new file mode 100644 index 000000000..af2f846cb --- /dev/null +++ b/tests/bytestream.cpp @@ -0,0 +1,942 @@ +/* Copyright (C) 2024 MariaDB Corporation. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include +#include +#include +#include +using namespace std; +#include +#include +#include +#include +#include +#include + +#include + +#include "bytestream.h" +using namespace messageqcpp; +#include "configcpp.h" +using namespace config; +#include "mcs_decimal.h" + +class ByteStreamTestSuite : public CppUnit::TestFixture +{ + CPPUNIT_TEST_SUITE(ByteStreamTestSuite); + + CPPUNIT_TEST(bs_1); + CPPUNIT_TEST(bs_1_1); + CPPUNIT_TEST(bs_1_2); + CPPUNIT_TEST(bs_2); + CPPUNIT_TEST(bs_3); + CPPUNIT_TEST(bs_4); + CPPUNIT_TEST_EXCEPTION(bs_5_1, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_2, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_3, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_4, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_5, std::underflow_error); + CPPUNIT_TEST_EXCEPTION(bs_5_6, std::underflow_error); + CPPUNIT_TEST(bs_6); + CPPUNIT_TEST(bs_7); + CPPUNIT_TEST(bs_8); + CPPUNIT_TEST_EXCEPTION(bs_9, std::underflow_error); + CPPUNIT_TEST(bs_10); + CPPUNIT_TEST(bs_12); + CPPUNIT_TEST(bs_13); + CPPUNIT_TEST(bs_14); + CPPUNIT_TEST(bs_15); + CPPUNIT_TEST(bs_16); + CPPUNIT_TEST_SUITE_END(); + + private: + ByteStream::byte b; + ByteStream::doublebyte d; + ByteStream::quadbyte q; + ByteStream::octbyte o; + + uint8_t u8; + uint16_t u16; + uint32_t u32; + uint64_t u64; + uint128_t u128; + int8_t i8; + int16_t i16; + int32_t i32; + int64_t i64; + int128_t i128; + + ByteStream bs; + ByteStream bs1; + + ByteStream::byte* bap; + ByteStream::byte* bap1; + + int len; + + public: + void setUp() + { + bs.reset(); + bs1.reset(); + bap = 0; + bap1 = 0; + } + + void tearDown() + { + bs.reset(); + bs1.reset(); + delete[] bap; + bap = 0; + delete[] bap1; + bap1 = 0; + } + + void bs_1() + { + bs.reset(); + + o = 0xdeadbeefbadc0ffeLL; + bs << o; + CPPUNIT_ASSERT(bs.length() == 8); + o = 0; + bs >> o; + CPPUNIT_ASSERT(o == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 0); + + q = 0xdeadbeef; + bs << q; + CPPUNIT_ASSERT(bs.length() == 4); + q = 0; + bs >> q; + CPPUNIT_ASSERT(q == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 0); + + d = 0xf00f; + bs << d; + CPPUNIT_ASSERT(bs.length() == 2); + d = 0; + bs >> d; + CPPUNIT_ASSERT(d == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 0); + + b = 0x0f; + bs << b; + CPPUNIT_ASSERT(bs.length() == 1); + b = 0; + bs >> b; + CPPUNIT_ASSERT(b == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + + o = 0xdeadbeefbadc0ffeLL; + bs << o; + CPPUNIT_ASSERT(bs.length() == 8); + o = 0; + + q = 0xdeadbeef; + bs << q; + CPPUNIT_ASSERT(bs.length() == 12); + q = 0; + + d = 0xf00f; + bs << d; + CPPUNIT_ASSERT(bs.length() == 14); + d = 0; + + b = 0x0f; + bs << b; + CPPUNIT_ASSERT(bs.length() == 15); + b = 0; + + bs >> o; + CPPUNIT_ASSERT(o == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 7); + bs >> q; + CPPUNIT_ASSERT(q == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 3); + bs >> d; + CPPUNIT_ASSERT(d == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 1); + bs >> b; + CPPUNIT_ASSERT(b == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_1_1() + { + bs.reset(); + + o = 0xdeadbeefbadc0ffeLL; + bs << o; + CPPUNIT_ASSERT(bs.length() == 8); + o = 0; + + q = 0xdeadbeef; + bs << q; + CPPUNIT_ASSERT(bs.length() == 12); + q = 0; + + d = 0xf00f; + bs << d; + CPPUNIT_ASSERT(bs.length() == 14); + d = 0; + + b = 0x0f; + bs << b; + CPPUNIT_ASSERT(bs.length() == 15); + b = 0; + + ByteStream bbs1; + bbs1 << bs; + CPPUNIT_ASSERT(bbs1.length() == bs.length() + sizeof(messageqcpp::BSSizeType)); + bs.reset(); + bbs1 >> bs; + CPPUNIT_ASSERT(bbs1.length() == 0); + CPPUNIT_ASSERT(bs.length() == 15); + + bs >> o; + CPPUNIT_ASSERT(o == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 7); + bs >> q; + CPPUNIT_ASSERT(q == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 3); + bs >> d; + CPPUNIT_ASSERT(d == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 1); + bs >> b; + CPPUNIT_ASSERT(b == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_1_2() + { + bs.reset(); + + i64 = -2401053089477160962; + bs << i64; + CPPUNIT_ASSERT(bs.length() == 8); + i64 = 0; + + i32 = -559038737; + bs << i32; + CPPUNIT_ASSERT(bs.length() == 12); + i32 = 0; + + i16 = -4081; + bs << i16; + CPPUNIT_ASSERT(bs.length() == 14); + i16 = 0; + + i8 = 15; + bs << i8; + CPPUNIT_ASSERT(bs.length() == 15); + i8 = 0; + + bs >> i64; + CPPUNIT_ASSERT(i64 == -2401053089477160962); + CPPUNIT_ASSERT(bs.length() == 7); + + bs >> i32; + CPPUNIT_ASSERT(i32 == -559038737); + CPPUNIT_ASSERT(bs.length() == 3); + + bs >> i16; + CPPUNIT_ASSERT(i16 == -4081); + CPPUNIT_ASSERT(bs.length() == 1); + + bs >> i8; + CPPUNIT_ASSERT(i8 == 15); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_2() + { + int i; + + bs.reset(); + srand(time(0)); + + for (i = 0; i < 10240; i++) + { + bs << (uint32_t)rand(); + } + + bs1 = bs; + + uint32_t q1; + + for (i = 0; i < 10240; i++) + { + bs >> u32; + bs1 >> q1; + CPPUNIT_ASSERT(u32 == q1); + } + + bs.reset(); + bs1.reset(); + } + + void bs_3() + { + uint8_t ba[1024] = { + 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, + }; + + bs.load(ba, 8); + CPPUNIT_ASSERT(bs.length() == 8); + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x12); + bs >> u16; + CPPUNIT_ASSERT(u16 == 0x5634); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdebc9a78); + + CPPUNIT_ASSERT(bs.length() == 1); + + bs.reset(); + CPPUNIT_ASSERT(bs.length() == 0); + + bs.load(ba, 8); + len = bs.length(); + CPPUNIT_ASSERT(len == 8); + bap = new ByteStream::byte[len]; + // bs >> bap; + memcpy(bap, bs.buf(), len); + CPPUNIT_ASSERT(memcmp(ba, bap, len) == 0); + delete[] bap; + bap = 0; + + bs.reset(); + + for (u32 = 0; u32 < 20480; u32++) + { + bs << u32; + } + + len = bs.length(); + CPPUNIT_ASSERT(len == (20480 * sizeof(u32))); + bap = new ByteStream::byte[len]; + // bs >> bap; + memcpy(bap, bs.buf(), len); + + bs.reset(); + + for (u32 = 0; u32 < 20480; u32++) + { + bs << u32; + } + + len = bs.length(); + CPPUNIT_ASSERT(len == (20480 * sizeof(q))); + bap1 = new ByteStream::byte[len]; + // bs >> bap1; + memcpy(bap1, bs.buf(), len); + + CPPUNIT_ASSERT(memcmp(bap1, bap, len) == 0); + + delete[] bap; + bap = 0; + delete[] bap1; + bap1 = 0; + bs.reset(); + } + void bs_4() + { + for (i32 = 0; i32 < 20480; i32++) + { + bs << i32; + } + + ByteStream bs2(bs); + len = bs2.length(); + CPPUNIT_ASSERT(len == (20480 * sizeof(i32))); + bap = new ByteStream::byte[len]; + // bs2 >> bap; + memcpy(bap, bs2.buf(), len); + + bs1 = bs2; + len = bs1.length(); + CPPUNIT_ASSERT(len == (20480 * sizeof(i32))); + bap1 = new ByteStream::byte[len]; + // bs1 >> bap1; + memcpy(bap1, bs1.buf(), len); + + CPPUNIT_ASSERT(memcmp(bap1, bap, len) == 0); + delete[] bap; + bap = 0; + delete[] bap1; + bap1 = 0; + bs.reset(); + bs1.reset(); + bs2.reset(); + } + + void bs_5_1() + { + bs.reset(); + + u8 = 0x0f; + bs << u8; + + for (;;) + bs >> u32; + } + + void bs_5_2() + { + bs.reset(); + + u8 = 0x0f; + bs << u8; + + for (;;) + bs >> u16; + } + + void bs_5_3() + { + bs.reset(); + + u8 = 0x0f; + bs << u8; + + for (;;) + bs >> u8; + } + + void bs_5_4() + { + bs.reset(); + + i8 = 0x0f; + bs << i8; + + for (;;) + bs >> i32; + } + + void bs_5_5() + { + bs.reset(); + + i8 = 0x0f; + bs << i8; + + for (;;) + bs >> i16; + } + + void bs_5_6() + { + bs.reset(); + + i8 = 0x0f; + bs << i8; + + for (;;) + bs >> i8; + } + + void bs_6() + { + u8 = 0x1a; + bs << u8; + u8 = 0x2b; + bs << u8; + u8 = 0x3c; + bs << u8; + + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x1a); + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x2b); + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x3c); + + bs.reset(); + + u8 = 12; + bs << u8; + u8 = 3; + bs << u8; + u8 = 0; + bs << u8; + u8 = 2; + bs << u8; + + ByteStream bs3(bs); + + bs3 >> u8; + CPPUNIT_ASSERT(u8 == 12); + bs3 >> u8; + CPPUNIT_ASSERT(u8 == 3); + bs3 >> u8; + CPPUNIT_ASSERT(u8 == 0); + bs3 >> u8; + CPPUNIT_ASSERT(u8 == 2); + } + + void bs_7() + { + size_t i; + + bs.reset(); + bap = new ByteStream::byte[ByteStream::BlockSize * 2]; + ByteStream::byte* bapp; + + for (bapp = &bap[0], i = 0; i < ByteStream::BlockSize; bapp++, i++) + *bapp = 0xa5; + + bs.append(bap, ByteStream::BlockSize); + CPPUNIT_ASSERT(bs.length() == (ByteStream::BlockSize * 1)); + + for (bapp = &bap[0], i = 0; i < ByteStream::BlockSize; bapp++, i++) + *bapp = 0x5a; + + bs.append(bap, ByteStream::BlockSize); + CPPUNIT_ASSERT(bs.length() == (ByteStream::BlockSize * 2)); + + for (bapp = &bap[0], i = 0; i < ByteStream::BlockSize * 2; bapp++, i++) + *bapp = 0x55; + + bs.append(bap, ByteStream::BlockSize * 2); + CPPUNIT_ASSERT(bs.length() == (ByteStream::BlockSize * 4)); + delete[] bap; + bap = new ByteStream::byte[bs.length()]; + // bs >> bap; + memcpy(bap, bs.buf(), bs.length()); + bap1 = new ByteStream::byte[bs.length()]; + + for (bapp = &bap1[0], i = 0; i < ByteStream::BlockSize; bapp++, i++) + *bapp = 0xa5; + + for (i = 0; i < ByteStream::BlockSize; bapp++, i++) + *bapp = 0x5a; + + for (i = 0; i < ByteStream::BlockSize * 2; bapp++, i++) + *bapp = 0x55; + + CPPUNIT_ASSERT(memcmp(bap, bap1, bs.length()) == 0); + delete[] bap; + bap = 0; + delete[] bap1; + bap1 = 0; + } + + void bs_8() + { + bs.reset(); + string s; + s = "This is a test"; + bs << s; + string s1; + bs >> s1; + CPPUNIT_ASSERT(s == s1); + CPPUNIT_ASSERT(bs.length() == 0); + + ifstream ifs; + ifs.open("../CMakeLists.txt"); + int ifs_len; + ifs.seekg(0, ios::end); + ifs_len = ifs.tellg(); + ifs.seekg(0, ios::beg); + boost::scoped_array buf(new char[ifs_len + 1]); + ifs.read(buf.get(), ifs_len); + buf[ifs_len] = 0; + ifs.close(); + bs.reset(); + s = buf.get(); + bs << s; + bs >> s1; + CPPUNIT_ASSERT(s == s1); + CPPUNIT_ASSERT(bs.length() == 0); + + u8 = 0xa5; + bs << u8; + u16 = 0x5aa5; + bs << u16; + u32 = 0xdeadbeef; + bs << u32; + bs << s; + s += s1; + bs << s; + s += s1; + bs << s; + bs << u32; + bs << u16; + bs << u8; + + bs >> u8; + CPPUNIT_ASSERT(u8 == 0xa5); + bs >> u16; + CPPUNIT_ASSERT(u16 == 0x5aa5); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + bs >> s; + CPPUNIT_ASSERT(s == s1); + CPPUNIT_ASSERT(s.length() == (s1.length() * 1)); + bs >> s; + CPPUNIT_ASSERT(s.length() == (s1.length() * 2)); + bs >> s; + CPPUNIT_ASSERT(s.length() == (s1.length() * 3)); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + bs >> u16; + CPPUNIT_ASSERT(u16 == 0x5aa5); + bs >> u8; + CPPUNIT_ASSERT(u8 == 0xa5); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_9() + { + bs.reset(); + // Load up a bogus string (too short) + u32 = 100; + bs << u32; + bs.append(reinterpret_cast("This is a test"), 14); + string s; + // Should throw underflow + bs >> s; + } + + void bs_10() + { + bs.reset(); + bs1.reset(); + u32 = 0xdeadbeef; + bs << u32; + CPPUNIT_ASSERT(bs.length() == 4); + CPPUNIT_ASSERT(bs1.length() == 0); + bs.swap(bs1); + CPPUNIT_ASSERT(bs1.length() == 4); + CPPUNIT_ASSERT(bs.length() == 0); + bs1 >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + + bs.reset(); + bs1.reset(); + u32 = 0xdeadbeef; + bs << u32; + bs1 << u32; + bs += bs1; + CPPUNIT_ASSERT(bs.length() == 8); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + + bs.reset(); + bs1.reset(); + ByteStream bs2; + u32 = 0xdeadbeef; + bs1 << u32; + bs2 << u32; + bs = bs1 + bs2; + CPPUNIT_ASSERT(bs.length() == 8); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + } + + void bs_12() + { + bs.reset(); + + i128 = 10 * 100000000000000000000000000000000000000_xxl; + bs << i128; + CPPUNIT_ASSERT(bs.length() == 16); + i128 = 0; + bs >> i128; + CPPUNIT_ASSERT(i128 == static_cast(10 * 100000000000000000000000000000000000000_xxl)); + CPPUNIT_ASSERT(bs.length() == 0); + + u128 = 10 * 100000000000000000000000000000000000000_xxl; + bs << u128; + CPPUNIT_ASSERT(bs.length() == 16); + u128 = 0; + bs >> u128; + CPPUNIT_ASSERT(u128 == 10 * 100000000000000000000000000000000000000_xxl); + CPPUNIT_ASSERT(bs.length() == 0); + + u64 = 0xdeadbeefbadc0ffeLL; + bs << u64; + CPPUNIT_ASSERT(bs.length() == 8); + u64 = 0; + bs.peek(u64); + CPPUNIT_ASSERT(u64 == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 8); + u64 = 0; + bs >> u64; + CPPUNIT_ASSERT(u64 == 0xdeadbeefbadc0ffeLL); + CPPUNIT_ASSERT(bs.length() == 0); + + u16 = 0xf00f; + bs << u16; + CPPUNIT_ASSERT(bs.length() == 2); + u16 = 0; + bs.peek(u16); + CPPUNIT_ASSERT(u16 == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 2); + u16 = 0; + bs >> u16; + CPPUNIT_ASSERT(u16 == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 0); + + u8 = 0x0f; + bs << u8; + CPPUNIT_ASSERT(bs.length() == 1); + u8 = 0; + bs.peek(u8); + CPPUNIT_ASSERT(u8 == 0x0f); + CPPUNIT_ASSERT(bs.length() == 1); + u8 = 0; + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + + u32 = 0xdeadbeef; + bs << u32; + CPPUNIT_ASSERT(bs.length() == 4); + u32 = 0; + + u16 = 0xf00f; + bs << u16; + CPPUNIT_ASSERT(bs.length() == 6); + u16 = 0; + + u8 = 0x0f; + bs << u8; + CPPUNIT_ASSERT(bs.length() == 7); + u8 = 0; + + bs.peek(u32); + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 7); + u32 = 0; + bs >> u32; + CPPUNIT_ASSERT(u32 == 0xdeadbeef); + CPPUNIT_ASSERT(bs.length() == 3); + u16 = 0; + bs.peek(u16); + CPPUNIT_ASSERT(u16 == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 3); + u16 = 0; + bs >> u16; + CPPUNIT_ASSERT(u16 == 0xf00f); + CPPUNIT_ASSERT(bs.length() == 1); + u8 = 0; + bs.peek(u8); + CPPUNIT_ASSERT(u8 == 0x0f); + CPPUNIT_ASSERT(bs.length() == 1); + u8 = 0; + bs >> u8; + CPPUNIT_ASSERT(u8 == 0x0f); + CPPUNIT_ASSERT(bs.length() == 0); + + string s; + s = "This is a test"; + bs << s; + string s1; + bs.peek(s1); + CPPUNIT_ASSERT(s == s1); + CPPUNIT_ASSERT(bs.length() == s1.size() + 4); + CPPUNIT_ASSERT(!s1.empty()); + string s2; + bs >> s2; + CPPUNIT_ASSERT(s == s2); + CPPUNIT_ASSERT(bs.length() == 0); + } + + void bs_13() + { + string s; + ifstream ifs; + ifs.open("../CMakeLists.txt"); + int ifs_len; + ifs.seekg(0, ios::end); + ifs_len = ifs.tellg(); + ifs.seekg(0, ios::beg); + boost::scoped_array buf(new char[ifs_len + 1]); + ifs.read(buf.get(), ifs_len); + buf[ifs_len] = 0; + ifs.close(); + bs.reset(); + s = buf.get(); + bs << s; + ofstream of("bs_13.dat"); + of << bs; + of.close(); + ifs.open("./bs_13.dat"); + ifs.seekg(0, ios::end); + int ifs_len1; + ifs_len1 = ifs.tellg(); + // will be longer than orig file because string length is encoded into stream + CPPUNIT_ASSERT((ifs_len + (int)sizeof(ByteStream::quadbyte)) == ifs_len1); + ifs.seekg(0, ios::beg); + boost::scoped_array buf1(new char[ifs_len1]); + bs1.reset(); + ifs >> bs1; + ifs.close(); + CPPUNIT_ASSERT(bs.length() == bs1.length()); + string s1; + bs1 >> s1; + CPPUNIT_ASSERT(s == s1); + } + + void bs_14() + { + ByteStream bs1(0); + ByteStream bs2(bs1); + CPPUNIT_ASSERT(bs2.fBuf == 0); + ByteStream bs3(0); + bs3 = bs1; + CPPUNIT_ASSERT(bs3.fBuf == 0); + } + + void bs_15() + { + ByteStream b1, b2, empty; + uint8_t u8; + + CPPUNIT_ASSERT(b1 == b2); + CPPUNIT_ASSERT(b2 == b1); + CPPUNIT_ASSERT(b2 == empty); + CPPUNIT_ASSERT(b1 == empty); + + CPPUNIT_ASSERT(!(b1 != b2)); + CPPUNIT_ASSERT(!(b2 != b1)); + CPPUNIT_ASSERT(!(b2 != empty)); + CPPUNIT_ASSERT(!(b1 != empty)); + + b1 << "Woo hoo"; + + CPPUNIT_ASSERT(b1 != b2); + CPPUNIT_ASSERT(b2 != b1); + CPPUNIT_ASSERT(b1 != empty); + + CPPUNIT_ASSERT(!(b1 == b2)); + CPPUNIT_ASSERT(!(b2 == b1)); + CPPUNIT_ASSERT(!(b1 == empty)); + + b2 << "Woo hoo"; + + CPPUNIT_ASSERT(b1 == b2); + CPPUNIT_ASSERT(b2 == b1); + CPPUNIT_ASSERT(!(b1 != b2)); + CPPUNIT_ASSERT(!(b2 != b1)); + + b1 >> u8; + + CPPUNIT_ASSERT(b1 != b2); + CPPUNIT_ASSERT(b2 != b1); + CPPUNIT_ASSERT(!(b1 == b2)); + CPPUNIT_ASSERT(!(b2 == b1)); + + b1 << u8; + + CPPUNIT_ASSERT(b1 != b2); + CPPUNIT_ASSERT(b2 != b1); + CPPUNIT_ASSERT(!(b1 == b2)); + CPPUNIT_ASSERT(!(b2 == b1)); + + b2 >> u8; + b2 << u8; + + CPPUNIT_ASSERT(b1 == b2); + CPPUNIT_ASSERT(b2 == b1); + CPPUNIT_ASSERT(!(b1 != b2)); + CPPUNIT_ASSERT(!(b2 != b1)); + } + + void bs_16() + { + int i; + uint32_t len; + + bs.reset(); + srand(time(0)); + + for (i = 0; i < 10240; i++) + { + bs << (ByteStream::quadbyte)rand(); + } + + boost::scoped_array bp(new ByteStream::byte[bs.length()]); + ByteStream::byte* bpp = bp.get(); + boost::scoped_array bp1(new ByteStream::byte[bs.length()]); + ByteStream::byte* bpp1 = bp1.get(); + + len = bs.length(); + CPPUNIT_ASSERT(len == 10240 * 4); + bs.peek(bpp); + CPPUNIT_ASSERT(bs.length() == len); + CPPUNIT_ASSERT(memcmp(bpp, bs.buf(), len) == 0); + + bs >> bpp1; + CPPUNIT_ASSERT(bs.length() == 0); + CPPUNIT_ASSERT(memcmp(bpp, bpp1, len) == 0); + + bs.reset(); + } +}; + +static string normServ; +static string brokeServ; +static string writeServ; +volatile static bool keepRunning; +volatile static bool isRunning; +volatile static bool leakCheck; + +#define TS_NS(x) (x) +#define TS_US(x) ((x) * 1000) +#define TS_MS(x) ((x) * 1000000) + +CPPUNIT_TEST_SUITE_REGISTRATION(ByteStreamTestSuite); + +#include +#include + +#include + +void setupSignalHandlers() +{ + struct sigaction ign; + + memset(&ign, 0, sizeof(ign)); + ign.sa_handler = SIG_IGN; + + sigaction(SIGPIPE, &ign, 0); +} + +int main(int argc, char** argv) +{ + setupSignalHandlers(); + + leakCheck = false; + + if (argc > 1 && strcmp(argv[1], "--leakcheck") == 0) + leakCheck = true; + + CppUnit::TextUi::TestRunner runner; + CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); + runner.addTest(registry.makeTest()); + bool wasSuccessful = runner.run("", false); + return (wasSuccessful ? 0 : 1); +} diff --git a/utils/messageqcpp/bytestream.cpp b/utils/messageqcpp/bytestream.cpp index 6fad9767f..565b6eb1c 100644 --- a/utils/messageqcpp/bytestream.cpp +++ b/utils/messageqcpp/bytestream.cpp @@ -1,6 +1,6 @@ /* Copyright (C) 2014 InfiniDB, Inc. - Copyright (C) 2019 MariaDB Corporation + Copyright (C) 2019-2024 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -46,7 +46,7 @@ namespace messageqcpp /* Copies only the data left to be read */ void ByteStream::doCopy(const ByteStream& rhs) { - uint32_t rlen = rhs.length(); + BSSizeType rlen = rhs.length(); if (fMaxLen < rlen) { @@ -94,7 +94,7 @@ ByteStream& ByteStream::operator=(const ByteStream& rhs) return *this; } -ByteStream::ByteStream(uint32_t initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0) +ByteStream::ByteStream(BSSizeType initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0) { if (initSize > 0) growBuf(initSize); @@ -102,13 +102,13 @@ ByteStream::ByteStream(uint32_t initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0) void ByteStream::add(const uint8_t b) { - if (fBuf == 0 || (static_cast(fCurInPtr - fBuf) == fMaxLen + ISSOverhead)) + if (fBuf == 0 || (static_cast(fCurInPtr - fBuf) == fMaxLen + ISSOverhead)) growBuf(); *fCurInPtr++ = b; } -void ByteStream::growBuf(uint32_t toSize) +void ByteStream::growBuf(BSSizeType toSize) { if (fBuf == 0) { @@ -138,8 +138,8 @@ void ByteStream::growBuf(uint32_t toSize) toSize = std::max(toSize, fMaxLen * 2); uint8_t* t = new uint8_t[toSize + ISSOverhead]; - uint32_t curOutOff = fCurOutPtr - fBuf; - uint32_t curInOff = fCurInPtr - fBuf; + BSSizeType curOutOff = fCurOutPtr - fBuf; + BSSizeType curInOff = fCurInPtr - fBuf; memcpy(t, fBuf, fCurInPtr - fBuf); #ifdef ZERO_ON_NEW memset(t + (fCurInPtr - fBuf), 0, (toSize + ISSOverhead) - (fCurInPtr - fBuf)); @@ -169,7 +169,7 @@ void ByteStream::setLongStrings(const std::vector>& o ByteStream& ByteStream::operator<<(const int8_t b) { - if (fBuf == 0 || (fCurInPtr - fBuf + 1U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(b) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((int8_t*)fCurInPtr) = b; @@ -187,7 +187,7 @@ ByteStream& ByteStream::operator<<(const uint8_t b) ByteStream& ByteStream::operator<<(const int16_t d) { - if (fBuf == 0 || (fCurInPtr - fBuf + 2U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(d) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((int16_t*)fCurInPtr) = d; @@ -198,7 +198,7 @@ ByteStream& ByteStream::operator<<(const int16_t d) ByteStream& ByteStream::operator<<(const uint16_t d) { - if (fBuf == 0 || (fCurInPtr - fBuf + 2U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(d) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((uint16_t*)fCurInPtr) = d; @@ -209,7 +209,7 @@ ByteStream& ByteStream::operator<<(const uint16_t d) ByteStream& ByteStream::operator<<(const int32_t q) { - if (fBuf == 0 || (fCurInPtr - fBuf + 4U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(q) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((int32_t*)fCurInPtr) = q; @@ -220,7 +220,7 @@ ByteStream& ByteStream::operator<<(const int32_t q) ByteStream& ByteStream::operator<<(const uint32_t q) { - if (fBuf == 0 || (fCurInPtr - fBuf + 4U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(q) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((uint32_t*)fCurInPtr) = q; @@ -231,7 +231,7 @@ ByteStream& ByteStream::operator<<(const uint32_t q) ByteStream& ByteStream::operator<<(const int64_t o) { - if (fBuf == 0 || (fCurInPtr - fBuf + 8U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(o) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((int64_t*)fCurInPtr) = o; @@ -242,7 +242,7 @@ ByteStream& ByteStream::operator<<(const int64_t o) ByteStream& ByteStream::operator<<(const uint64_t o) { - if (fBuf == 0 || (fCurInPtr - fBuf + 8U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(o) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); *((uint64_t*)fCurInPtr) = o; @@ -251,20 +251,20 @@ ByteStream& ByteStream::operator<<(const uint64_t o) return *this; } -ByteStream& ByteStream::operator<<(const uint128_t& o) +ByteStream& ByteStream::operator<<(const uint128_t& h) { - if (fBuf == 0 || (fCurInPtr - fBuf + 16U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(h) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); - datatypes::TSInt128::storeUnaligned(fCurInPtr, o); + datatypes::TSInt128::storeUnaligned(fCurInPtr, h); fCurInPtr += 16; return *this; } -ByteStream& ByteStream::operator<<(const int128_t& o) +ByteStream& ByteStream::operator<<(const int128_t& h) { - if (fBuf == 0 || (fCurInPtr - fBuf + 16U > fMaxLen + ISSOverhead)) + if (fBuf == 0 || (fCurInPtr - fBuf + sizeof(h) > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); - datatypes::TSInt128::storeUnaligned(fCurInPtr, o); + datatypes::TSInt128::storeUnaligned(fCurInPtr, h); fCurInPtr += 16; return *this; } @@ -475,18 +475,18 @@ void ByteStream::peek(uint64_t& o) const o = *((uint64_t*)fCurOutPtr); } -void ByteStream::peek(uint128_t& o) const +void ByteStream::peek(uint128_t& h) const { if (length() < 16) throw underflow_error("ByteStream>uint128_t: not enough data in stream to fill datatype"); - datatypes::TSInt128::assignPtrPtr(&o, fCurOutPtr); + datatypes::TSInt128::assignPtrPtr(&h, fCurOutPtr); } -void ByteStream::peek(int128_t& o) const +void ByteStream::peek(int128_t& h) const { if (length() < 16) throw underflow_error("ByteStream>int128_t: not enough data in stream to fill datatype"); - datatypes::TSInt128::assignPtrPtr(&o, fCurOutPtr); + datatypes::TSInt128::assignPtrPtr(&h, fCurOutPtr); } void ByteStream::peek(string& s) const @@ -519,7 +519,7 @@ void ByteStream::peek(string& s) const throw logging::ProtocolError("expected a string"); // we know len >= 0 by now... - if (length() < static_cast(len + 4)) + if (length() < static_cast(len + 4)) { #if DEBUG_DUMP_STRINGS_LESS_THAN > 0 cerr << "bs: wanted " << len + 4 << " bytes, but there are only " << length() << " remaining" << endl; @@ -531,13 +531,13 @@ void ByteStream::peek(string& s) const s.assign((char*)&fCurOutPtr[4], len); } -void ByteStream::load(const uint8_t* bp, uint32_t len) +void ByteStream::load(const uint8_t* bp, BSSizeType len) { // Do all the stuff that could throw an exception first if (bp == 0 && len != 0) throw invalid_argument("ByteStream::load: bp cannot equal 0 when len is not equal to 0"); - uint32_t newMaxLen = (len + BlockSize - 1) / BlockSize * BlockSize; + BSSizeType newMaxLen = (len + BlockSize - 1) / BlockSize * BlockSize; if (len > fMaxLen) { @@ -551,7 +551,7 @@ void ByteStream::load(const uint8_t* bp, uint32_t len) fCurInPtr = fBuf + len + ISSOverhead; } -void ByteStream::append(const uint8_t* bp, uint32_t len) +void ByteStream::append(const uint8_t* bp, BSSizeType len) { if (len == 0) return; @@ -559,7 +559,7 @@ void ByteStream::append(const uint8_t* bp, uint32_t len) if (bp == 0) throw invalid_argument("ByteStream::append: bp cannot equal 0 when len is not equal to 0"); - uint32_t newSize = static_cast(fCurInPtr - fBuf + len); + BSSizeType newSize = static_cast(fCurInPtr - fBuf + len); if (fBuf == 0 || (newSize > fMaxLen)) growBuf(newSize); @@ -635,7 +635,7 @@ void ByteStream::serialize(ByteStream& bs) const void ByteStream::deserialize(ByteStream& bs) { - uint32_t len; + BSSizeType len; restart(); bs >> len; @@ -643,9 +643,9 @@ void ByteStream::deserialize(ByteStream& bs) bs.advance(len); } -void ByteStream::needAtLeast(size_t amount) +void ByteStream::needAtLeast(BSSizeType amount) { - size_t currentSpace; + BSSizeType currentSpace; currentSpace = fMaxLen - (fCurInPtr - (fBuf + ISSOverhead)); @@ -656,7 +656,7 @@ void ByteStream::needAtLeast(size_t amount) ByteStream& ByteStream::operator<<(const ByteStream& bs) { - uint32_t len = bs.length(); + BSSizeType len = bs.length(); *this << len; @@ -668,20 +668,20 @@ ByteStream& ByteStream::operator<<(const ByteStream& bs) ByteStream& ByteStream::operator>>(ByteStream& bs) { peek(bs); - fCurOutPtr += 4 + bs.length(); + fCurOutPtr += sizeof(BSSizeType) + bs.length(); return *this; } void ByteStream::peek(ByteStream& bs) const { - uint32_t len; + BSSizeType len; peek(len); if (length() < len) throw underflow_error("ByteStream>ByteStream: not enough data in stream to fill datatype"); - bs.load(&fCurOutPtr[4], len); + bs.load(&fCurOutPtr[sizeof(len)], len); } ByteStream& ByteStream::operator<<(const uuid& u) @@ -707,7 +707,7 @@ void ByteStream::peek(uuid& u) const ByteStream& ByteStream::operator<<(const float f) { - int sz = sizeof(float); + const constexpr BSSizeType sz = sizeof(float); if (fBuf == 0 || (fCurInPtr - fBuf + sz > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); @@ -719,7 +719,7 @@ ByteStream& ByteStream::operator<<(const float f) } ByteStream& ByteStream::operator<<(const double d) { - int sz = sizeof(double); + const constexpr BSSizeType sz = sizeof(double); if (fBuf == 0 || (fCurInPtr - fBuf + sz > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); @@ -731,7 +731,7 @@ ByteStream& ByteStream::operator<<(const double d) } ByteStream& ByteStream::operator<<(const long double d) { - int sz = sizeof(long double); + const constexpr BSSizeType sz = sizeof(long double); if (fBuf == 0 || (fCurInPtr - fBuf + sz > fMaxLen + ISSOverhead)) growBuf(fMaxLen + BlockSize); diff --git a/utils/messageqcpp/bytestream.h b/utils/messageqcpp/bytestream.h index 872956167..ff9606298 100644 --- a/utils/messageqcpp/bytestream.h +++ b/utils/messageqcpp/bytestream.h @@ -45,6 +45,7 @@ class ByteStreamTestSuite; namespace messageqcpp { typedef boost::shared_ptr SBS; +using BSSizeType = uint64_t; /** * @brief A class to marshall bytes as a stream @@ -76,11 +77,11 @@ class ByteStream : public Serializeable /** * default ctor */ - EXPORT explicit ByteStream(uint32_t initSize = 8192); // multiples of pagesize are best + EXPORT explicit ByteStream(BSSizeType initSize = 8192); // multiples of pagesize are best /** * ctor with a uint8_t array and len initializer */ - inline ByteStream(const uint8_t* bp, const uint32_t len); + inline ByteStream(const uint8_t* bp, const BSSizeType len); /** * copy ctor */ @@ -337,12 +338,12 @@ class ByteStream : public Serializeable /** * load the stream from an array. Clears out any previous data. */ - EXPORT void load(const uint8_t* bp, uint32_t len); + EXPORT void load(const uint8_t* bp, BSSizeType len); /** * append bytes to the end of the stream. */ - EXPORT void append(const uint8_t* bp, uint32_t len); + EXPORT void append(const uint8_t* bp, BSSizeType len); /** * equality check on buffer contents. @@ -378,19 +379,19 @@ class ByteStream : public Serializeable * advance the output ptr without having to extract bytes * @warning be careful advancing near 4GB! */ - inline void advance(uint32_t amt); + inline void advance(BSSizeType amt); /** * returns the length of the queue (in bytes) * @warning do not attempt to make a ByteStream bigger than 4GB! */ - inline uint32_t length() const; + inline BSSizeType length() const; inline bool empty() const; /** * returns the length of the queue, including header overhead (in bytes) */ - inline uint32_t lengthWithHdrOverhead() const; + inline BSSizeType lengthWithHdrOverhead() const; /** * clears the stream. Releases any current stream and sets all pointers to 0. The state of the object @@ -422,7 +423,7 @@ class ByteStream : public Serializeable /** * Get the allocated size of the buffer. */ - inline uint32_t getBufferSize() const; + inline BSSizeType getBufferSize() const; /** * Serializeable interface @@ -437,10 +438,10 @@ class ByteStream : public Serializeable /** * memory allocation chunk size */ - EXPORT static const uint32_t BlockSize = 4096; + EXPORT static const BSSizeType BlockSize = 4096; /** size of the space we want in front of the data */ - EXPORT static const uint32_t ISSOverhead = + EXPORT static const BSSizeType ISSOverhead = 3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings. // Methods to get and set `long strings`. @@ -458,7 +459,7 @@ class ByteStream : public Serializeable /** * adds another BlockSize bytes to the internal buffer */ - void growBuf(uint32_t toSize = 0); + void growBuf(BSSizeType toSize = 0); /** * handles member copying from one ByteStream to another */ @@ -476,9 +477,8 @@ class ByteStream : public Serializeable uint8_t* fBuf; /// the start of the allocated buffer uint8_t* fCurInPtr; // the point in fBuf where data is inserted next uint8_t* fCurOutPtr; // the point in fBuf where data is extracted from next - uint32_t fMaxLen; // how big fBuf is currently - // Stores `long strings`. - std::vector> longStrings; + BSSizeType fMaxLen; // how big fBuf is currently + std::vector> longStrings; // Stores `long strings`. }; template @@ -527,7 +527,7 @@ static const uint8_t BS_BLOB = 9; static const uint8_t BS_SERIALIZABLE = 10; static const uint8_t BS_UUID = 11; -inline ByteStream::ByteStream(const uint8_t* bp, const uint32_t len) : fBuf(0), fMaxLen(0) +inline ByteStream::ByteStream(const uint8_t* bp, const BSSizeType len) : fBuf(0), fMaxLen(0) { load(bp, len); } @@ -544,15 +544,15 @@ inline uint8_t* ByteStream::buf() { return fCurOutPtr; } -inline uint32_t ByteStream::length() const +inline BSSizeType ByteStream::length() const { - return (uint32_t)(fCurInPtr - fCurOutPtr); + return static_cast(fCurInPtr - fCurOutPtr); } inline bool ByteStream::empty() const { return (length() == 0); } -inline uint32_t ByteStream::lengthWithHdrOverhead() const +inline BSSizeType ByteStream::lengthWithHdrOverhead() const { return (length() + ISSOverhead); } @@ -570,7 +570,7 @@ inline void ByteStream::rewind() { fCurOutPtr = fBuf + ISSOverhead; } -inline void ByteStream::advance(uint32_t adv) +inline void ByteStream::advance(BSSizeType adv) { // fCurOutPtr is always >= fBuf, so fCurOutPtr - fBuf is >= 0, and this difference is always <= 32 bits // there is an edge condition not detected here: if fCurOutPtr - fBuf is nearly 4GB and you try to @@ -619,7 +619,7 @@ inline ByteStream& ByteStream::operator=(const SBS& rhs) return *this; } -inline uint32_t ByteStream::getBufferSize() const +inline BSSizeType ByteStream::getBufferSize() const { return fMaxLen; } @@ -738,12 +738,6 @@ void deserializeSet(ByteStream& bs, std::set& s) s.insert(tmp); } } -/* -template<> -struct ByteStream::_ByteStreamType<1, ByteStream::byte>> -{ - typedef ByteStream::byte type; -}*/ } // namespace messageqcpp