1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Made socket pool write the header in one write call. Added some debugging

printouts (commented).
This commit is contained in:
Patrick LeBlanc
2019-02-07 09:57:36 -06:00
parent 92f609f718
commit 4f26ce089c

View File

@ -90,11 +90,12 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *
const uint8_t *inbuf = in.buf(); const uint8_t *inbuf = in.buf();
int err = 0; int err = 0;
/* TODO: make these writes not send SIGPIPE /* TODO: make these writes not send SIGPIPE */
TODO: turn at least the header bits into a single write */ storagemanager::sm_msg_header hdr;
err = ::write(sock, &storagemanager::SM_MSG_START, sizeof(storagemanager::SM_MSG_START)); hdr.type = storagemanager::SM_MSG_START;
sm_check_error; hdr.payloadLen = length;
err = ::write(sock, &length, sizeof(length)); //cout << "SP sending msg on sock " << sock << " with length = " << length << endl;
err = ::write(sock, &hdr, sizeof(hdr));
sm_check_error; sm_check_error;
while (count < length) while (count < length)
{ {
@ -103,7 +104,7 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *
count += err; count += err;
in.advance(err); in.advance(err);
} }
//cout << "SP sent the msg" << endl; //cout << "SP sent msg with length = " << length << endl;
out->restart(); out->restart();
uint8_t *outbuf; uint8_t *outbuf;
@ -114,6 +115,7 @@ int SocketPool::send_recv(messageqcpp::ByteStream &in, messageqcpp::ByteStream *
while (1) while (1)
{ {
//cout << "SP receiving msg on sock " << sock << endl;
// here remainingBytes means the # of bytes from the previous message // here remainingBytes means the # of bytes from the previous message
err = ::read(sock, &window[remainingBytes], 8192 - remainingBytes); err = ::read(sock, &window[remainingBytes], 8192 - remainingBytes);
sm_check_error; sm_check_error;
@ -221,6 +223,7 @@ int SocketPool::getSocket()
void SocketPool::returnSocket(const int sock) void SocketPool::returnSocket(const int sock)
{ {
boost::mutex::scoped_lock lock(mutex); boost::mutex::scoped_lock lock(mutex);
//cout << "returning socket " << sock << endl;
freeSockets.push_back(sock); freeSockets.push_back(sock);
socketAvailable.notify_one(); socketAvailable.notify_one();
} }
@ -228,6 +231,7 @@ void SocketPool::returnSocket(const int sock)
void SocketPool::remoteClosed(const int sock) void SocketPool::remoteClosed(const int sock)
{ {
boost::mutex::scoped_lock lock(mutex); boost::mutex::scoped_lock lock(mutex);
//cout << "closing socket " << sock << endl;
::close(sock); ::close(sock);
for (vector<int>::iterator i = allSockets.begin(); i != allSockets.end(); ++i) for (vector<int>::iterator i = allSockets.begin(); i != allSockets.end(); ++i)
if (*i == sock) if (*i == sock)