1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-05 16:15:50 +03:00

MCOL-3563: Modify all *TASK tests to use processtask and test connection failures and short message error handling on disconnect.

This commit is contained in:
benthompson15
2019-11-14 13:08:55 -06:00
parent 3e3e0ed89b
commit 8d6c2a33bf

View File

@@ -33,6 +33,7 @@
#include "S3Storage.h" #include "S3Storage.h"
#include "Utilities.h" #include "Utilities.h"
#include "Synchronizer.h" #include "Synchronizer.h"
#include "ProcessTask.h"
#include <iostream> #include <iostream>
#include <stdlib.h> #include <stdlib.h>
@@ -189,7 +190,7 @@ void makeConnection()
t.join(); t.join();
} }
bool opentask(bool connectionTest=false) bool opentask(bool connectionTest=false, bool errorHandle=false)
{ {
// going to rely on msgs being smaller than the buffer here // going to rely on msgs being smaller than the buffer here
int err=0; int err=0;
@@ -209,20 +210,39 @@ bool opentask(bool connectionTest=false)
cout << "open file " << filename << endl; cout << "open file " << filename << endl;
::unlink(filename); ::unlink(filename);
ssize_t result = ::write(sessionSock, cmd, hdr->payloadLen);
assert(result==(hdr->payloadLen));
OpenTask o(clientSock, hdr->payloadLen);
o.run();
if (connectionTest)
hdr->payloadLen -= 2;
size_t result = ::write(sessionSock, cmd, hdr->payloadLen);
assert(result==(hdr->payloadLen));
if (errorHandle)
clientSock = 9999999;
if (connectionTest)
hdr->payloadLen += 2;
ProcessTask pt(clientSock, hdr->payloadLen);
boost::thread t(pt);
if (connectionTest) if (connectionTest)
{ {
close(sessionSock); sleep(1);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); close(sessionSock);
assert(err == -1); close(clientSock);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else if (errorHandle)
{
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
} }
else else
{ {
t.join();
// read the response // read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf; sm_response *resp = (sm_response *) buf;
@@ -237,13 +257,14 @@ bool opentask(bool connectionTest=false)
assert(_stat->st_uid == getuid()); assert(_stat->st_uid == getuid());
assert(_stat->st_gid == getgid()); assert(_stat->st_gid == getgid());
assert(_stat->st_size == 0); assert(_stat->st_size == 0);
} /* verify the file is there */
/* verify the file is there */ string metaPath = Config::get()->getValue("ObjectStorage", "metadata_path");
string metaPath = Config::get()->getValue("ObjectStorage", "metadata_path"); assert(!metaPath.empty());
assert(!metaPath.empty()); metaPath += string("/" + prefix + "/" + testFile + ".meta");
metaPath += string("/" + prefix + "/" + testFile + ".meta");
assert(boost::filesystem::exists(metaPath));
}
assert(boost::filesystem::exists(metaPath));
cout << "opentask OK" << endl; cout << "opentask OK" << endl;
return true; return true;
} }
@@ -486,7 +507,7 @@ bool appendtask()
return true; return true;
} }
void unlinktask(bool connectionTest=false) void unlinktask(bool connectionTest=false, bool errorHandle=false)
{ {
int err=0; int err=0;
// make a meta file and delete it // make a meta file and delete it
@@ -498,8 +519,10 @@ void unlinktask(bool connectionTest=false)
IOCoordinator *ioc = IOCoordinator::get(); IOCoordinator *ioc = IOCoordinator::get();
bf::path fullPathMeta = ioc->getMetadataPath()/(string(Metafilename) + ".meta"); bf::path fullPathMeta = ioc->getMetadataPath()/(string(Metafilename) + ".meta");
bf::remove(fullPathMeta); bf::remove(fullPathMeta);
MetadataFile meta(Metafilename); MetadataFile meta(Metafilename);
meta.writeMetadata();
assert(bf::exists(fullPathMeta)); assert(bf::exists(fullPathMeta));
uint8_t buf[1024]; uint8_t buf[1024];
@@ -509,19 +532,38 @@ void unlinktask(bool connectionTest=false)
cmd->flen = strlen(filename); cmd->flen = strlen(filename);
memcpy(&cmd->filename, filename, cmd->flen); memcpy(&cmd->filename, filename, cmd->flen);
UnlinkTask u(clientSock, sizeof(unlink_cmd) + cmd->flen); if (connectionTest)
cmd->flen -= 2;
size_t result = ::write(sessionSock, cmd, sizeof(unlink_cmd) + cmd->flen); size_t result = ::write(sessionSock, cmd, sizeof(unlink_cmd) + cmd->flen);
assert(result==(sizeof(unlink_cmd) + cmd->flen)); assert(result==(sizeof(unlink_cmd) + cmd->flen));
u.run();
if (errorHandle)
clientSock = 9999999;
if (connectionTest)
cmd->flen += 2;
ProcessTask pt(clientSock, sizeof(unlink_cmd) + cmd->flen);
boost::thread t(pt);
if (connectionTest) if (connectionTest)
{ {
close(sessionSock); sleep(1);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); close(sessionSock);
assert(err == -1); close(clientSock);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else if (errorHandle)
{
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
} }
else else
{ {
t.join();
// read the response // read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf; sm_response *resp = (sm_response *) buf;
@@ -530,11 +572,10 @@ void unlinktask(bool connectionTest=false)
assert(resp->header.payloadLen == sizeof(ssize_t)); assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0); assert(resp->header.flags == 0);
assert(resp->returnCode == 0); assert(resp->returnCode == 0);
// confirm it no longer exists
assert(!bf::exists(fullPathMeta));
} }
// confirm it no longer exists
assert(!bf::exists(fullPathMeta));
// delete it again, make sure we get an error message & reasonable error code // delete it again, make sure we get an error message & reasonable error code
// Interesting. boost::filesystem::remove() doesn't consider it an error if the file doesn't // Interesting. boost::filesystem::remove() doesn't consider it an error if the file doesn't
// exist. Need to look into the reasoning for that, and decide whether IOC // exist. Need to look into the reasoning for that, and decide whether IOC
@@ -545,7 +586,7 @@ void unlinktask(bool connectionTest=false)
cmd->opcode = UNLINK; cmd->opcode = UNLINK;
cmd->flen = strlen(filename); cmd->flen = strlen(filename);
memcpy(&cmd->filename, filename, cmd->flen); memcpy(&cmd->filename, filename, cmd->flen);
UnlinkTask u2(clientSock, sizeof(unlink_cmd) + cmd->flen); UnlinkTask u2(clientSock, sizeof(unlink_cmd) + cmd->flen);
ssize_t result = ::write(sessionSock, cmd, sizeof(unlink_cmd) + cmd->flen); ssize_t result = ::write(sessionSock, cmd, sizeof(unlink_cmd) + cmd->flen);
assert(result==(sizeof(unlink_cmd) + cmd->flen)); assert(result==(sizeof(unlink_cmd) + cmd->flen));
@@ -567,7 +608,7 @@ void unlinktask(bool connectionTest=false)
cout << "unlink task OK" << endl; cout << "unlink task OK" << endl;
} }
bool stattask(bool connectionTest=false) bool stattask(bool connectionTest=false, bool errorHandle=false)
{ {
int err=0; int err=0;
bf::path fullPath = homepath / prefix / "stattest1"; bf::path fullPath = homepath / prefix / "stattest1";
@@ -586,20 +627,39 @@ bool stattask(bool connectionTest=false)
cmd->flen = filename.length(); cmd->flen = filename.length();
strcpy((char *) cmd->filename, filename.c_str()); strcpy((char *) cmd->filename, filename.c_str());
if (connectionTest)
cmd->flen -= 2;
size_t result = ::write(sessionSock, cmd, sizeof(*cmd) + cmd->flen); size_t result = ::write(sessionSock, cmd, sizeof(*cmd) + cmd->flen);
assert(result==(sizeof(*cmd) + cmd->flen)); assert(result==(sizeof(*cmd) + cmd->flen));
StatTask s(clientSock, sizeof(*cmd) + cmd->flen);
s.run();
if (errorHandle)
clientSock = 9999999;
if (connectionTest)
cmd->flen += 2;
ProcessTask pt(clientSock, sizeof(*cmd) + cmd->flen);
boost::thread t(pt);
if (connectionTest) if (connectionTest)
{ {
close(sessionSock); sleep(1);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); close(sessionSock);
assert(err == -1); close(clientSock);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else if (errorHandle)
{
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
} }
else else
{ {
t.join();
// read the response // read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf; sm_response *resp = (sm_response *) buf;
@@ -762,7 +822,7 @@ bool IOCTruncate()
} }
bool truncatetask(bool connectionTest=false) bool truncatetask(bool connectionTest=false, bool errorHandle=false)
{ {
IOCoordinator *ioc = IOCoordinator::get(); IOCoordinator *ioc = IOCoordinator::get();
Cache *cache = Cache::get(); Cache *cache = Cache::get();
@@ -786,20 +846,38 @@ bool truncatetask(bool connectionTest=false)
cmd->flen = strlen(filename); cmd->flen = strlen(filename);
strcpy((char *) cmd->filename, filename); strcpy((char *) cmd->filename, filename);
if (connectionTest)
cmd->flen -= 2;
size_t result = ::write(sessionSock, cmd, sizeof(*cmd) + cmd->flen); size_t result = ::write(sessionSock, cmd, sizeof(*cmd) + cmd->flen);
assert(result==(sizeof(*cmd) + cmd->flen)); assert(result==(sizeof(*cmd) + cmd->flen));
TruncateTask t(clientSock, sizeof(*cmd) + cmd->flen);
t.run();
if (errorHandle)
clientSock = 9999999;
if (connectionTest)
cmd->flen += 2;
ProcessTask pt(clientSock, sizeof(*cmd) + cmd->flen);
boost::thread t(pt);
if (connectionTest) if (connectionTest)
{ {
close(sessionSock); sleep(1);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); close(sessionSock);
assert(err == -1); close(clientSock);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else if (errorHandle)
{
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
} }
else else
{ {
t.join();
// read the response // read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf; sm_response *resp = (sm_response *) buf;
@@ -808,19 +886,18 @@ bool truncatetask(bool connectionTest=false)
assert(resp->header.flags == 0); assert(resp->header.flags == 0);
assert(resp->header.payloadLen == sizeof(ssize_t)); assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->returnCode == 0); assert(resp->returnCode == 0);
// reload the metadata, check that it is 1000 bytes
meta = MetadataFile(Metafilename);
assert(meta.getLength() == 1000);
} }
// reload the metadata, check that it is 1000 bytes
meta = MetadataFile(Metafilename);
assert(meta.getLength() == 1000);
cache->reset(); cache->reset();
::unlink(metaFullName.c_str()); ::unlink(metaFullName.c_str());
cout << "truncate task OK" << endl; cout << "truncate task OK" << endl;
return true; return true;
} }
bool listdirtask(bool connectionTest=false) bool listdirtask(bool connectionTest=false,bool errorHandle=false)
{ {
IOCoordinator *ioc = IOCoordinator::get(); IOCoordinator *ioc = IOCoordinator::get();
const bf::path metaPath = ioc->getMetadataPath(); const bf::path metaPath = ioc->getMetadataPath();
@@ -848,26 +925,46 @@ bool listdirtask(bool connectionTest=false)
} }
uint8_t buf[8192]; uint8_t buf[8192];
memset(buf,0,8192);
listdir_cmd *cmd = (listdir_cmd *) buf; listdir_cmd *cmd = (listdir_cmd *) buf;
cmd->opcode = LIST_DIRECTORY; cmd->opcode = LIST_DIRECTORY;
cmd->plen = strlen(relPath); cmd->plen = strlen(relPath);
memcpy(cmd->path, relPath, cmd->plen); memcpy(cmd->path, relPath, cmd->plen);
if (connectionTest)
cmd->plen -= 2;
size_t result = ::write(sessionSock, cmd, sizeof(*cmd) + cmd->plen); size_t result = ::write(sessionSock, cmd, sizeof(*cmd) + cmd->plen);
assert(result==(sizeof(*cmd) + cmd->plen)); assert(result==(sizeof(*cmd) + cmd->plen));
ListDirectoryTask l(clientSock, sizeof(*cmd) + cmd->plen); if (errorHandle)
l.run(); clientSock = 9999999;
if (connectionTest)
cmd->plen += 2;
//ListDirectoryTask l(clientSock, sizeof(*cmd) + cmd->plen);
//l.run();
ProcessTask pt(clientSock, sizeof(*cmd) + cmd->plen);
boost::thread t(pt);
if (connectionTest) if (connectionTest)
{ {
sleep(1);
close(sessionSock); close(sessionSock);
close(clientSock);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1); assert(err == -1);
t.join();
}
else if (errorHandle)
{
err = ::recv(sessionSock, buf, 8192, MSG_DONTWAIT);
assert(err == -1);
t.join();
} }
else else
{ {
t.join();
/* going to keep this simple. Don't run this in a big dir. */ /* going to keep this simple. Don't run this in a big dir. */
/* maybe later I'll make a dir, put a file in it, and etc. For now run it in a small dir. */ /* maybe later I'll make a dir, put a file in it, and etc. For now run it in a small dir. */
err = ::recv(sessionSock, buf, 8192, MSG_DONTWAIT); err = ::recv(sessionSock, buf, 8192, MSG_DONTWAIT);
@@ -895,30 +992,53 @@ bool listdirtask(bool connectionTest=false)
} }
bf::remove_all(tmpPath); bf::remove_all(tmpPath);
cout << "listdir task OK" << endl;
return true; return true;
} }
void pingtask(bool connectionTest=false) void pingtask(bool connectionTest=false, bool errorHandle=false)
{ {
int err=0; int err=0;
uint8_t buf[1024]; uint8_t buf[1024];
ping_cmd *cmd = (ping_cmd *) buf; ping_cmd *cmd = (ping_cmd *) buf;
cmd->opcode = PING; cmd->opcode = PING;
size_t len = sizeof(*cmd);
if (connectionTest)
len -= 2;
ssize_t result = ::write(sessionSock, cmd, sizeof(*cmd)); ssize_t result = ::write(sessionSock, cmd, sizeof(*cmd));
assert(result==(sizeof(*cmd))); assert(result==(sizeof(*cmd)));
PingTask p(clientSock, sizeof(*cmd));
p.run();
if (errorHandle)
clientSock = 9999999;
if (connectionTest)
len += 2;
ProcessTask pt(clientSock, len);
boost::thread t(pt);
if (connectionTest) if (connectionTest)
{ {
close(sessionSock); sleep(1);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); close(sessionSock);
assert(err == -1); close(clientSock);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else if (errorHandle)
{
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
} }
else else
{ {
t.join();
// read the response // read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf; sm_response *resp = (sm_response *) buf;
@@ -932,7 +1052,7 @@ void pingtask(bool connectionTest=false)
cout << "pingtask OK" << endl; cout << "pingtask OK" << endl;
} }
bool copytask(bool connectionTest=false) bool copytask(bool connectionTest=false, bool errorHandle=false)
{ {
/* /*
make a file make a file
@@ -962,20 +1082,41 @@ bool copytask(bool connectionTest=false)
strncpy(file2->filename, dest, file2->flen); strncpy(file2->filename, dest, file2->flen);
uint len = (uint64_t) &file2->filename[file2->flen] - (uint64_t) buf; uint len = (uint64_t) &file2->filename[file2->flen] - (uint64_t) buf;
if (connectionTest)
len -= 2;
ssize_t result = ::write(sessionSock, buf, len); ssize_t result = ::write(sessionSock, buf, len);
assert(result==len); assert(result==len);
CopyTask c(clientSock, len);
c.run();
int err=0; int err=0;
if (errorHandle)
clientSock = 9999999;
if (connectionTest)
len += 2;
ProcessTask pt(clientSock, len);
boost::thread t(pt);
if (connectionTest) if (connectionTest)
{ {
close(sessionSock); sleep(1);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); close(sessionSock);
assert(err == -1); close(clientSock);
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else if (errorHandle)
{
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(err == -1);
t.join();
} }
else else
{ {
t.join();
// read the response // read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf; sm_response *resp = (sm_response *) buf;
@@ -984,13 +1125,11 @@ bool copytask(bool connectionTest=false)
assert(resp->header.payloadLen == sizeof(ssize_t)); assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0); assert(resp->header.flags == 0);
assert(resp->returnCode == 0); assert(resp->returnCode == 0);
// verify copytest2 is there
MetadataFile meta2(Metadest, MetadataFile::no_create_t(),true);
assert(meta2.exists());
} }
// verify copytest2 is there
MetadataFile meta2(Metadest, MetadataFile::no_create_t(),true);
assert(meta2.exists());
bf::path metaPath = IOCoordinator::get()->getMetadataPath(); bf::path metaPath = IOCoordinator::get()->getMetadataPath();
bf::remove(metaPath/(metaStrSrc + ".meta")); bf::remove(metaPath/(metaStrSrc + ".meta"));
bf::remove(metaPath/(metaStrDest + ".meta")); bf::remove(metaPath/(metaStrDest + ".meta"));
@@ -1861,30 +2000,32 @@ int main(int argc, char* argv[])
//opentask(); //opentask();
opentask(true); opentask(true);
cout << "connecting" << endl;
makeConnection(); makeConnection();
cout << "connected" << endl; opentask(false,true);
makeConnection();
unlinktask(true); unlinktask(true);
cout << "connecting" << endl;
makeConnection(); makeConnection();
cout << "connected" << endl; unlinktask(false,true);
makeConnection();
stattask(true); stattask(true);
cout << "connecting" << endl;
makeConnection(); makeConnection();
cout << "connected" << endl; stattask(false,true);
makeConnection();
truncatetask(true); truncatetask(true);
cout << "connecting" << endl;
makeConnection(); makeConnection();
cout << "connected" << endl; truncatetask(false,true);
makeConnection();
listdirtask(true); listdirtask(true);
cout << "connecting" << endl;
makeConnection(); makeConnection();
cout << "connected" << endl; listdirtask(false,true);
makeConnection();
pingtask(true); pingtask(true);
cout << "connecting" << endl;
makeConnection(); makeConnection();
cout << "connected" << endl; pingtask(false,true);
makeConnection();
copytask(true); copytask(true);
makeConnection();
copytask(false,true);
(Cache::get())->shutdown(); (Cache::get())->shutdown();
delete (Synchronizer::get()); delete (Synchronizer::get());