1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00

Merge pull request #874 from pleblanc1976/partial-msg-tests

Partial msg tests
This commit is contained in:
Andrew Hutchings
2019-09-25 17:53:14 +01:00
committed by GitHub
14 changed files with 434 additions and 123 deletions

View File

@ -35,7 +35,7 @@ AppendTask::~AppendTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \
@ -46,7 +46,7 @@ AppendTask::~AppendTask()
bool AppendTask::run() bool AppendTask::run()
{ {
SMLogging* logger = SMLogging::get(); SMLogging* logger = SMLogging::get();
bool success; int success;
uint8_t cmdbuf[1024] = {0}; uint8_t cmdbuf[1024] = {0};
ssize_t err; ssize_t err;
@ -77,14 +77,16 @@ bool AppendTask::run()
uint toRead = min(cmd->count - readCount, bufsize); uint toRead = min(cmd->count - readCount, bufsize);
success = read(&databuf[0], toRead); success = read(&databuf[0], toRead);
check_error("AppendTask read data", false); check_error("AppendTask read data", false);
readCount += toRead; if (success==0)
break;
readCount += success;
uint writePos = 0; uint writePos = 0;
while (writeCount < readCount) while (writeCount < readCount)
{ {
try try
{ {
err = ioc->append(cmd->filename, &databuf[writePos], toRead - writePos); err = ioc->append(cmd->filename, &databuf[writePos], success - writePos);
} }
catch (exception &e) catch (exception &e)
{ {
@ -112,8 +114,7 @@ bool AppendTask::run()
} }
else else
resp->returnCode = writeCount; resp->returnCode = writeCount;
success = write(*resp, payloadLen); return write(*resp, payloadLen);
return success;
} }
} }

View File

@ -35,6 +35,7 @@
#include "SMLogging.h" #include "SMLogging.h"
namespace bf = boost::filesystem;
using namespace std; using namespace std;
namespace namespace
@ -57,6 +58,17 @@ Config * Config::get()
return inst; return inst;
} }
Config * Config::get(const string &configFile)
{
if (inst)
return inst;
boost::mutex::scoped_lock s(m);
if (inst)
return inst;
inst = new Config(configFile);
return inst;
}
Config::Config() : die(false) Config::Config() : die(false)
{ {
/* This will search the current directory, /* This will search the current directory,
@ -94,6 +106,17 @@ Config::Config() : die(false)
reloader = boost::thread([this] { this->reloadThreadFcn(); }); reloader = boost::thread([this] { this->reloadThreadFcn(); });
} }
Config::Config(const string &configFile) : filename(configFile), die(false)
{
if (!bf::is_regular_file(configFile))
throw runtime_error("Config: Could not find the config file for StorageManager");
reloadInterval = boost::posix_time::seconds(60);
last_mtime = {0, 0};
reload();
reloader = boost::thread([this] { this->reloadThreadFcn(); });
}
Config::~Config() Config::~Config()
{ {
die = true; die = true;

View File

@ -37,8 +37,12 @@ class Config : public boost::noncopyable
std::string getValue(const std::string &section, const std::string &key) const; std::string getValue(const std::string &section, const std::string &key) const;
// for testing, lets caller specify a config file to use
static Config *get(const std::string &);
private: private:
Config(); Config();
Config(const std::string &);
void reload(); void reload();
void reloadThreadFcn(); void reloadThreadFcn();

View File

@ -35,7 +35,7 @@ CopyTask::~CopyTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@ -37,7 +37,7 @@ ListDirectoryTask::~ListDirectoryTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \
@ -116,7 +116,7 @@ bool ListDirectoryTask::run()
sm_response *resp = (sm_response *) buf; sm_response *resp = (sm_response *) buf;
resp->header.type = SM_MSG_START; resp->header.type = SM_MSG_START;
resp->header.payloadLen = payloadLen + sizeof(sm_response) - sizeof(sm_msg_header); // the +4 is for the length of the return code resp->header.payloadLen = payloadLen + sizeof(sm_response) - sizeof(sm_msg_header);
resp->header.flags = 0; resp->header.flags = 0;
resp->returnCode = 0; resp->returnCode = 0;
listdir_resp *r = (listdir_resp *) resp->payload; listdir_resp *r = (listdir_resp *) resp->payload;

View File

@ -75,18 +75,20 @@ uint PosixTask::getLength()
} }
// todo, need this to return an int instead of a bool b/c it modifies the length of the read // todo, need this to return an int instead of a bool b/c it modifies the length of the read
bool PosixTask::read(uint8_t *buf, uint length) int PosixTask::read(uint8_t *buf, uint length)
{ {
if (length > remainingLengthForCaller) if (length > remainingLengthForCaller)
length = remainingLengthForCaller; length = remainingLengthForCaller;
if (length == 0) if (length == 0)
return false; return 0;
uint count = 0; uint count = 0;
int err; int err;
// copy data from the local buffer first. // copy data from the local buffer first.
uint dataInBuffer = bufferLen - bufferPos; uint dataInBuffer = bufferLen - bufferPos;
if (length <= dataInBuffer) if (length <= dataInBuffer)
{ {
memcpy(buf, &localBuffer[bufferPos], length); memcpy(buf, &localBuffer[bufferPos], length);
@ -109,7 +111,7 @@ bool PosixTask::read(uint8_t *buf, uint length)
{ {
err = ::recv(sock, &buf[count], length - count, 0); err = ::recv(sock, &buf[count], length - count, 0);
if (err < 0) if (err < 0)
return false; return err;
count += err; count += err;
remainingLengthInStream -= err; remainingLengthInStream -= err;
@ -119,7 +121,7 @@ bool PosixTask::read(uint8_t *buf, uint length)
/* The caller's request has been satisfied here. If there is remaining data in the stream /* The caller's request has been satisfied here. If there is remaining data in the stream
get what's available. */ get what's available. */
primeBuffer(); primeBuffer();
return true; return count;
} }
void PosixTask::primeBuffer() void PosixTask::primeBuffer()

View File

@ -41,7 +41,7 @@ class PosixTask
void primeBuffer(); void primeBuffer();
protected: protected:
bool read(uint8_t *buf, uint length); int read(uint8_t *buf, uint length);
bool write(const std::vector<uint8_t> &buf); bool write(const std::vector<uint8_t> &buf);
bool write(sm_response &resp, uint payloadLength); bool write(sm_response &resp, uint payloadLength);
bool write(const uint8_t *buf, uint length); bool write(const uint8_t *buf, uint length);

View File

@ -35,7 +35,7 @@ ReadTask::~ReadTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@ -39,7 +39,7 @@ StatTask::~StatTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@ -35,7 +35,7 @@ TruncateTask::~TruncateTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@ -35,7 +35,7 @@ UnlinkTask::~UnlinkTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \

View File

@ -36,7 +36,7 @@ WriteTask::~WriteTask()
} }
#define check_error(msg, ret) \ #define check_error(msg, ret) \
if (!success) \ if (success<0) \
{ \ { \
handleError(msg, errno); \ handleError(msg, errno); \
return ret; \ return ret; \
@ -47,7 +47,7 @@ WriteTask::~WriteTask()
bool WriteTask::run() bool WriteTask::run()
{ {
SMLogging* logger = SMLogging::get(); SMLogging* logger = SMLogging::get();
bool success; int success;
uint8_t cmdbuf[1024] = {0}; uint8_t cmdbuf[1024] = {0};
success = read(cmdbuf, sizeof(write_cmd)); success = read(cmdbuf, sizeof(write_cmd));
@ -77,14 +77,16 @@ bool WriteTask::run()
uint toRead = min(cmd->count - readCount, bufsize); uint toRead = min(cmd->count - readCount, bufsize);
success = read(&databuf[0], toRead); success = read(&databuf[0], toRead);
check_error("WriteTask read data", false); check_error("WriteTask read data", false);
readCount += toRead; if (success==0)
break;
readCount += success;
uint writePos = 0; uint writePos = 0;
ssize_t err; ssize_t err;
while (writeCount < readCount) while (writeCount < readCount)
{ {
try try
{ {
err = ioc->write(cmd->filename, &databuf[writePos], cmd->offset + writeCount, toRead - writePos); err = ioc->write(cmd->filename, &databuf[writePos], cmd->offset + writeCount, success - writePos);
} }
catch (exception &e) catch (exception &e)
{ {
@ -113,8 +115,7 @@ bool WriteTask::run()
} }
else else
resp->returnCode = writeCount; resp->returnCode = writeCount;
success = write(*resp, payloadLen); return write(*resp, payloadLen);
return success;
} }
} }

View File

@ -189,9 +189,10 @@ void makeConnection()
t.join(); t.join();
} }
bool opentask() bool opentask(bool connectionTest=false)
{ {
// going to rely on msgs being smaller than the buffer here // going to rely on msgs being smaller than the buffer here
int err=0;
uint8_t buf[1024]; uint8_t buf[1024];
sm_msg_header *hdr = (sm_msg_header *) buf; sm_msg_header *hdr = (sm_msg_header *) buf;
open_cmd *cmd = (open_cmd *) &hdr[1]; open_cmd *cmd = (open_cmd *) &hdr[1];
@ -206,6 +207,7 @@ bool opentask()
cmd->flen = 19; cmd->flen = 19;
strcpy((char *) cmd->filename, filename); strcpy((char *) cmd->filename, filename);
cout << "open file " << filename << endl;
::unlink(filename); ::unlink(filename);
ssize_t result = ::write(sessionSock, cmd, hdr->payloadLen); ssize_t result = ::write(sessionSock, cmd, hdr->payloadLen);
assert(result==(hdr->payloadLen)); assert(result==(hdr->payloadLen));
@ -213,21 +215,29 @@ bool opentask()
OpenTask o(clientSock, hdr->payloadLen); OpenTask o(clientSock, hdr->payloadLen);
o.run(); o.run();
// read the response if (connectionTest)
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); {
sm_response *resp = (sm_response *) buf; close(sessionSock);
assert(err == sizeof(struct stat) + sizeof(sm_response)); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(resp->header.type == SM_MSG_START); assert(err == -1);
assert(resp->header.payloadLen == sizeof(struct stat) + sizeof(ssize_t)); }
assert(resp->header.flags == 0); else
assert(resp->returnCode == 0); {
struct stat *_stat = (struct stat *) resp->payload; // read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
// what can we verify about the stat... sm_response *resp = (sm_response *) buf;
assert(_stat->st_uid == getuid()); assert(err == sizeof(struct stat) + sizeof(sm_response));
assert(_stat->st_gid == getgid()); assert(resp->header.type == SM_MSG_START);
assert(_stat->st_size == 0); assert(resp->header.payloadLen == sizeof(struct stat) + sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
struct stat *_stat = (struct stat *) resp->payload;
// what can we verify about the stat...
assert(_stat->st_uid == getuid());
assert(_stat->st_gid == getgid());
assert(_stat->st_size == 0);
}
/* verify the file is there */ /* verify the file is there */
string metaPath = Config::get()->getValue("ObjectStorage", "metadata_path"); string metaPath = Config::get()->getValue("ObjectStorage", "metadata_path");
assert(!metaPath.empty()); assert(!metaPath.empty());
@ -414,7 +424,7 @@ bool writetask()
assert(resp->header.payloadLen == sizeof(ssize_t)); assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0); assert(resp->header.flags == 0);
assert(resp->returnCode == 9); assert(resp->returnCode == 9);
//check file contents //check file contents
err = ::read(fd, buf, 1024); err = ::read(fd, buf, 1024);
assert(err == 9); assert(err == 9);
@ -476,8 +486,9 @@ bool appendtask()
return true; return true;
} }
void unlinktask() void unlinktask(bool connectionTest=false)
{ {
int err=0;
// make a meta file and delete it // make a meta file and delete it
bf::path fullPath = homepath / prefix / "unlinktest1"; bf::path fullPath = homepath / prefix / "unlinktest1";
string pathMeta = prefix + "/unlinktest1"; string pathMeta = prefix + "/unlinktest1";
@ -503,14 +514,23 @@ void unlinktask()
assert(result==(sizeof(unlink_cmd) + cmd->flen)); assert(result==(sizeof(unlink_cmd) + cmd->flen));
u.run(); u.run();
// verify response if (connectionTest)
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); {
sm_response *resp = (sm_response *) buf; close(sessionSock);
assert(err == sizeof(*resp)); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(resp->header.type == SM_MSG_START); assert(err == -1);
assert(resp->header.payloadLen == sizeof(ssize_t)); }
assert(resp->header.flags == 0); else
assert(resp->returnCode == 0); {
// read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
}
// confirm it no longer exists // confirm it no longer exists
assert(!bf::exists(fullPathMeta)); assert(!bf::exists(fullPathMeta));
@ -547,8 +567,9 @@ void unlinktask()
cout << "unlink task OK" << endl; cout << "unlink task OK" << endl;
} }
bool stattask() bool stattask(bool connectionTest=false)
{ {
int err=0;
bf::path fullPath = homepath / prefix / "stattest1"; bf::path fullPath = homepath / prefix / "stattest1";
string filename = fullPath.string(); string filename = fullPath.string();
string Metafilename = prefix + "/stattest1"; string Metafilename = prefix + "/stattest1";
@ -571,21 +592,30 @@ bool stattask()
StatTask s(clientSock, sizeof(*cmd) + cmd->flen); StatTask s(clientSock, sizeof(*cmd) + cmd->flen);
s.run(); s.run();
// read the response if (connectionTest)
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); {
sm_response *resp = (sm_response *) buf; close(sessionSock);
assert(err == sizeof(struct stat) + sizeof(sm_response)); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(resp->header.type == SM_MSG_START); assert(err == -1);
assert(resp->header.flags == 0); }
assert(resp->header.payloadLen == sizeof(struct stat) + sizeof(ssize_t)); else
assert(resp->returnCode == 0); {
struct stat *_stat = (struct stat *) resp->payload; // read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
// what can we verify about the stat... sm_response *resp = (sm_response *) buf;
assert(_stat->st_uid == getuid()); assert(err == sizeof(struct stat) + sizeof(sm_response));
assert(_stat->st_gid == getgid()); assert(resp->header.type == SM_MSG_START);
assert(_stat->st_size == 8192); assert(resp->header.flags == 0);
assert(resp->header.payloadLen == sizeof(struct stat) + sizeof(ssize_t));
assert(resp->returnCode == 0);
struct stat *_stat = (struct stat *) resp->payload;
// what can we verify about the stat...
assert(_stat->st_uid == getuid());
assert(_stat->st_gid == getgid());
assert(_stat->st_size == 8192);
}
::unlink(fullFilename.c_str()); ::unlink(fullFilename.c_str());
cout << "stattask OK" << endl; cout << "stattask OK" << endl;
return true; return true;
@ -732,7 +762,7 @@ bool IOCTruncate()
} }
bool truncatetask() bool truncatetask(bool connectionTest=false)
{ {
IOCoordinator *ioc = IOCoordinator::get(); IOCoordinator *ioc = IOCoordinator::get();
Cache *cache = Cache::get(); Cache *cache = Cache::get();
@ -742,7 +772,7 @@ bool truncatetask()
string metaStr = prefix + "/trunctest1"; string metaStr = prefix + "/trunctest1";
const char *filename = fullPath.string().c_str(); const char *filename = fullPath.string().c_str();
const char *Metafilename = metaStr.c_str(); const char *Metafilename = metaStr.c_str();
int err=0;
// get the metafile created // get the metafile created
string metaFullName = (metaPath/Metafilename).string() + ".meta"; string metaFullName = (metaPath/Metafilename).string() + ".meta";
::unlink(metaFullName.c_str()); ::unlink(metaFullName.c_str());
@ -762,14 +792,23 @@ bool truncatetask()
TruncateTask t(clientSock, sizeof(*cmd) + cmd->flen); TruncateTask t(clientSock, sizeof(*cmd) + cmd->flen);
t.run(); t.run();
// read the response if (connectionTest)
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); {
sm_response *resp = (sm_response *) buf; close(sessionSock);
assert(err == sizeof(sm_response)); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(resp->header.type == SM_MSG_START); assert(err == -1);
assert(resp->header.flags == 0); }
assert(resp->header.payloadLen == sizeof(ssize_t)); else
assert(resp->returnCode == 0); {
// read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.flags == 0);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->returnCode == 0);
}
// reload the metadata, check that it is 1000 bytes // reload the metadata, check that it is 1000 bytes
meta = MetadataFile(Metafilename); meta = MetadataFile(Metafilename);
@ -781,7 +820,7 @@ bool truncatetask()
return true; return true;
} }
bool listdirtask() bool listdirtask(bool connectionTest=false)
{ {
IOCoordinator *ioc = IOCoordinator::get(); IOCoordinator *ioc = IOCoordinator::get();
const bf::path metaPath = ioc->getMetadataPath(); const bf::path metaPath = ioc->getMetadataPath();
@ -821,36 +860,47 @@ bool listdirtask()
ListDirectoryTask l(clientSock, sizeof(*cmd) + cmd->plen); ListDirectoryTask l(clientSock, sizeof(*cmd) + cmd->plen);
l.run(); l.run();
/* going to keep this simple. Don't run this in a big dir. */ if (connectionTest)
/* maybe later I'll make a dir, put a file in it, and etc. For now run it in a small dir. */
err = ::recv(sessionSock, buf, 8192, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf;
assert(err > 0);
assert(resp->header.type == SM_MSG_START);
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
listdir_resp *r = (listdir_resp *) resp->payload;
assert(r->elements == 10);
int off = sizeof(sm_response) + sizeof(listdir_resp);
uint fileCounter = 0;
while (off < err)
{ {
listdir_resp_entry *e = (listdir_resp_entry *) &buf[off]; close(sessionSock);
//cout << "len = " << e->flen << endl; err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(off + e->flen + sizeof(listdir_resp_entry) < 8192); assert(err == -1);
string file(e->filename, e->flen);
assert(files.find((tmpPath/file).string()) != files.end());
fileCounter++;
//cout << "name = " << file << endl;
off += e->flen + sizeof(listdir_resp_entry);
} }
assert(fileCounter == r->elements); else
{
/* going to keep this simple. Don't run this in a big dir. */
/* maybe later I'll make a dir, put a file in it, and etc. For now run it in a small dir. */
err = ::recv(sessionSock, buf, 8192, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf;
assert(err > 0);
assert(resp->header.type == SM_MSG_START);
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
listdir_resp *r = (listdir_resp *) resp->payload;
assert(r->elements == 10);
int off = sizeof(sm_response) + sizeof(listdir_resp);
uint fileCounter = 0;
while (off < err)
{
listdir_resp_entry *e = (listdir_resp_entry *) &buf[off];
//cout << "len = " << e->flen << endl;
assert(off + e->flen + sizeof(listdir_resp_entry) < 8192);
string file(e->filename, e->flen);
assert(files.find((tmpPath/file).string()) != files.end());
fileCounter++;
//cout << "name = " << file << endl;
off += e->flen + sizeof(listdir_resp_entry);
}
assert(fileCounter == r->elements);
}
bf::remove_all(tmpPath); bf::remove_all(tmpPath);
return true; return true;
} }
void pingtask() void pingtask(bool connectionTest=false)
{ {
int err=0;
uint8_t buf[1024]; uint8_t buf[1024];
ping_cmd *cmd = (ping_cmd *) buf; ping_cmd *cmd = (ping_cmd *) buf;
cmd->opcode = PING; cmd->opcode = PING;
@ -861,18 +911,28 @@ void pingtask()
PingTask p(clientSock, sizeof(*cmd)); PingTask p(clientSock, sizeof(*cmd));
p.run(); p.run();
// read the response if (connectionTest)
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); {
sm_response *resp = (sm_response *) buf; close(sessionSock);
assert(err == sizeof(sm_response)); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(resp->header.type == SM_MSG_START); assert(err == -1);
assert(resp->header.payloadLen == sizeof(ssize_t)); }
assert(resp->header.flags == 0); else
assert(resp->returnCode == 0); {
// read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
}
cout << "pingtask OK" << endl; cout << "pingtask OK" << endl;
} }
bool copytask() bool copytask(bool connectionTest=false)
{ {
/* /*
make a file make a file
@ -907,15 +967,25 @@ bool copytask()
CopyTask c(clientSock, len); CopyTask c(clientSock, len);
c.run(); c.run();
int err=0;
// read the response if (connectionTest)
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT); {
sm_response *resp = (sm_response *) buf; close(sessionSock);
assert(err == sizeof(sm_response)); err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
assert(resp->header.type == SM_MSG_START); assert(err == -1);
assert(resp->header.payloadLen == sizeof(ssize_t)); }
assert(resp->header.flags == 0); else
assert(resp->returnCode == 0); {
// read the response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) buf;
assert(err == sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
}
// verify copytest2 is there // verify copytest2 is there
MetadataFile meta2(Metadest, MetadataFile::no_create_t(),true); MetadataFile meta2(Metadest, MetadataFile::no_create_t(),true);
@ -1485,8 +1555,8 @@ void IOCCopyFile2()
bf::path fullPath = homepath / prefix / "not-there"; bf::path fullPath = homepath / prefix / "not-there";
const char *source = fullPath.string().c_str(); const char *source = fullPath.string().c_str();
fullPath = homepath / prefix / "not-there2"; bf::path fullPath2 = homepath / prefix / "not-there2";
const char *dest = fullPath.string().c_str(); const char *dest = fullPath2.string().c_str();
bf::path metaPath = ioc->getMetadataPath(); bf::path metaPath = ioc->getMetadataPath();
bf::remove(metaPath/prefix/"not-there.meta"); bf::remove(metaPath/prefix/"not-there.meta");
@ -1587,6 +1657,93 @@ void bigMergeJournal1()
assert(buf); assert(buf);
} }
// This should write an incomplete msg(s) to make sure SM does the right thing. Not
// done yet, handing this off to Ben.
void shortMsg()
{
IOCoordinator *ioc = IOCoordinator::get();
struct stat _stat;
bf::path fullPath = homepath / prefix / "writetest1";
const char *filename = fullPath.string().c_str();
::unlink(filename);
ioc->open(filename,O_WRONLY | O_CREAT,&_stat);
size_t size = 27;
uint8_t bufWrite[(sizeof(write_cmd)+std::strlen(filename)+size)];
sm_msg_header *hdrWrite = (sm_msg_header *) bufWrite;
write_cmd *cmdWrite = (write_cmd *) &hdrWrite[1];
uint8_t *dataWrite;
cmdWrite->opcode = WRITE;
cmdWrite->offset = 0;
cmdWrite->count = size;
cmdWrite->flen = std::strlen(filename);
memcpy(&cmdWrite->filename, filename, cmdWrite->flen);
dataWrite = (uint8_t *) &cmdWrite->filename[cmdWrite->flen];
memcpy(dataWrite, "123456789123456789123456789", cmdWrite->count);
hdrWrite->type = SM_MSG_START;
hdrWrite->payloadLen = sizeof(*cmdWrite) + cmdWrite->flen + 9;
WriteTask w(clientSock, hdrWrite->payloadLen);
ssize_t result = ::write(sessionSock, cmdWrite, hdrWrite->payloadLen);
assert(result==(hdrWrite->payloadLen));
w.run();
// verify response
int err = ::recv(sessionSock, bufWrite, 1024, MSG_DONTWAIT);
sm_response *resp = (sm_response *) bufWrite;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 9);
uint8_t bufAppend[(sizeof(append_cmd)+std::strlen(filename)+size)];
uint8_t *dataAppend;
sm_msg_header *hdrAppend = (sm_msg_header *) bufAppend;
append_cmd *cmdAppend = (append_cmd *) &hdrAppend[1];
cmdAppend->opcode = APPEND;
cmdAppend->count = size;
cmdAppend->flen = std::strlen(filename);
memcpy(&cmdAppend->filename, filename, cmdAppend->flen);
dataAppend = (uint8_t *) &cmdAppend->filename[cmdAppend->flen];
memcpy(dataAppend, "123456789123456789123456789", cmdAppend->count);
hdrAppend->type = SM_MSG_START;
hdrAppend->payloadLen = sizeof(*cmdAppend) + cmdAppend->flen + 9;
AppendTask a(clientSock, hdrAppend->payloadLen);
err = ::write(sessionSock, cmdAppend, hdrAppend->payloadLen);
a.run();
// verify response
err = ::recv(sessionSock, bufAppend, 1024, MSG_DONTWAIT);
resp = (sm_response *) bufAppend;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 9);
ioc->unlink(fullPath.string().c_str());
cout << "shortWriteMsg Test OK" << endl;
}
// write and append are the biggest vulnerabilities here b/c those msgs could be sent in multiple
// pieces, are much larger, and thus if there is a crash mid-message it's most likely to happen
// during a call to write/append().
// it may not even be possible for CS to write a partial open/stat/read/etc msg, but that should be
// tested as well.
void shortMsgTests()
{
shortMsg();
}
int main(int argc, char* argv[]) int main(int argc, char* argv[])
{ {
@ -1617,6 +1774,18 @@ int main(int argc, char* argv[])
} }
} }
if (!bf::is_regular_file("test_data/storagemanager.cnf"))
{
cerr << "This should be run in a dir where ./test_data/storagemanager.cnf exists" << endl;
exit(1);
}
Config *config = Config::get("test_data/storagemanager.cnf");
cout << "Cleaning out debris from previous runs" << endl;
bf::remove_all(config->getValue("ObjectStorage", "metadata_path"));
bf::remove_all(config->getValue("ObjectStorage", "journal_path"));
bf::remove_all(config->getValue("LocalStorage", "path"));
bf::remove_all(config->getValue("Cache", "path"));
cout << "connecting" << endl; cout << "connecting" << endl;
makeConnection(); makeConnection();
cout << "connected" << endl; cout << "connected" << endl;
@ -1650,12 +1819,11 @@ int main(int argc, char* argv[])
// this starts in one object and crosses into new object // this starts in one object and crosses into new object
metadataJournalTest_append((7*sizeKB)); metadataJournalTest_append((7*sizeKB));
//writetask(); //writetask();
//appendtask(); //appendtask();
unlinktask(); unlinktask();
stattask(); stattask();
truncatetask(); // currently waiting on IOC::write() to be completed. truncatetask();
listdirtask(); listdirtask();
pingtask(); pingtask();
copytask(); copytask();
@ -1666,17 +1834,57 @@ int main(int argc, char* argv[])
replicatorTest(); replicatorTest();
syncTest1(); syncTest1();
s3storageTest1();
IOCReadTest1(); IOCReadTest1();
IOCTruncate(); IOCTruncate();
IOCUnlink(); IOCUnlink();
IOCCopyFile(); IOCCopyFile();
shortMsgTests();
// For the moment, this next one just verifies no error happens as reported by the fcns called. // For the moment, this next one just verifies no error happens as reported by the fcns called.
// It doesn't verify the result yet. // It doesn't verify the result yet.
bigMergeJournal1(); bigMergeJournal1();
// skip the s3 test if s3 is not configured
if (config->getValue("S3", "region") != "")
{
s3storageTest1();
}
else
cout << "To run the S3Storage unit tests, configure the S3 section of test-data/storagemanager.cnf"
<< endl;
cout << "Cleanup";
metadataJournalTestCleanup(); metadataJournalTestCleanup();
cout << " DONE" << endl;
cout << "Testing connection loss..." << endl;
//opentask();
opentask(true);
cout << "connecting" << endl;
makeConnection();
cout << "connected" << endl;
unlinktask(true);
cout << "connecting" << endl;
makeConnection();
cout << "connected" << endl;
stattask(true);
cout << "connecting" << endl;
makeConnection();
cout << "connected" << endl;
truncatetask(true);
cout << "connecting" << endl;
makeConnection();
cout << "connected" << endl;
listdirtask(true);
cout << "connecting" << endl;
makeConnection();
cout << "connected" << endl;
pingtask(true);
cout << "connecting" << endl;
makeConnection();
cout << "connected" << endl;
copytask(true);
(Cache::get())->shutdown(); (Cache::get())->shutdown();
delete (Synchronizer::get()); delete (Synchronizer::get());

View File

@ -0,0 +1,72 @@
[ObjectStorage]
service = LocalStorage
# This is a tuneable value, but also implies a maximum capacity.
# Each file managed by StorageManager is broken into chunks of
# object_size bytes. Each chunk is stored in the cache as a file,
# so the filesystem the cache is put on needs to have enough inodes to
# support at least cache_size/object_size files.
#
# Regarding tuning, object stores do not support modifying stored data;
# entire objects must be replaced on modification, and entire
# objects are fetched on read. An access pattern that includes
# frequently accessing small amounts of data will benefit from
# a smaller object_size. An access pattern where data
# is accessed in large chunks will benefit from a larger object_size.
#
# Another limitation to consider is the get/put rate imposed by the
# cloud provider. If that is the limitation, increasing object_size
# will result in higher transfer rates.
# object_size set to 8M for storagemanager unit_tests to specifically
# test metadata and object edge cases
object_size = 8K
metadata_path = ${HOME}/sm-unittest/metadata
journal_path = ${HOME}/sm-unittest/journal
max_concurrent_downloads = 20
max_concurrent_uploads = 20
# This is the depth of the common prefix that all files managed by SM have
# Ex: /usr/local/mariadb/columnstore/data1, and
# /usr/local/mariadb/columnstore/data2 differ at the 5th directory element,
# so they have a common prefix depth of 4.
#
# This value is used to manage the ownership of prefixes between
# StorageManager instances that sharing a filesystem.
#
# -1 is a special value indicating that there is no filesystem shared
# between SM instances.
common_prefix_depth = 1
[S3]
# region = some_region
# bucket = some_bucket
# Specify the endpoint to connect to if using an S3 compatible object
# store like Google Cloud Storage or IBM's Cleversafe.
# endpoint = <optional endpoint to use>
# optional prefix for objects; using this will hurt performance
# prefix = cs/
# Keys for S3 can also be set through the environment vars
# AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
# StorageManager will prioritize these config values over envvars.
# aws_access_key_id =
# aws_secret_access_key =
[LocalStorage]
path = ${HOME}/sm-unittest/fake-cloud
# introduce latency to fake-cloud operations. Useful for debugging.
fake_latency = n
# values are randomized between 1 and max_latency in microseconds.
# values between 30000-50000 roughly simulate observed latency of S3
# access from an EC2 node.
max_latency = 50000
[Cache]
cache_size = 2g
path = ${HOME}/sm-unittest/cache