1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-12-20 01:42:27 +03:00

A couple possible fixes for SessionManager

Made stat not print errors to stdout.
This commit is contained in:
Patrick LeBlanc
2019-02-06 09:50:20 -06:00
parent 491ecc8269
commit 578b680db4
3 changed files with 28 additions and 16 deletions

View File

@@ -167,6 +167,7 @@ void PosixTask::consumeMsg()
while (remainingLengthInStream > 0) while (remainingLengthInStream > 0)
{ {
cout << "ERROR: eating data" << endl;
err = ::recv(sock, buf, min(remainingLengthInStream, 1024), 0); err = ::recv(sock, buf, min(remainingLengthInStream, 1024), 0);
if (err <= 0) { if (err <= 0) {
remainingLengthInStream = 0; remainingLengthInStream = 0;

View File

@@ -211,6 +211,9 @@ int SessionManager::start()
uint remainingBytes = 0; uint remainingBytes = 0;
uint endOfData, i; uint endOfData, i;
int peakLength,len; int peakLength,len;
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 100000000; // .1 sec
while(true) while(true)
{ {
peakLength = ::recv(fds[socketIncr].fd, &recv_buffer[remainingBytes], sizeof(recv_buffer)-remainingBytes, MSG_PEEK | MSG_DONTWAIT); peakLength = ::recv(fds[socketIncr].fd, &recv_buffer[remainingBytes], sizeof(recv_buffer)-remainingBytes, MSG_PEEK | MSG_DONTWAIT);
@@ -220,16 +223,20 @@ int SessionManager::start()
{ {
perror("recv() failed"); perror("recv() failed");
closeConn = true; closeConn = true;
break;
} }
break; // let's not saturate the system
nanosleep(&ts, NULL);
continue;
} }
//cout << "recv got " << peakLength << " bytes" << endl; //cout << "recv got " << peakLength << " bytes" << endl;
endOfData = remainingBytes + peakLength; endOfData = remainingBytes + peakLength;
if (endOfData < 8) if (endOfData < 8)
{ {
//read this snippet and keep going //read this snippet and keep going
remainingBytes = endOfData;
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength); len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength);
remainingBytes = endOfData;
assert(len == peakLength);
continue; continue;
} }
@@ -243,7 +250,6 @@ int SessionManager::start()
recvMsgLength = *((uint *) &recv_buffer[i+4]); recvMsgLength = *((uint *) &recv_buffer[i+4]);
//cout << "got length = " << recvMsgLength << endl; //cout << "got length = " << recvMsgLength << endl;
recvMsgStart = i+8; recvMsgStart = i+8;
remainingBytes = 0;
//printf(" recvMsgLength %d recvMsgStart %d endofData %d\n", recvMsgLength,recvMsgStart,endOfData); //printf(" recvMsgLength %d recvMsgStart %d endofData %d\n", recvMsgLength,recvMsgStart,endOfData);
// if >= endOfData then the start of the message data is the beginning of next message // if >= endOfData then the start of the message data is the beginning of next message
if (recvMsgStart >= endOfData) if (recvMsgStart >= endOfData)
@@ -257,10 +263,11 @@ int SessionManager::start()
{ {
printf("No SM_MSG_START\n"); printf("No SM_MSG_START\n");
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength); len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], peakLength);
assert(len == peakLength);
// we know the msg header isn't in position [0, endOfData - i), so throw that out // we know the msg header isn't in position [0, endOfData - i), so throw that out
// and copy [i, endOfData) to the front of the buffer to be // and copy [i, endOfData) to the front of the buffer to be
// checked by the next iteration. // checked by the next iteration.
memmove(recv_buffer, &recv_buffer[i], endOfData - i); memcpy(recv_buffer, &recv_buffer[i], endOfData - i);
remainingBytes = endOfData - i; remainingBytes = endOfData - i;
} }
else else
@@ -270,7 +277,11 @@ int SessionManager::start()
if (recvMsgStart > 0) if (recvMsgStart > 0)
{ {
//printf("SM_MSG_START data is here\n"); //printf("SM_MSG_START data is here\n");
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], recvMsgStart); // how many to consume here...
// recvMsgStart is the position in the buffer
// peakLength is the amount peeked this time
// remainingBytes is the amount carried over from previous reads
len = ::read(fds[socketIncr].fd, &recv_buffer[remainingBytes], recvMsgStart - remainingBytes);
} }
else else
{ {
@@ -281,8 +292,6 @@ int SessionManager::start()
fds[socketIncr].events = 0; fds[socketIncr].events = 0;
//Pass the socket to CRP and the message length. next read will be start of message //Pass the socket to CRP and the message length. next read will be start of message
crp->processRequest(fds[socketIncr].fd,recvMsgLength); crp->processRequest(fds[socketIncr].fd,recvMsgLength);
recvMsgLength = 0;
remainingBytes = 0;
/* /*
//Doing this to work with cloudio_component_test //Doing this to work with cloudio_component_test
@@ -292,8 +301,8 @@ int SessionManager::start()
uint32_t response[4] = { storagemanager::SM_MSG_START, 8, (uint32_t ) -1, EINVAL }; uint32_t response[4] = { storagemanager::SM_MSG_START, 8, (uint32_t ) -1, EINVAL };
len = ::send(fds[socketIncr].fd, response, 16, 0); len = ::send(fds[socketIncr].fd, response, 16, 0);
*/ */
break;
} }
break;
} }
if (closeConn) if (closeConn)

View File

@@ -47,16 +47,18 @@ bool StatTask::run()
#endif #endif
int err = ioc->stat(cmd->filename, (struct stat *) resp->payload); int err = ioc->stat(cmd->filename, (struct stat *) resp->payload);
if (err)
{
handleError("StatTask stat", errno);
return true;
}
resp->type = SM_MSG_START; resp->type = SM_MSG_START;
resp->payloadLen = sizeof(struct stat) + 4; resp->returnCode = err;
resp->returnCode = 0; if (!err) {
success = write(buf, sizeof(*resp) + sizeof(struct stat)); resp->payloadLen = sizeof(struct stat) + 4;
success = write(buf, sizeof(*resp) + sizeof(struct stat));
}
else {
resp->payloadLen = 8;
*((int32_t *) resp->payload) = errno;
success = write(buf, SM_HEADER_LEN + resp->payloadLen);
}
return success; return success;
} }