1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

feat(bytestream,serdes): Distribute BS buf size data type change to avoid implicit data type narrowing

This commit is contained in:
drrtuy
2024-08-27 16:35:14 +00:00
committed by Leonid Fedorov
parent a947f7341c
commit 6f6e69815d
17 changed files with 88 additions and 88 deletions

View File

@ -20,6 +20,7 @@
*
******************************************************************************/
#include <iostream>
#include "bytestream.h"
#include "primitivemsg.h"
#include "blocksize.h"
#include "lbidlist.h"
@ -700,7 +701,7 @@ bool LBIDList::CasualPartitionPredicate(const BRM::EMCasualPartition_t& cpRange,
const execplan::CalpontSystemCatalog::ColType& ct, const uint8_t BOP,
bool isDict)
{
int length = bs->length(), pos = 0;
messageqcpp::BSSizeType length = bs->length(), pos = 0;
const char* MsgDataPtr = (const char*)bs->buf();
bool scan = true;
int64_t value = 0;

View File

@ -282,6 +282,8 @@ struct ISMPacketHeader
uint32_t Interleave;
uint16_t Flags;
uint8_t Command;
// !!! This attribute is used to store a sum which arg type is potentially uint64_t.
// As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though.
uint16_t Size;
unsigned Type : 4;
unsigned MsgCount : 4;

View File

@ -19,6 +19,7 @@
* $Id: rowestimator.cpp 5642 2009-08-10 21:04:59Z wweeks $
*
******************************************************************************/
#include <cassert>
#include <iostream>
#include "primitivemsg.h"
#include "blocksize.h"
@ -292,7 +293,7 @@ float RowEstimator::estimateRowReturnFactor(const BRM::EMEntry& emEntry, const m
// For example, there are two operations for "col1 > 5 and col1 < 10":
// 1) col1 > 5
// 2) col2 < 10
int length = bs->length(), pos = 0;
messageqcpp::BSSizeType length = bs->length(), pos = 0;
const char* msgDataPtr = (const char*)bs->buf();
int64_t value = 0;
int128_t bigValue = 0;
@ -301,6 +302,7 @@ float RowEstimator::estimateRowReturnFactor(const BRM::EMEntry& emEntry, const m
for (int i = 0; i < comparisonLimit; i++)
{
assert(ct.colWidth >= 0);
pos += ct.colWidth + 2; // predicate + op + lcf
// TODO: Stole this condition from lbidlist.

View File

@ -4294,7 +4294,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non
if (from_tzinfo)
{
serializeTimezoneInfo(bs, from_tzinfo);
uint32_t length = bs.length();
messageqcpp::BSSizeType length = bs.length();
uint8_t* buf = new uint8_t[length];
bs >> buf;
tzinfo = string((char*)buf, length);
@ -4306,7 +4306,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non
if (to_tzinfo)
{
serializeTimezoneInfo(bs, to_tzinfo);
uint32_t length = bs.length();
messageqcpp::BSSizeType length = bs.length();
uint8_t* buf = new uint8_t[length];
bs >> buf;
tzinfo = string((char*)buf, length);

View File

@ -172,6 +172,8 @@ class ColumnCommand : public Command
// the length of base prim msg, which is everything up to the
// rid array for the pCol message
// !!! This attribute is used to store a sum which arg type is potentially uint64_t.
// As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though.
uint32_t baseMsgLength;
uint64_t lbid;

View File

@ -151,6 +151,8 @@ class DictStep : public Command
int compressionType;
messageqcpp::ByteStream filterString;
uint32_t filterCount;
// !!! This attribute is used to store a sum which arg type is potentially uint64_t.
// As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though.
uint32_t bufferSize;
uint32_t charsetNumber;
uint16_t inputRidCount;

View File

@ -19,13 +19,14 @@
#include <stdexcept>
#include <iostream>
#include <fstream>
#include <memory>
using namespace std;
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/scoped_array.hpp>
#include <cppunit/extensions/HelperMacros.h>
@ -538,6 +539,17 @@ class ByteStreamTestSuite : public CppUnit::TestFixture
bap1 = 0;
}
std::string getString()
{
static const std::string s(
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore "
"et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
"aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse "
"cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
"culpa qui officia deserunt mollit anim id est laborum.");
return s;
}
void bs_8()
{
bs.reset();
@ -549,18 +561,8 @@ class ByteStreamTestSuite : public CppUnit::TestFixture
CPPUNIT_ASSERT(s == s1);
CPPUNIT_ASSERT(bs.length() == 0);
ifstream ifs;
ifs.open("../CMakeLists.txt");
int ifs_len;
ifs.seekg(0, ios::end);
ifs_len = ifs.tellg();
ifs.seekg(0, ios::beg);
boost::scoped_array<char> buf(new char[ifs_len + 1]);
ifs.read(buf.get(), ifs_len);
buf[ifs_len] = 0;
ifs.close();
bs.reset();
s = buf.get();
s = getString();
bs << s;
bs >> s1;
CPPUNIT_ASSERT(s == s1);
@ -766,31 +768,21 @@ class ByteStreamTestSuite : public CppUnit::TestFixture
void bs_13()
{
string s;
ifstream ifs;
ifs.open("../CMakeLists.txt");
int ifs_len;
ifs.seekg(0, ios::end);
ifs_len = ifs.tellg();
ifs.seekg(0, ios::beg);
boost::scoped_array<char> buf(new char[ifs_len + 1]);
ifs.read(buf.get(), ifs_len);
buf[ifs_len] = 0;
ifs.close();
bs.reset();
s = buf.get();
std::string s = getString();
bs << s;
ofstream of("bs_13.dat");
of << bs;
of.close();
ifstream ifs;
ifs.open("./bs_13.dat");
ifs.seekg(0, ios::end);
int ifs_len1;
ifs_len1 = ifs.tellg();
size_t ifs_len1 = ifs.tellg();
// will be longer than orig file because string length is encoded into stream
CPPUNIT_ASSERT((ifs_len + (int)sizeof(ByteStream::quadbyte)) == ifs_len1);
CPPUNIT_ASSERT((s.size() + sizeof(ByteStream::quadbyte)) == ifs_len1);
ifs.seekg(0, ios::beg);
boost::scoped_array<char> buf1(new char[ifs_len1]);
std::unique_ptr<char[]> buf1(new char[ifs_len1]);
bs1.reset();
ifs >> bs1;
ifs.close();
@ -878,9 +870,9 @@ class ByteStreamTestSuite : public CppUnit::TestFixture
bs << (ByteStream::quadbyte)rand();
}
boost::scoped_array<ByteStream::byte> bp(new ByteStream::byte[bs.length()]);
std::unique_ptr<ByteStream::byte[]> bp(new ByteStream::byte[bs.length()]);
ByteStream::byte* bpp = bp.get();
boost::scoped_array<ByteStream::byte> bp1(new ByteStream::byte[bs.length()]);
std::unique_ptr<ByteStream::byte[]> bp1(new ByteStream::byte[bs.length()]);
ByteStream::byte* bpp1 = bp1.get();
len = bs.length();
@ -897,13 +889,6 @@ class ByteStreamTestSuite : public CppUnit::TestFixture
}
};
static string normServ;
static string brokeServ;
static string writeServ;
volatile static bool keepRunning;
volatile static bool isRunning;
volatile static bool leakCheck;
#define TS_NS(x) (x)
#define TS_US(x) ((x) * 1000)
#define TS_MS(x) ((x) * 1000000)
@ -929,11 +914,6 @@ int main(int argc, char** argv)
{
setupSignalHandlers();
leakCheck = false;
if (argc > 1 && strcmp(argv[1], "--leakcheck") == 0)
leakCheck = true;
CppUnit::TextUi::TestRunner runner;
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest(registry.makeTest());

View File

@ -16,6 +16,7 @@
MA 02110-1301, USA. */
#include "SocketPool.h"
#include "bytestream.h"
#include "configcpp.h"
#include "logger.h"
#include "messageFormat.h"
@ -87,8 +88,8 @@ SocketPool::~SocketPool()
int SocketPool::send_recv(messageqcpp::ByteStream& in, messageqcpp::ByteStream* out)
{
uint count = 0;
uint length = in.length();
messageqcpp::BSSizeType count = 0;
messageqcpp::BSSizeType length = in.length();
int sock = -1;
const uint8_t* inbuf = in.buf();
ssize_t err = 0;

View File

@ -16,6 +16,7 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "bytestream.h"
#define _CRT_RAND_S // for win rand_s
#include <unistd.h>
#include <boost/filesystem.hpp>
@ -804,7 +805,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs)
}
uint64_t ret = 0;
size_t len = bs.length();
BSSizeType len = bs.length();
idbassert(len != 0);
fs.seekp(offset);

View File

@ -20,6 +20,7 @@
*
*
***********************************************************************/
#include "bytestream.h"
#include "mcsconfig.h"
#include <stdexcept>
@ -117,7 +118,7 @@ const SBS CompressedInetStreamSocket::read(const struct timespec* timeout, bool*
void CompressedInetStreamSocket::write(const ByteStream& msg, Stats* stats)
{
size_t len = msg.length();
BSSizeType len = msg.length();
if (useCompression && (len > 512))
{
@ -126,6 +127,9 @@ void CompressedInetStreamSocket::write(const ByteStream& msg, Stats* stats)
alg->compress((char*)msg.buf(), len, (char*)smsg.getInputPtr() + HEADER_SIZE, &outLen);
// Save original len.
// !!!
// !!! Reducing BS size type from 64bit down to 32 and potentially loosing data.
// !!!
*(uint32_t*)smsg.getInputPtr() = len;
smsg.advanceInputPtr(outLen + HEADER_SIZE);

View File

@ -572,6 +572,9 @@ void InetStreamSocket::write(SBS msg, Stats* stats)
void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stats* stats) const
{
// !!!
// !!! Reducing BS size type from 64bit down to 32 and potentially loosing data.
// !!!
uint32_t msglen = msg.length();
uint32_t magic = whichMagic;
uint32_t* realBuf;

View File

@ -400,8 +400,7 @@ void SlaveComm::do_createStripeColumnExtents(ByteStream& msg)
if (printOnly)
{
cout << "createStripeColumnExtents(). "
<< "DBRoot=" << dbRoot << "; Part#=" << partitionNum << endl;
cout << "createStripeColumnExtents(). " << "DBRoot=" << dbRoot << "; Part#=" << partitionNum << endl;
for (uint32_t i = 0; i < cols.size(); i++)
cout << "StripeColExt arg " << i + 1 << ": oid=" << cols[i].oid << " width=" << cols[i].width << endl;
@ -2183,6 +2182,9 @@ void SlaveComm::saveDelta()
{
try
{
// !!!
// !!! Reducing BS size type from 64bit down to 32 and potentially loosing data.
// !!!
const uint32_t deltaLen = delta.length();
const uint32_t bufferSize = sizeof(deltaLen) + deltaLen;
std::unique_ptr<char[]> buffer(new char[bufferSize]);

View File

@ -437,7 +437,7 @@ void WEDataLoader::pushData2Cpimport(ByteStream& Ibs)
{
if (Ibs.length() > 0)
{
int aLen = Ibs.length();
messageqcpp::BSSizeType aLen = Ibs.length();
char* pStart = reinterpret_cast<char*>(Ibs.buf());
char* pEnd = pStart + aLen;
char* pPtr = pStart;

View File

@ -30,6 +30,7 @@
#pragma once
#include "bytestream.h"
#include "rwlock_local.h"
#include "resourcemanager.h"
@ -58,32 +59,32 @@ class WEDataLoader : public Observer
public:
bool setupCpimport(); // fork the cpimport
void teardownCpimport(bool useStoredWaitPidStatus); // @bug 4267
void pushData2Cpimport(ByteStream& Ibs); // push data to cpimport from the queue
void pushData2Cpimport(messageqcpp::ByteStream& Ibs); // push data to cpimport from the queue
void closeWritePipe();
void str2Argv(std::string CmdLine, std::vector<char*>& V);
public:
void onReceiveKeepAlive(ByteStream& Ibs);
void onReceiveData(ByteStream& Ibs);
void onReceiveEod(ByteStream& Ibs); // end of data
void onReceiveMode(ByteStream& Ibs);
void onReceiveKeepAlive(messageqcpp::ByteStream& Ibs);
void onReceiveData(messageqcpp::ByteStream& Ibs);
void onReceiveEod(messageqcpp::ByteStream& Ibs); // end of data
void onReceiveMode(messageqcpp::ByteStream& Ibs);
// void onReceiveCmd(messageqcpp::SBS bs);// {(ByteStream& Ibs);
void onReceiveCmd(ByteStream& bs); // {(ByteStream& Ibs);
void onReceiveAck(ByteStream& Ibs);
void onReceiveNak(ByteStream& Ibs);
void onReceiveError(ByteStream& Ibs);
void onReceiveCmd(messageqcpp::ByteStream& bs); // {(ByteStream& Ibs);
void onReceiveAck(messageqcpp::ByteStream& Ibs);
void onReceiveNak(messageqcpp::ByteStream& Ibs);
void onReceiveError(messageqcpp::ByteStream& Ibs);
void onReceiveJobId(ByteStream& Ibs);
void onReceiveJobData(ByteStream& Ibs);
void onReceiveImportFileName(ByteStream& Ibs);
void onReceiveCmdLineArgs(ByteStream& Ibs);
void onReceiveJobId(messageqcpp::ByteStream& Ibs);
void onReceiveJobData(messageqcpp::ByteStream& Ibs);
void onReceiveImportFileName(messageqcpp::ByteStream& Ibs);
void onReceiveCmdLineArgs(messageqcpp::ByteStream& Ibs);
void onReceiveStartCpimport();
void onReceiveBrmRptFileName(ByteStream& Ibs);
void onReceiveCleanup(ByteStream& Ibs);
void onReceiveRollback(ByteStream& Ibs);
void onReceiveBrmRptFileName(messageqcpp::ByteStream& Ibs);
void onReceiveCleanup(messageqcpp::ByteStream& Ibs);
void onReceiveRollback(messageqcpp::ByteStream& Ibs);
void onReceiveErrFileRqst(ByteStream& Ibs);
void onReceiveBadFileRqst(ByteStream& Ibs);
void onReceiveErrFileRqst(messageqcpp::ByteStream& Ibs);
void onReceiveBadFileRqst(messageqcpp::ByteStream& Ibs);
void onCpimportSuccess();
void onCpimportFailure();
@ -103,11 +104,11 @@ class WEDataLoader : public Observer
{
fMode = Mode;
}
void updateTxBytes(unsigned int Tx)
void updateTxBytes(messageqcpp::BSSizeType Tx)
{
fTxBytes += Tx;
}
void updateRxBytes(unsigned int Rx)
void updateRxBytes(messageqcpp::BSSizeType Rx)
{
fRxBytes += Rx;
}
@ -132,11 +133,11 @@ class WEDataLoader : public Observer
fObjId = ObjId;
}
unsigned int getTxBytes()
messageqcpp::BSSizeType getTxBytes()
{
return fTxBytes;
}
unsigned int getRxBytes()
messageqcpp::BSSizeType getRxBytes()
{
return fRxBytes;
}
@ -172,8 +173,8 @@ class WEDataLoader : public Observer
int fMode;
std::ofstream fDataDumpFile;
std::ofstream fJobFile;
unsigned int fTxBytes;
unsigned int fRxBytes;
messageqcpp::BSSizeType fTxBytes;
messageqcpp::BSSizeType fRxBytes;
char fPmId;
int fObjId; // Object Identifier for logging

View File

@ -2289,7 +2289,7 @@ int WESDHandler::getNextDbrPm2Send()
int WESDHandler::leastDataSendPm()
{
unsigned int aTx = 0;
messageqcpp::BSSizeType aTx = 0;
int aPmId = 0;
for (int aCnt = 1; aCnt <= fPmCount; aCnt++)

View File

@ -208,7 +208,7 @@ void WESplClient::send()
messageqcpp::SBS aSbs = fSendQueue.front();
fSendQueue.pop();
aLock.unlock();
int aLen = (*aSbs).length();
messageqcpp::BSSizeType aLen = (*aSbs).length();
if (aLen > 0)
{
@ -241,7 +241,7 @@ void WESplClient::recv()
rm_ts.tv_sec = fRdSecTo; // 0 when data sending otherwise 1- second
rm_ts.tv_nsec = 20000000; // 20 milliSec
bool isTimeOut = false;
int aLen = 0;
messageqcpp::BSSizeType aLen = 0;
try
{

View File

@ -29,7 +29,6 @@
#pragma once
#include "threadsafequeue.h"
#include "resourcemanager.h"
#include "we_messages.h"
@ -122,11 +121,11 @@ class WESplClient
{
return fRowTx;
}
uint32_t getBytesRcv() const
messageqcpp::BSSizeType getBytesRcv() const
{
return fBytesRcv;
}
uint32_t getBytesTx()
messageqcpp::BSSizeType getBytesTx()
{
boost::mutex::scoped_lock aLock(fTxMutex);
return fBytesTx;
@ -214,17 +213,17 @@ class WESplClient
{
return fIpAddress;
}
void setBytesRcv(uint32_t BytesRcv)
void setBytesRcv(messageqcpp::BSSizeType BytesRcv)
{
fBytesRcv = BytesRcv;
}
void setBytesTx(uint32_t BytesTx)
void setBytesTx(messageqcpp::BSSizeType BytesTx)
{
boost::mutex::scoped_lock aLock(fTxMutex);
fBytesTx = BytesTx;
aLock.unlock();
}
void updateBytesTx(uint32_t fBytes)
void updateBytesTx(messageqcpp::BSSizeType fBytes)
{
boost::mutex::scoped_lock aLock(fTxMutex);
fBytesTx += fBytes;
@ -358,8 +357,8 @@ class WESplClient
int fDataRqstCnt; // Data request count
long fRdSecTo; // read timeout sec
unsigned int fRowTx; // No. Of Rows Transmitted
uint32_t fBytesTx;
uint32_t fBytesRcv;
messageqcpp::BSSizeType fBytesTx;
messageqcpp::BSSizeType fBytesRcv;
time_t fLastInTime;
time_t fStartTime;
bool fSend;