diff --git a/dbcon/joblist/lbidlist.cpp b/dbcon/joblist/lbidlist.cpp index 34a5f23d5..ac97e49af 100644 --- a/dbcon/joblist/lbidlist.cpp +++ b/dbcon/joblist/lbidlist.cpp @@ -20,6 +20,7 @@ * ******************************************************************************/ #include +#include "bytestream.h" #include "primitivemsg.h" #include "blocksize.h" #include "lbidlist.h" @@ -700,7 +701,7 @@ bool LBIDList::CasualPartitionPredicate(const BRM::EMCasualPartition_t& cpRange, const execplan::CalpontSystemCatalog::ColType& ct, const uint8_t BOP, bool isDict) { - int length = bs->length(), pos = 0; + messageqcpp::BSSizeType length = bs->length(), pos = 0; const char* MsgDataPtr = (const char*)bs->buf(); bool scan = true; int64_t value = 0; diff --git a/dbcon/joblist/primitivemsg.h b/dbcon/joblist/primitivemsg.h index 479e1535a..f06a7e99c 100644 --- a/dbcon/joblist/primitivemsg.h +++ b/dbcon/joblist/primitivemsg.h @@ -282,6 +282,8 @@ struct ISMPacketHeader uint32_t Interleave; uint16_t Flags; uint8_t Command; + // !!! This attribute is used to store a sum which arg type is potentially uint64_t. + // As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though. uint16_t Size; unsigned Type : 4; unsigned MsgCount : 4; diff --git a/dbcon/joblist/rowestimator.cpp b/dbcon/joblist/rowestimator.cpp index fa7e8920c..1b2b2f18a 100644 --- a/dbcon/joblist/rowestimator.cpp +++ b/dbcon/joblist/rowestimator.cpp @@ -19,6 +19,7 @@ * $Id: rowestimator.cpp 5642 2009-08-10 21:04:59Z wweeks $ * ******************************************************************************/ +#include #include #include "primitivemsg.h" #include "blocksize.h" @@ -292,7 +293,7 @@ float RowEstimator::estimateRowReturnFactor(const BRM::EMEntry& emEntry, const m // For example, there are two operations for "col1 > 5 and col1 < 10": // 1) col1 > 5 // 2) col2 < 10 - int length = bs->length(), pos = 0; + messageqcpp::BSSizeType length = bs->length(), pos = 0; const char* msgDataPtr = (const char*)bs->buf(); int64_t value = 0; int128_t bigValue = 0; @@ -301,6 +302,7 @@ float RowEstimator::estimateRowReturnFactor(const BRM::EMEntry& emEntry, const m for (int i = 0; i < comparisonLimit; i++) { + assert(ct.colWidth >= 0); pos += ct.colWidth + 2; // predicate + op + lcf // TODO: Stole this condition from lbidlist. diff --git a/dbcon/mysql/ha_mcs_execplan.cpp b/dbcon/mysql/ha_mcs_execplan.cpp index 874e2fb70..2645be69e 100644 --- a/dbcon/mysql/ha_mcs_execplan.cpp +++ b/dbcon/mysql/ha_mcs_execplan.cpp @@ -4294,7 +4294,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non if (from_tzinfo) { serializeTimezoneInfo(bs, from_tzinfo); - uint32_t length = bs.length(); + messageqcpp::BSSizeType length = bs.length(); uint8_t* buf = new uint8_t[length]; bs >> buf; tzinfo = string((char*)buf, length); @@ -4306,7 +4306,7 @@ ReturnedColumn* buildFunctionColumn(Item_func* ifp, gp_walk_info& gwi, bool& non if (to_tzinfo) { serializeTimezoneInfo(bs, to_tzinfo); - uint32_t length = bs.length(); + messageqcpp::BSSizeType length = bs.length(); uint8_t* buf = new uint8_t[length]; bs >> buf; tzinfo = string((char*)buf, length); diff --git a/primitives/primproc/columncommand.h b/primitives/primproc/columncommand.h index cc08f6edb..be99261c4 100644 --- a/primitives/primproc/columncommand.h +++ b/primitives/primproc/columncommand.h @@ -172,6 +172,8 @@ class ColumnCommand : public Command // the length of base prim msg, which is everything up to the // rid array for the pCol message + // !!! This attribute is used to store a sum which arg type is potentially uint64_t. + // As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though. uint32_t baseMsgLength; uint64_t lbid; diff --git a/primitives/primproc/dictstep.h b/primitives/primproc/dictstep.h index 5200b80c6..9c5120b98 100644 --- a/primitives/primproc/dictstep.h +++ b/primitives/primproc/dictstep.h @@ -151,6 +151,8 @@ class DictStep : public Command int compressionType; messageqcpp::ByteStream filterString; uint32_t filterCount; + // !!! This attribute is used to store a sum which arg type is potentially uint64_t. + // As of 23.02.10 uint32_t here is always enough for the purpose of this attribute though. uint32_t bufferSize; uint32_t charsetNumber; uint16_t inputRidCount; diff --git a/tests/bytestream.cpp b/tests/bytestream.cpp index af2f846cb..76e60826b 100644 --- a/tests/bytestream.cpp +++ b/tests/bytestream.cpp @@ -19,13 +19,14 @@ #include #include #include +#include + using namespace std; #include #include #include #include #include -#include #include @@ -538,6 +539,17 @@ class ByteStreamTestSuite : public CppUnit::TestFixture bap1 = 0; } + std::string getString() + { + static const std::string s( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore " + "et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut " + "aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse " + "cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in " + "culpa qui officia deserunt mollit anim id est laborum."); + return s; + } + void bs_8() { bs.reset(); @@ -549,18 +561,8 @@ class ByteStreamTestSuite : public CppUnit::TestFixture CPPUNIT_ASSERT(s == s1); CPPUNIT_ASSERT(bs.length() == 0); - ifstream ifs; - ifs.open("../CMakeLists.txt"); - int ifs_len; - ifs.seekg(0, ios::end); - ifs_len = ifs.tellg(); - ifs.seekg(0, ios::beg); - boost::scoped_array buf(new char[ifs_len + 1]); - ifs.read(buf.get(), ifs_len); - buf[ifs_len] = 0; - ifs.close(); bs.reset(); - s = buf.get(); + s = getString(); bs << s; bs >> s1; CPPUNIT_ASSERT(s == s1); @@ -766,31 +768,21 @@ class ByteStreamTestSuite : public CppUnit::TestFixture void bs_13() { - string s; - ifstream ifs; - ifs.open("../CMakeLists.txt"); - int ifs_len; - ifs.seekg(0, ios::end); - ifs_len = ifs.tellg(); - ifs.seekg(0, ios::beg); - boost::scoped_array buf(new char[ifs_len + 1]); - ifs.read(buf.get(), ifs_len); - buf[ifs_len] = 0; - ifs.close(); bs.reset(); - s = buf.get(); + std::string s = getString(); bs << s; ofstream of("bs_13.dat"); of << bs; of.close(); + + ifstream ifs; ifs.open("./bs_13.dat"); ifs.seekg(0, ios::end); - int ifs_len1; - ifs_len1 = ifs.tellg(); + size_t ifs_len1 = ifs.tellg(); // will be longer than orig file because string length is encoded into stream - CPPUNIT_ASSERT((ifs_len + (int)sizeof(ByteStream::quadbyte)) == ifs_len1); + CPPUNIT_ASSERT((s.size() + sizeof(ByteStream::quadbyte)) == ifs_len1); ifs.seekg(0, ios::beg); - boost::scoped_array buf1(new char[ifs_len1]); + std::unique_ptr buf1(new char[ifs_len1]); bs1.reset(); ifs >> bs1; ifs.close(); @@ -878,9 +870,9 @@ class ByteStreamTestSuite : public CppUnit::TestFixture bs << (ByteStream::quadbyte)rand(); } - boost::scoped_array bp(new ByteStream::byte[bs.length()]); + std::unique_ptr bp(new ByteStream::byte[bs.length()]); ByteStream::byte* bpp = bp.get(); - boost::scoped_array bp1(new ByteStream::byte[bs.length()]); + std::unique_ptr bp1(new ByteStream::byte[bs.length()]); ByteStream::byte* bpp1 = bp1.get(); len = bs.length(); @@ -897,13 +889,6 @@ class ByteStreamTestSuite : public CppUnit::TestFixture } }; -static string normServ; -static string brokeServ; -static string writeServ; -volatile static bool keepRunning; -volatile static bool isRunning; -volatile static bool leakCheck; - #define TS_NS(x) (x) #define TS_US(x) ((x) * 1000) #define TS_MS(x) ((x) * 1000000) @@ -929,11 +914,6 @@ int main(int argc, char** argv) { setupSignalHandlers(); - leakCheck = false; - - if (argc > 1 && strcmp(argv[1], "--leakcheck") == 0) - leakCheck = true; - CppUnit::TextUi::TestRunner runner; CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); runner.addTest(registry.makeTest()); diff --git a/utils/cloudio/SocketPool.cpp b/utils/cloudio/SocketPool.cpp index f461e39f7..bf0c22fbd 100644 --- a/utils/cloudio/SocketPool.cpp +++ b/utils/cloudio/SocketPool.cpp @@ -16,6 +16,7 @@ MA 02110-1301, USA. */ #include "SocketPool.h" +#include "bytestream.h" #include "configcpp.h" #include "logger.h" #include "messageFormat.h" @@ -87,8 +88,8 @@ SocketPool::~SocketPool() int SocketPool::send_recv(messageqcpp::ByteStream& in, messageqcpp::ByteStream* out) { - uint count = 0; - uint length = in.length(); + messageqcpp::BSSizeType count = 0; + messageqcpp::BSSizeType length = in.length(); int sock = -1; const uint8_t* inbuf = in.buf(); ssize_t err = 0; diff --git a/utils/joiner/joinpartition.cpp b/utils/joiner/joinpartition.cpp index bccf89b38..362842b55 100644 --- a/utils/joiner/joinpartition.cpp +++ b/utils/joiner/joinpartition.cpp @@ -16,6 +16,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ +#include "bytestream.h" #define _CRT_RAND_S // for win rand_s #include #include @@ -804,7 +805,7 @@ uint64_t JoinPartition::writeByteStream(int which, ByteStream& bs) } uint64_t ret = 0; - size_t len = bs.length(); + BSSizeType len = bs.length(); idbassert(len != 0); fs.seekp(offset); diff --git a/utils/messageqcpp/compressed_iss.cpp b/utils/messageqcpp/compressed_iss.cpp index 82fdf0514..c194a41ad 100644 --- a/utils/messageqcpp/compressed_iss.cpp +++ b/utils/messageqcpp/compressed_iss.cpp @@ -20,6 +20,7 @@ * * ***********************************************************************/ +#include "bytestream.h" #include "mcsconfig.h" #include @@ -117,7 +118,7 @@ const SBS CompressedInetStreamSocket::read(const struct timespec* timeout, bool* void CompressedInetStreamSocket::write(const ByteStream& msg, Stats* stats) { - size_t len = msg.length(); + BSSizeType len = msg.length(); if (useCompression && (len > 512)) { @@ -126,6 +127,9 @@ void CompressedInetStreamSocket::write(const ByteStream& msg, Stats* stats) alg->compress((char*)msg.buf(), len, (char*)smsg.getInputPtr() + HEADER_SIZE, &outLen); // Save original len. + // !!! + // !!! Reducing BS size type from 64bit down to 32 and potentially loosing data. + // !!! *(uint32_t*)smsg.getInputPtr() = len; smsg.advanceInputPtr(outLen + HEADER_SIZE); diff --git a/utils/messageqcpp/inetstreamsocket.cpp b/utils/messageqcpp/inetstreamsocket.cpp index 1e94e0b66..7df141192 100644 --- a/utils/messageqcpp/inetstreamsocket.cpp +++ b/utils/messageqcpp/inetstreamsocket.cpp @@ -572,6 +572,9 @@ void InetStreamSocket::write(SBS msg, Stats* stats) void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stats* stats) const { + // !!! + // !!! Reducing BS size type from 64bit down to 32 and potentially loosing data. + // !!! uint32_t msglen = msg.length(); uint32_t magic = whichMagic; uint32_t* realBuf; diff --git a/versioning/BRM/slavecomm.cpp b/versioning/BRM/slavecomm.cpp index b4f648eb7..27e0eece0 100644 --- a/versioning/BRM/slavecomm.cpp +++ b/versioning/BRM/slavecomm.cpp @@ -400,8 +400,7 @@ void SlaveComm::do_createStripeColumnExtents(ByteStream& msg) if (printOnly) { - cout << "createStripeColumnExtents(). " - << "DBRoot=" << dbRoot << "; Part#=" << partitionNum << endl; + cout << "createStripeColumnExtents(). " << "DBRoot=" << dbRoot << "; Part#=" << partitionNum << endl; for (uint32_t i = 0; i < cols.size(); i++) cout << "StripeColExt arg " << i + 1 << ": oid=" << cols[i].oid << " width=" << cols[i].width << endl; @@ -2183,6 +2182,9 @@ void SlaveComm::saveDelta() { try { + // !!! + // !!! Reducing BS size type from 64bit down to 32 and potentially loosing data. + // !!! const uint32_t deltaLen = delta.length(); const uint32_t bufferSize = sizeof(deltaLen) + deltaLen; std::unique_ptr buffer(new char[bufferSize]); diff --git a/writeengine/server/we_dataloader.cpp b/writeengine/server/we_dataloader.cpp index 25ae1fbe8..ce96f28ee 100644 --- a/writeengine/server/we_dataloader.cpp +++ b/writeengine/server/we_dataloader.cpp @@ -437,7 +437,7 @@ void WEDataLoader::pushData2Cpimport(ByteStream& Ibs) { if (Ibs.length() > 0) { - int aLen = Ibs.length(); + messageqcpp::BSSizeType aLen = Ibs.length(); char* pStart = reinterpret_cast(Ibs.buf()); char* pEnd = pStart + aLen; char* pPtr = pStart; diff --git a/writeengine/server/we_dataloader.h b/writeengine/server/we_dataloader.h index 86ce2904d..ad43609c7 100644 --- a/writeengine/server/we_dataloader.h +++ b/writeengine/server/we_dataloader.h @@ -30,6 +30,7 @@ #pragma once +#include "bytestream.h" #include "rwlock_local.h" #include "resourcemanager.h" @@ -58,32 +59,32 @@ class WEDataLoader : public Observer public: bool setupCpimport(); // fork the cpimport void teardownCpimport(bool useStoredWaitPidStatus); // @bug 4267 - void pushData2Cpimport(ByteStream& Ibs); // push data to cpimport from the queue + void pushData2Cpimport(messageqcpp::ByteStream& Ibs); // push data to cpimport from the queue void closeWritePipe(); void str2Argv(std::string CmdLine, std::vector& V); public: - void onReceiveKeepAlive(ByteStream& Ibs); - void onReceiveData(ByteStream& Ibs); - void onReceiveEod(ByteStream& Ibs); // end of data - void onReceiveMode(ByteStream& Ibs); + void onReceiveKeepAlive(messageqcpp::ByteStream& Ibs); + void onReceiveData(messageqcpp::ByteStream& Ibs); + void onReceiveEod(messageqcpp::ByteStream& Ibs); // end of data + void onReceiveMode(messageqcpp::ByteStream& Ibs); // void onReceiveCmd(messageqcpp::SBS bs);// {(ByteStream& Ibs); - void onReceiveCmd(ByteStream& bs); // {(ByteStream& Ibs); - void onReceiveAck(ByteStream& Ibs); - void onReceiveNak(ByteStream& Ibs); - void onReceiveError(ByteStream& Ibs); + void onReceiveCmd(messageqcpp::ByteStream& bs); // {(ByteStream& Ibs); + void onReceiveAck(messageqcpp::ByteStream& Ibs); + void onReceiveNak(messageqcpp::ByteStream& Ibs); + void onReceiveError(messageqcpp::ByteStream& Ibs); - void onReceiveJobId(ByteStream& Ibs); - void onReceiveJobData(ByteStream& Ibs); - void onReceiveImportFileName(ByteStream& Ibs); - void onReceiveCmdLineArgs(ByteStream& Ibs); + void onReceiveJobId(messageqcpp::ByteStream& Ibs); + void onReceiveJobData(messageqcpp::ByteStream& Ibs); + void onReceiveImportFileName(messageqcpp::ByteStream& Ibs); + void onReceiveCmdLineArgs(messageqcpp::ByteStream& Ibs); void onReceiveStartCpimport(); - void onReceiveBrmRptFileName(ByteStream& Ibs); - void onReceiveCleanup(ByteStream& Ibs); - void onReceiveRollback(ByteStream& Ibs); + void onReceiveBrmRptFileName(messageqcpp::ByteStream& Ibs); + void onReceiveCleanup(messageqcpp::ByteStream& Ibs); + void onReceiveRollback(messageqcpp::ByteStream& Ibs); - void onReceiveErrFileRqst(ByteStream& Ibs); - void onReceiveBadFileRqst(ByteStream& Ibs); + void onReceiveErrFileRqst(messageqcpp::ByteStream& Ibs); + void onReceiveBadFileRqst(messageqcpp::ByteStream& Ibs); void onCpimportSuccess(); void onCpimportFailure(); @@ -103,11 +104,11 @@ class WEDataLoader : public Observer { fMode = Mode; } - void updateTxBytes(unsigned int Tx) + void updateTxBytes(messageqcpp::BSSizeType Tx) { fTxBytes += Tx; } - void updateRxBytes(unsigned int Rx) + void updateRxBytes(messageqcpp::BSSizeType Rx) { fRxBytes += Rx; } @@ -132,11 +133,11 @@ class WEDataLoader : public Observer fObjId = ObjId; } - unsigned int getTxBytes() + messageqcpp::BSSizeType getTxBytes() { return fTxBytes; } - unsigned int getRxBytes() + messageqcpp::BSSizeType getRxBytes() { return fRxBytes; } @@ -172,8 +173,8 @@ class WEDataLoader : public Observer int fMode; std::ofstream fDataDumpFile; std::ofstream fJobFile; - unsigned int fTxBytes; - unsigned int fRxBytes; + messageqcpp::BSSizeType fTxBytes; + messageqcpp::BSSizeType fRxBytes; char fPmId; int fObjId; // Object Identifier for logging diff --git a/writeengine/splitter/we_sdhandler.cpp b/writeengine/splitter/we_sdhandler.cpp index b076c91c2..d4eb6af6e 100644 --- a/writeengine/splitter/we_sdhandler.cpp +++ b/writeengine/splitter/we_sdhandler.cpp @@ -2289,7 +2289,7 @@ int WESDHandler::getNextDbrPm2Send() int WESDHandler::leastDataSendPm() { - unsigned int aTx = 0; + messageqcpp::BSSizeType aTx = 0; int aPmId = 0; for (int aCnt = 1; aCnt <= fPmCount; aCnt++) diff --git a/writeengine/splitter/we_splclient.cpp b/writeengine/splitter/we_splclient.cpp index 8bcbdcd56..b4eee7e16 100644 --- a/writeengine/splitter/we_splclient.cpp +++ b/writeengine/splitter/we_splclient.cpp @@ -208,7 +208,7 @@ void WESplClient::send() messageqcpp::SBS aSbs = fSendQueue.front(); fSendQueue.pop(); aLock.unlock(); - int aLen = (*aSbs).length(); + messageqcpp::BSSizeType aLen = (*aSbs).length(); if (aLen > 0) { @@ -241,7 +241,7 @@ void WESplClient::recv() rm_ts.tv_sec = fRdSecTo; // 0 when data sending otherwise 1- second rm_ts.tv_nsec = 20000000; // 20 milliSec bool isTimeOut = false; - int aLen = 0; + messageqcpp::BSSizeType aLen = 0; try { diff --git a/writeengine/splitter/we_splclient.h b/writeengine/splitter/we_splclient.h index 51db86399..e38ee0d25 100644 --- a/writeengine/splitter/we_splclient.h +++ b/writeengine/splitter/we_splclient.h @@ -29,7 +29,6 @@ #pragma once -#include "threadsafequeue.h" #include "resourcemanager.h" #include "we_messages.h" @@ -122,11 +121,11 @@ class WESplClient { return fRowTx; } - uint32_t getBytesRcv() const + messageqcpp::BSSizeType getBytesRcv() const { return fBytesRcv; } - uint32_t getBytesTx() + messageqcpp::BSSizeType getBytesTx() { boost::mutex::scoped_lock aLock(fTxMutex); return fBytesTx; @@ -214,17 +213,17 @@ class WESplClient { return fIpAddress; } - void setBytesRcv(uint32_t BytesRcv) + void setBytesRcv(messageqcpp::BSSizeType BytesRcv) { fBytesRcv = BytesRcv; } - void setBytesTx(uint32_t BytesTx) + void setBytesTx(messageqcpp::BSSizeType BytesTx) { boost::mutex::scoped_lock aLock(fTxMutex); fBytesTx = BytesTx; aLock.unlock(); } - void updateBytesTx(uint32_t fBytes) + void updateBytesTx(messageqcpp::BSSizeType fBytes) { boost::mutex::scoped_lock aLock(fTxMutex); fBytesTx += fBytes; @@ -358,8 +357,8 @@ class WESplClient int fDataRqstCnt; // Data request count long fRdSecTo; // read timeout sec unsigned int fRowTx; // No. Of Rows Transmitted - uint32_t fBytesTx; - uint32_t fBytesRcv; + messageqcpp::BSSizeType fBytesTx; + messageqcpp::BSSizeType fBytesRcv; time_t fLastInTime; time_t fStartTime; bool fSend;