1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-4810 Redundant copying and wasting memory in PrimProc

This patch eliminates a copying `long string`s into the bytestream.
This commit is contained in:
Denis Khalikov
2021-08-11 15:07:19 +03:00
parent 923bbf4033
commit 7bda598fbf
6 changed files with 155 additions and 176 deletions

View File

@ -59,6 +59,8 @@ void ByteStream::doCopy(const ByteStream& rhs)
memcpy(fBuf + ISSOverhead, rhs.fCurOutPtr, rlen); memcpy(fBuf + ISSOverhead, rhs.fCurOutPtr, rlen);
fCurInPtr = fBuf + ISSOverhead + rlen; fCurInPtr = fBuf + ISSOverhead + rlen;
fCurOutPtr = fBuf + ISSOverhead; fCurOutPtr = fBuf + ISSOverhead;
// Copy `longStrings` as well.
longStrings = rhs.longStrings;
} }
ByteStream::ByteStream(const ByteStream& rhs) : ByteStream::ByteStream(const ByteStream& rhs) :
@ -87,6 +89,8 @@ ByteStream& ByteStream::operator=(const ByteStream& rhs)
delete [] fBuf; delete [] fBuf;
fBuf = fCurInPtr = fCurOutPtr = 0; fBuf = fCurInPtr = fCurOutPtr = 0;
fMaxLen = 0; fMaxLen = 0;
// Clear `longStrings`.
longStrings.clear();
} }
} }
@ -152,6 +156,18 @@ void ByteStream::growBuf(uint32_t toSize)
} }
} }
std::vector<boost::shared_array<uint8_t>>& ByteStream::getLongStrings() { return longStrings; }
const std::vector<boost::shared_array<uint8_t>>& ByteStream::getLongStrings() const
{
return longStrings;
}
void ByteStream::setLongStrings(const std::vector<boost::shared_array<uint8_t>>& other)
{
longStrings = other;
}
ByteStream& ByteStream::operator<<(const int8_t b) ByteStream& ByteStream::operator<<(const int8_t b)
{ {
if (fBuf == 0 || (fCurInPtr - fBuf + 1U > fMaxLen + ISSOverhead)) if (fBuf == 0 || (fCurInPtr - fBuf + 1U > fMaxLen + ISSOverhead))

View File

@ -28,6 +28,7 @@
#include <vector> #include <vector>
#include <set> #include <set>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <boost/shared_array.hpp>
#include <boost/version.hpp> #include <boost/version.hpp>
#include <boost/uuid/uuid.hpp> #include <boost/uuid/uuid.hpp>
#include <stdint.h> #include <stdint.h>
@ -445,7 +446,13 @@ public:
EXPORT static const uint32_t BlockSize = 4096; EXPORT static const uint32_t BlockSize = 4096;
/** size of the space we want in front of the data */ /** size of the space we want in front of the data */
EXPORT static const uint32_t ISSOverhead = 2 * sizeof(uint32_t); //space for the BS magic & length EXPORT static const uint32_t ISSOverhead =
3 * sizeof(uint32_t); // space for the BS magic & length & number of long strings.
// Methods to get and set `long strings`.
EXPORT std::vector<boost::shared_array<uint8_t>>& getLongStrings();
EXPORT const std::vector<boost::shared_array<uint8_t>>& getLongStrings() const;
EXPORT void setLongStrings(const std::vector<boost::shared_array<uint8_t>>& other);
friend class ::ByteStreamTestSuite; friend class ::ByteStreamTestSuite;
@ -469,6 +476,8 @@ private:
uint8_t* fCurInPtr; //the point in fBuf where data is inserted next uint8_t* fCurInPtr; //the point in fBuf where data is inserted next
uint8_t* fCurOutPtr; //the point in fBuf where data is extracted from next uint8_t* fCurOutPtr; //the point in fBuf where data is extracted from next
uint32_t fMaxLen; //how big fBuf is currently uint32_t fMaxLen; //how big fBuf is currently
// Stores `long strings`.
std::vector<boost::shared_array<uint8_t>> longStrings;
}; };
template<int W, typename T = void> template<int W, typename T = void>

View File

@ -84,6 +84,8 @@ using namespace std;
#include <boost/scoped_array.hpp> #include <boost/scoped_array.hpp>
using boost::scoped_array; using boost::scoped_array;
#include <boost/shared_array.hpp>
#define INETSTREAMSOCKET_DLLEXPORT #define INETSTREAMSOCKET_DLLEXPORT
#include "inetstreamsocket.h" #include "inetstreamsocket.h"
#undef INETSTREAMSOCKET_DLLEXPORT #undef INETSTREAMSOCKET_DLLEXPORT
@ -94,6 +96,7 @@ using boost::scoped_array;
#include "logger.h" #include "logger.h"
#include "loggingid.h" #include "loggingid.h"
#include "idbcompress.h" #include "idbcompress.h"
#include "rowgroup.h"
// some static functions // some static functions
namespace namespace
@ -471,43 +474,19 @@ retry:
return true; return true;
} }
const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeOut, Stats* stats) const bool InetStreamSocket::readFixedSizeData(struct pollfd* pfd, uint8_t* buffer,
const size_t numberOfBytes,
const struct ::timespec* timeout, bool* isTimeOut,
Stats* stats, int64_t msecs) const
{ {
long msecs = -1; size_t bytesRead = 0;
while (bytesRead < numberOfBytes)
struct pollfd pfd[1];
pfd[0].fd = fSocketParms.sd();
pfd[0].events = POLLIN;
if (timeout != 0)
msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000;
// we need to read the 4-byte message length first.
uint32_t msglen;
uint8_t* msglenp = reinterpret_cast<uint8_t*>(&msglen);
size_t mlread = 0;
if (readToMagic(msecs, isTimeOut, stats) == false) //indicates a timeout or EOF
{ {
// MCOL-480 The connector calls with timeout in a loop so that ssize_t currentBytesRead;
// it can check a killed flag. This means that for a long running query, int err;
// the following fills the warning log.
// if (isTimeOut && *isTimeOut)
// {
// logIoError("InetStreamSocket::read: timeout during readToMagic", 0);
// }
return SBS(new ByteStream(0));
}
//FIXME: This seems like a lot of work to read 4 bytes...
while (mlread < sizeof(msglen))
{
ssize_t t;
if (timeout != NULL) if (timeout != NULL)
{ {
int err;
pfd[0].revents = 0; pfd[0].revents = 0;
err = poll(pfd, 1, msecs); err = poll(pfd, 1, msecs);
@ -525,152 +504,131 @@ const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeO
*isTimeOut = true; *isTimeOut = true;
logIoError("InetStreamSocket::read: timeout during first poll", 0); logIoError("InetStreamSocket::read: timeout during first poll", 0);
return SBS(new ByteStream(0)); return false;
} }
} }
#ifdef _MSC_VER #ifdef _MSC_VER
t = ::recv(fSocketParms.sd(), (char*)(msglenp + mlread), sizeof(msglen) - mlread, 0); currentBytesRead =
::recv(fSocketParms.sd(), (char*) (buffer + bytesRead),
std::min(numberOfBytes - bytesRead, reinterpret_cast<size_t>(MaxSendPacketSize));
readAmoumt, 0);
#else #else
t = ::read(fSocketParms.sd(), msglenp + mlread, sizeof(msglen) - mlread); currentBytesRead = ::read(fSocketParms.sd(), buffer + bytesRead, numberOfBytes - bytesRead);
#endif #endif
if (t == 0) if (currentBytesRead == 0)
{ {
if (timeout == NULL) if (timeout == NULL)
{ {
logIoError("InetStreamSocket::read: timeout during first read", 0); logIoError("InetStreamSocket::read: timeout during first read", 0);
return SBS(new ByteStream(0)); // don't return an incomplete message return false;
} }
else else
throw SocketClosed("InetStreamSocket::read: Remote is closed"); throw SocketClosed("InetStreamSocket::read: Remote is closed");
} }
if (t < 0) if (currentBytesRead < 0)
{ {
int e = errno; err = errno;
if (e == EINTR) if (err == EINTR)
{
continue; continue;
}
if (e == KERR_ERESTARTSYS) if (err == KERR_ERESTARTSYS)
{ {
logIoError("InetStreamSocket::read: I/O error2", e); logIoError("InetStreamSocket::read: I/O error2", err);
continue; continue;
} }
ostringstream oss; ostringstream oss;
oss << "InetStreamSocket::read: I/O error2: " << oss << "InetStreamSocket::read: I/O error2: " << strerror(err);
strerror(e);
throw runtime_error(oss.str()); throw runtime_error(oss.str());
} }
mlread += t; bytesRead += currentBytesRead;
} }
if (stats) if (stats)
stats->dataRecvd(sizeof(msglen)); stats->dataRecvd(bytesRead);
SBS res(new ByteStream(msglen)); return true;
uint8_t* bufp = res->getInputPtr(); }
size_t nread = 0; const SBS InetStreamSocket::read(const struct ::timespec* timeout, bool* isTimeOut,
Stats* stats) const
{
int64_t msecs = -1;
//Finally read the actual message... struct pollfd pfd[1];
while (nread < msglen) pfd[0].fd = fSocketParms.sd();
pfd[0].events = POLLIN;
if (timeout != 0)
msecs = timeout->tv_sec * 1000 + timeout->tv_nsec / 1000000;
if (readToMagic(msecs, isTimeOut, stats) == false) // indicates a timeout or EOF
{ {
ssize_t t; // MCOL-480 The connector calls with timeout in a loop so that
// it can check a killed flag. This means that for a long running query,
if (timeout != NULL) // the following fills the warning log.
{ // if (isTimeOut && *isTimeOut)
int err; // {
// logIoError("InetStreamSocket::read: timeout during readToMagic", 0);
pfd[0].revents = 0; // }
err = poll(pfd, 1, msecs); return SBS(new ByteStream(0));
if (err < 0 || pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))
{
ostringstream oss;
oss << "InetStreamSocket::read: I/O error3: " <<
strerror(errno);
throw runtime_error(oss.str());
}
if (err == 0) // timeout
{
if (isTimeOut)
{
logIoError("InetStreamSocket::read: timeout during second poll", 0);
*isTimeOut = true;
}
if (stats)
stats->dataRecvd(nread);
return SBS(new ByteStream(0));
}
}
#ifdef _MSC_VER
int readAmount = std::min((int)msglen - (int)nread, MaxSendPacketSize);
t = ::recv(fSocketParms.sd(), (char*)(bufp + nread), readAmount, 0);
#else
t = ::read(fSocketParms.sd(), bufp + nread, msglen - nread);
#endif
if (t == 0)
{
if (stats)
stats->dataRecvd(nread);
if (timeout == NULL)
return SBS(new ByteStream(0)); // don't return an incomplete message
else
{
logIoError("InetStreamSocket::read: timeout during second read", 0);
throw SocketClosed("InetStreamSocket::read: Remote is closed");
}
}
if (t < 0)
{
ostringstream oss;
#ifdef _MSC_VER
int e = WSAGetLastError();
oss << "InetStreamSocket::read: I/O error4: WSA: " << e;
#else
int e = errno;
if (e == EINTR)
{
continue;
}
if (e == KERR_ERESTARTSYS)
{
logIoError("InetStreamSocket::read: I/O error4", e);
continue;
}
oss << "InetStreamSocket::read: I/O error4: " <<
strerror(e);
#endif
if (stats)
stats->dataRecvd(nread);
throw runtime_error(oss.str());
}
nread += t;
} }
if (stats) // we need to read the 4-byte message length first.
stats->dataRecvd(msglen); uint32_t msglen;
if (!readFixedSizeData(pfd, reinterpret_cast<uint8_t*>(&msglen), sizeof(msglen), timeout,
isTimeOut, stats, msecs))
return SBS(new ByteStream(0));
// Read the number of the `long strings`.
uint32_t longStringSize;
if (!readFixedSizeData(pfd, reinterpret_cast<uint8_t*>(&longStringSize), sizeof(longStringSize),
timeout, isTimeOut, stats, msecs))
return SBS(new ByteStream(0));
// Read the actual data of the `ByteStream`.
SBS res(new ByteStream(msglen));
if (!readFixedSizeData(pfd, res->getInputPtr(), msglen, timeout, isTimeOut, stats, msecs))
return SBS(new ByteStream(0));
res->advanceInputPtr(msglen); res->advanceInputPtr(msglen);
std::vector<boost::shared_array<uint8_t>> longStrings;
longStrings.reserve(longStringSize);
for (uint32_t i = 0; i < longStringSize; ++i)
{
// Read `MemChunk`.
rowgroup::StringStore::MemChunk memChunk;
if (!readFixedSizeData(pfd, reinterpret_cast<uint8_t*>(&memChunk),
sizeof(rowgroup::StringStore::MemChunk), timeout, isTimeOut, stats,
msecs))
return SBS(new ByteStream(0));
// Allocate new memory for the `long string`.
boost::shared_array<uint8_t> longString(
new uint8_t[sizeof(rowgroup::StringStore::MemChunk) + memChunk.currentSize]);
uint8_t* longStringData = longString.get();
// Initialize memchunk with `current size` and `capacity`.
auto* memChunkPointer = reinterpret_cast<rowgroup::StringStore::MemChunk*>(longStringData);
memChunkPointer->currentSize = memChunk.currentSize;
memChunkPointer->capacity = memChunk.capacity;
// Read the `long string`.
if (!readFixedSizeData(pfd, memChunkPointer->data, memChunkPointer->currentSize, timeout,
isTimeOut, stats, msecs))
return SBS(new ByteStream(0));
longStrings.push_back(longString);
}
res->setLongStrings(longStrings);
return res; return res;
} }
@ -692,16 +650,32 @@ void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stat
if (msglen == 0) return; if (msglen == 0) return;
const auto& longStrings = msg.getLongStrings();
/* buf.fCurOutPtr points to the data to send; ByteStream guarantees that there /* buf.fCurOutPtr points to the data to send; ByteStream guarantees that there
are at least 8 bytes before that for the magic & length fields */ are at least 12 bytes before that for the magic & length fields */
realBuf = (uint32_t*)msg.buf(); realBuf = (uint32_t*)msg.buf();
realBuf -= 2; realBuf -= 3;
realBuf[0] = magic; realBuf[0] = magic;
realBuf[1] = msglen; realBuf[1] = msglen;
realBuf[2] = longStrings.size();
try try
{ {
written(fSocketParms.sd(), (const uint8_t*)realBuf, msglen + sizeof(msglen) + sizeof(magic)); auto bytesToWrite = sizeof(msglen) + sizeof(magic) + sizeof(uint32_t) + msglen;
written(fSocketParms.sd(), (const uint8_t*) realBuf, bytesToWrite);
for (const auto& longString : longStrings)
{
const rowgroup::StringStore::MemChunk* memChunk =
reinterpret_cast<rowgroup::StringStore::MemChunk*>(longString.get());
const auto writeSize = memChunk->currentSize + sizeof(rowgroup::StringStore::MemChunk);
written(fSocketParms.sd(), (const uint8_t*) longString.get(), writeSize);
// For stats.
bytesToWrite += writeSize;
}
if (stats)
stats->dataSent(bytesToWrite);
} }
catch (std::exception& ex) catch (std::exception& ex)
{ {
@ -709,9 +683,6 @@ void InetStreamSocket::do_write(const ByteStream& msg, uint32_t whichMagic, Stat
errorMsg += " -- write from " + toString(); errorMsg += " -- write from " + toString();
throw runtime_error(errorMsg); throw runtime_error(errorMsg);
} }
if (stats)
stats->dataSent(msglen + sizeof(msglen) + sizeof(magic));
} }
void InetStreamSocket::write(const ByteStream& msg, Stats* stats) void InetStreamSocket::write(const ByteStream& msg, Stats* stats)

View File

@ -237,6 +237,9 @@ protected:
void do_write(const ByteStream& msg, uint32_t magic, Stats* stats = NULL) const; void do_write(const ByteStream& msg, uint32_t magic, Stats* stats = NULL) const;
ssize_t written(int fd, const uint8_t* ptr, size_t nbytes) const; ssize_t written(int fd, const uint8_t* ptr, size_t nbytes) const;
bool readFixedSizeData(struct pollfd* pfd, uint8_t* buffer, const size_t numberOfBytes,
const struct ::timespec* timeout, bool* isTimeOut, Stats* stats,
int64_t msec) const;
SocketParms fSocketParms; /// The socket parms SocketParms fSocketParms; /// The socket parms
size_t fBlocksize; size_t fBlocksize;

View File

@ -173,14 +173,7 @@ void StringStore::serialize(ByteStream& bs) const
bs.append(mc->data, mc->currentSize); bs.append(mc->data, mc->currentSize);
} }
bs << (uint64_t) longStrings.size(); bs.setLongStrings(longStrings);
for (i = 0; i < longStrings.size(); i++)
{
mc = (MemChunk*) longStrings[i].get();
bs << (uint64_t) mc->currentSize;
bs.append(mc->data, mc->currentSize);
}
} }
void StringStore::deserialize(ByteStream& bs) void StringStore::deserialize(ByteStream& bs)
@ -211,20 +204,7 @@ void StringStore::deserialize(ByteStream& bs)
bs.advance(size); bs.advance(size);
} }
bs >> count; longStrings = bs.getLongStrings();
longStrings.resize(count);
for (i = 0; i < count; i++)
{
bs >> size;
buf = bs.buf();
longStrings[i].reset(new uint8_t[size + sizeof(MemChunk)]);
mc = (MemChunk*) longStrings[i].get();
mc->capacity = mc->currentSize = size;
memcpy(mc->data, buf, size);
bs.advance(size);
}
return; return;
} }

View File

@ -175,13 +175,6 @@ public:
return fUseStoreStringMutex; return fUseStoreStringMutex;
} }
private:
std::string empty_str;
StringStore(const StringStore&);
StringStore& operator=(const StringStore&);
static const uint32_t CHUNK_SIZE = 64 * 1024; // allocators like powers of 2
// This is an overlay b/c the underlying data needs to be any size, // This is an overlay b/c the underlying data needs to be any size,
// and alloc'd in one chunk. data can't be a separate dynamic chunk. // and alloc'd in one chunk. data can't be a separate dynamic chunk.
struct MemChunk struct MemChunk
@ -191,7 +184,14 @@ private:
uint8_t data[]; uint8_t data[];
}; };
std::vector<boost::shared_array<uint8_t> > mem; private:
std::string empty_str;
StringStore(const StringStore&);
StringStore& operator=(const StringStore&);
static constexpr const uint32_t CHUNK_SIZE = 64 * 1024; // allocators like powers of 2
std::vector<boost::shared_array<uint8_t>> mem;
// To store strings > 64KB (BLOB/TEXT) // To store strings > 64KB (BLOB/TEXT)
std::vector<boost::shared_array<uint8_t> > longStrings; std::vector<boost::shared_array<uint8_t> > longStrings;