1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-30 19:23:07 +03:00
Files
mariadb-columnstore-engine/storage-manager/src/unit_tests.cpp
2023-04-14 10:33:27 +00:00

2037 lines
61 KiB
C++

/* Copyright (C) 2019 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "OpenTask.h"
#include "WriteTask.h"
#include "AppendTask.h"
#include "UnlinkTask.h"
#include "StatTask.h"
#include "TruncateTask.h"
#include "ListDirectoryTask.h"
#include "PingTask.h"
#include "CopyTask.h"
#include "messageFormat.h"
#include "Config.h"
#include "Cache.h"
#include "LocalStorage.h"
#include "MetadataFile.h"
#include "Replicator.h"
#include "S3Storage.h"
#include "Utilities.h"
#include "Synchronizer.h"
#include "ProcessTask.h"
#include <iostream>
#include <stdlib.h>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <string.h>
#include <fcntl.h>
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <boost/format.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <algorithm>
#include <random>
#undef NDEBUG
#include <cassert>
using namespace storagemanager;
using namespace std;
namespace bf = boost::filesystem;
void printUsage()
{
cout << "MariaDB Columnstore Storage Manager Unit Test\n"
<< endl
<< "Usage unit_test [OPTION] " << endl
<< "-d [test_data] Location of test_data included with source code" << endl
<< " Default = ./" << endl
<< "-p [prefix] This directory will be used as scratch space for tests run" << endl
<< " Default = unittest" << endl;
}
struct scoped_closer
{
scoped_closer(int f) : fd(f)
{
}
~scoped_closer()
{
int s_errno = errno;
::close(fd);
errno = s_errno;
}
int fd;
};
// (ints) 0 1 2 3 ... 2048
void makeTestObject(const char* dest)
{
int objFD = open(dest, O_WRONLY | O_CREAT | O_TRUNC, 0600);
assert(objFD >= 0);
scoped_closer s1(objFD);
for (int i = 0; i < 2048; i++)
assert(write(objFD, &i, 4) == 4);
}
// the merged version should look like
// (ints) 0 1 2 3 4 0 1 2 3 4 10 11 12 13...
void makeTestJournal(const char* dest)
{
int journalFD = open(dest, O_WRONLY | O_CREAT | O_TRUNC, 0600);
assert(journalFD >= 0);
scoped_closer s2(journalFD);
char header[] = "{ \"version\" : 1, \"max_offset\": 39 }";
size_t result = write(journalFD, header, strlen(header) + 1);
assert(result == (strlen(header) + 1));
uint64_t offlen[2] = {20, 20};
result = write(journalFD, offlen, 16);
for (int i = 0; i < 5; i++)
assert(write(journalFD, &i, 4) == 4);
}
bf::path testDirPath = "./";
bf::path homepath = getenv("HOME");
string prefix = "unittest";
string testObjKey = "12345_0_8192_" + prefix + "~test-file";
string copyfileObjKey = "12345_0_8192_" + prefix + "~source";
string metaTestFile = prefix + "/test-file";
bf::path testFilePath = homepath / metaTestFile;
const char* testFile = testFilePath.string().c_str();
string _metadata =
"{ \n\
\"version\" : 1, \n\
\"revision\" : 1, \n\
\"objects\" : \n\
[ \n\
{ \n\
\"offset\" : 0, \n\
\"length\" : 8192, \n\
\"key\" : \"xxx\" \n\
} \n\
] \n\
}\n";
void makeTestMetadata(const char* dest, string& key)
{
int metaFD = open(dest, O_WRONLY | O_CREAT | O_TRUNC, 0600);
assert(metaFD >= 0);
scoped_closer sc(metaFD);
boost::algorithm::replace_all(_metadata, "xxx", key);
// need to parameterize the object name in the objects list
size_t result = write(metaFD, _metadata.c_str(), _metadata.length());
assert(result == _metadata.length());
boost::algorithm::replace_all(_metadata, key, "xxx");
}
int getSocket()
{
int sock = ::socket(AF_UNIX, SOCK_STREAM, 0);
assert(sock >= 0);
return sock;
}
int sessionSock = -1; // tester uses this end of the connection
int serverSock = -1;
int clientSock = -1; // have the Tasks use this end of the connection
void acceptConnection()
{
int err;
if (serverSock == -1)
{
serverSock = getSocket();
struct sockaddr_un sa;
memset(&sa, 0, sizeof(sa));
sa.sun_family = AF_UNIX;
memcpy(&sa.sun_path[1], "testing", 7);
err = ::bind(serverSock, (struct sockaddr*)&sa, sizeof(sa));
assert(err == 0);
err = ::listen(serverSock, 2);
assert(err == 0);
}
sessionSock = ::accept(serverSock, NULL, NULL);
assert(sessionSock > 0);
}
// connects sessionSock to clientSock
void makeConnection()
{
boost::thread t(acceptConnection);
struct sockaddr_un sa;
memset(&sa, 0, sizeof(sa));
sa.sun_family = AF_UNIX;
memcpy(&sa.sun_path[1], "testing", 7);
clientSock = ::socket(AF_UNIX, SOCK_STREAM, 0);
assert(clientSock > 0);
sleep(1); // let server thread get to accept()
int err = ::connect(clientSock, (struct sockaddr*)&sa, sizeof(sa));
assert(err == 0);
t.join();
}
bool opentask(bool connectionTest = false)
{
// going to rely on msgs being smaller than the buffer here
int err = 0;
uint8_t buf[1024];
sm_msg_header* hdr = (sm_msg_header*)buf;
open_cmd* cmd = (open_cmd*)&hdr[1];
string testFile = "metadataJournalTest";
// open/create a file named 'opentest1'
std::string filename = homepath.string() + "/" + prefix + "/" + testFile;
hdr->type = SM_MSG_START;
hdr->flags = 0;
hdr->payloadLen = sizeof(*cmd) + filename.size();
cmd->opcode = OPEN;
cmd->openmode = O_WRONLY | O_CREAT;
cmd->flen = 19;
strcpy((char*)cmd->filename, filename.c_str());
cout << "open file " << filename << endl;
::unlink(filename.c_str());
// set payload to be shorter than actual message lengh
// and send a shortened message.
if (connectionTest)
hdr->payloadLen -= 2;
size_t result = ::write(sessionSock, cmd, hdr->payloadLen);
assert(result == (hdr->payloadLen));
// set payload to be correct length again
if (connectionTest)
hdr->payloadLen += 2;
// process task will look for the full length and
// will wait on the rest of the message.
ProcessTask pt(clientSock, hdr->payloadLen);
boost::thread t(pt);
if (connectionTest)
{
// make sure the thread is waiting for the rest of the data
// then kill the connection. This will trigger the task thread
// to exit on an error handling path
sleep(1);
close(sessionSock);
close(clientSock);
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else
{
t.join();
// read the response
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
sm_response* resp = (sm_response*)buf;
assert(err == sizeof(struct stat) + sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(struct stat) + sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
struct stat* _stat = (struct stat*)resp->payload;
// what can we verify about the stat...
assert(_stat->st_size == 0);
/* verify the file is there */
string metaPath = Config::get()->getValue("ObjectStorage", "metadata_path");
assert(!metaPath.empty());
metaPath += string("/" + prefix + "/" + testFile + ".meta");
assert(boost::filesystem::exists(metaPath));
}
cout << "opentask OK" << endl;
return true;
}
bool replicatorTest()
{
Config* config = Config::get();
string metaPath = config->getValue("ObjectStorage", "metadata_path");
string journalPath = config->getValue("ObjectStorage", "journal_path");
string cacehPath = config->getValue("Cache", "path");
Replicator* repli = Replicator::get();
int err, fd;
string newobject = prefix + "/newobjectTest";
string newObjectJournalFullPath = journalPath + "/" + prefix + "/newobjectTest.journal";
string newObjectCacheFullPath = cacehPath + "/" + prefix + "/newobjectTest";
uint8_t buf[1024];
uint8_t data[1024];
int version = 1;
uint64_t max_offset = 0;
memcpy(data, "1234567890", 10);
string header =
(boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % max_offset).str();
// test newObject
repli->newObject(newobject, data, 0, 10);
// check file contents
fd = ::open(newObjectCacheFullPath.c_str(), O_RDONLY);
err = ::read(fd, buf, sizeof(buf));
assert(err == 10);
buf[10] = 0;
assert(!strcmp("1234567890", (const char*)buf));
cout << "replicator newObject OK" << endl;
::close(fd);
// test addJournalEntry
repli->addJournalEntry(newobject, data, 0, 10);
fd = ::open(newObjectJournalFullPath.c_str(), O_RDONLY);
err = ::read(fd, buf, sizeof(buf));
assert((uint)err == (header.length() + 1 + 16 + 10));
buf[err] = 0;
assert(!strcmp("1234567890", (const char*)buf + header.length() + 1 + 16));
cout << "replicator addJournalEntry OK" << endl;
::close(fd);
repli->remove(newObjectCacheFullPath.c_str());
repli->remove(newObjectJournalFullPath.c_str());
assert(!boost::filesystem::exists(newObjectCacheFullPath.c_str()));
cout << "replicator remove OK" << endl;
return true;
}
void metadataJournalTest(std::size_t size, off_t offset)
{
// make an empty file to write to
bf::path fullPath = homepath / prefix / "metadataJournalTest";
const char* filename = fullPath.string().c_str();
size_t buflen = sizeof(write_cmd) + std::strlen(filename) + size + sizeof(sm_msg_header);
std::vector<uint8_t> buf(buflen);
uint64_t* data;
sm_msg_header* hdr = (sm_msg_header*)buf.data();
write_cmd* cmd = (write_cmd*)&hdr[1];
cmd->opcode = WRITE;
cmd->offset = offset;
cmd->count = size;
cmd->flen = std::strlen(filename);
memcpy(&cmd->filename, filename, cmd->flen);
data = (uint64_t*)&cmd->filename[cmd->flen];
for (uint64_t i = 0; i < (size / sizeof(uint64_t)); i++)
{
data[i] = i;
}
hdr->type = SM_MSG_START;
hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
WriteTask w(clientSock, hdr->payloadLen);
int err = ::write(sessionSock, cmd, hdr->payloadLen);
w.run();
// verify response
uint8_t bufRead[1024];
err = ::recv(sessionSock, bufRead, sizeof(bufRead), MSG_DONTWAIT);
sm_response* resp = (sm_response*)bufRead;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == (int)size);
}
void metadataJournalTest_append(std::size_t size)
{
// make an empty file to write to
bf::path fullPath = homepath / prefix / "metadataJournalTest";
const char* filename = fullPath.string().c_str();
std::vector<uint8_t> buf(sizeof(write_cmd) + std::strlen(filename) + size + sizeof(sm_msg_header));
uint64_t* data;
sm_msg_header* hdr = (sm_msg_header*)buf.data();
append_cmd* cmd = (append_cmd*)&hdr[1];
cmd->opcode = APPEND;
cmd->count = size;
cmd->flen = std::strlen(filename);
memcpy(&cmd->filename, filename, cmd->flen);
data = (uint64_t*)&cmd->filename[cmd->flen];
for (uint64_t i = 0; i < (size / sizeof(uint64_t)); i++)
{
data[i] = i;
}
hdr->type = SM_MSG_START;
hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
AppendTask a(clientSock, hdr->payloadLen);
int err = ::write(sessionSock, cmd, hdr->payloadLen);
a.run();
// verify response
uint8_t bufRead[1024];
err = ::recv(sessionSock, bufRead, sizeof(bufRead), MSG_DONTWAIT);
sm_response* resp = (sm_response*)bufRead;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == (int)size);
}
void metadataJournalTestCleanup()
{
IOCoordinator* ioc = IOCoordinator::get();
Synchronizer* sync = Synchronizer::get();
bf::path fullPath = homepath / prefix / "metadataJournalTest";
ioc->unlink(fullPath.string().c_str());
sync->forceFlush();
}
bool writetask()
{
// make an empty file to write to
bf::path fullPath = homepath / prefix / "writetest1";
const char* filename = fullPath.string().c_str();
::unlink(filename);
int fd = ::open(filename, O_CREAT | O_RDWR, 0666);
assert(fd > 0);
scoped_closer f(fd);
uint8_t buf[1024];
sm_msg_header* hdr = (sm_msg_header*)buf;
write_cmd* cmd = (write_cmd*)&hdr[1];
uint8_t* data;
cmd->opcode = WRITE;
cmd->offset = 0;
cmd->count = 9;
cmd->flen = strlen(filename);
memcpy(&cmd->filename, filename, cmd->flen);
data = (uint8_t*)&cmd->filename[cmd->flen];
memcpy(data, "123456789", cmd->count);
hdr->type = SM_MSG_START;
hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
WriteTask w(clientSock, hdr->payloadLen);
ssize_t result = ::write(sessionSock, cmd, hdr->payloadLen);
assert(result == static_cast<ssize_t>(hdr->payloadLen));
w.run();
// verify response
int err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
sm_response* resp = (sm_response*)buf;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 9);
// check file contents
err = ::read(fd, buf, sizeof(buf));
assert(err == 9);
buf[9] = 0;
assert(!strcmp("123456789", (const char*)buf));
::unlink(filename);
cout << "write task OK" << endl;
return true;
}
bool appendtask()
{
// make a file and put some stuff in it
bf::path fullPath = homepath / prefix / "appendtest1";
const char* filename = fullPath.string().c_str();
::unlink(filename);
int fd = ::open(filename, O_CREAT | O_RDWR, 0666);
assert(fd > 0);
scoped_closer f(fd);
int err = ::write(fd, "testjunk", 8);
assert(err == 8);
uint8_t buf[1024];
append_cmd* cmd = (append_cmd*)buf;
uint8_t* data;
cmd->opcode = APPEND;
cmd->count = 9;
cmd->flen = strlen(filename);
memcpy(&cmd->filename, filename, cmd->flen);
data = (uint8_t*)&cmd->filename[cmd->flen];
memcpy(data, "123456789", cmd->count);
int payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
AppendTask a(clientSock, payloadLen);
ssize_t result = ::write(sessionSock, cmd, payloadLen);
assert(result == (payloadLen));
a.run();
// verify response
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
sm_response* resp = (sm_response*)buf;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 9);
// check file contents
::lseek(fd, 0, SEEK_SET);
err = ::read(fd, buf, sizeof(buf));
assert(err == 17);
buf[17] = 0;
assert(!strcmp("testjunk123456789", (const char*)buf));
::unlink(filename);
cout << "append task OK" << endl;
return true;
}
void unlinktask(bool connectionTest = false)
{
int err = 0;
// make a meta file and delete it
bf::path fullPath = homepath / prefix / "unlinktest1";
string pathMeta = prefix + "/unlinktest1";
const char* Metafilename = pathMeta.c_str();
const char* filename = fullPath.string().c_str();
IOCoordinator* ioc = IOCoordinator::get();
bf::path fullPathMeta = ioc->getMetadataPath() / (string(Metafilename) + ".meta");
bf::remove(fullPathMeta);
MetadataFile meta(Metafilename);
meta.writeMetadata();
assert(bf::exists(fullPathMeta));
uint8_t buf[1024];
unlink_cmd* cmd = (unlink_cmd*)buf;
cmd->opcode = UNLINK;
cmd->flen = strlen(filename);
memcpy(&cmd->filename, filename, cmd->flen);
// set payload to be shorter than actual message lengh
// and send a shortened message.
if (connectionTest)
cmd->flen -= 2;
size_t result = ::write(sessionSock, cmd, sizeof(unlink_cmd) + cmd->flen);
assert(result == (sizeof(unlink_cmd) + cmd->flen));
// set payload to be correct length again
if (connectionTest)
cmd->flen += 2;
// process task will look for the full length and
// will wait on the rest of the message.
ProcessTask pt(clientSock, sizeof(unlink_cmd) + cmd->flen);
boost::thread t(pt);
if (connectionTest)
{
// make sure the thread is waiting for the rest of the data
// then kill the connection. This will trigger the task thread
// to exit on an error handling path
sleep(1);
close(sessionSock);
close(clientSock);
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else
{
t.join();
// read the response
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
sm_response* resp = (sm_response*)buf;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
// confirm it no longer exists
assert(!bf::exists(fullPathMeta));
}
// delete it again, make sure we get an error message & reasonable error code
// Interesting. boost::filesystem::remove() doesn't consider it an error if the file doesn't
// exist. Need to look into the reasoning for that, and decide whether IOC
// should return an error anyway. For now, this test below doesn't get
// an error msg.
#if 0
memset(buf, 0, 1024);
cmd->opcode = UNLINK;
cmd->flen = strlen(filename);
memcpy(&cmd->filename, filename, cmd->flen);
UnlinkTask u2(clientSock, sizeof(unlink_cmd) + cmd->flen);
ssize_t result = ::write(sessionSock, cmd, sizeof(unlink_cmd) + cmd->flen);
assert(result==(sizeof(unlink_cmd) + cmd->flen));
u2.run();
// verify response
err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
resp = (sm_response *) buf;
assert(err == sizeof(*resp) + 4);
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == 8);
assert(resp->header.flags == 0);
assert(resp->returnCode == -1);
err = (*(int *) resp->payload);
assert(err == ENOENT);
#endif
cout << "unlink task OK" << endl;
}
bool stattask(bool connectionTest = false)
{
int err = 0;
bf::path fullPath = homepath / prefix / "stattest1";
string filename = fullPath.string();
string Metafilename = prefix + "/stattest1";
string fullFilename =
Config::get()->getValue("ObjectStorage", "metadata_path") + "/" + Metafilename + ".meta";
::unlink(fullFilename.c_str());
makeTestMetadata(fullFilename.c_str(), testObjKey);
uint8_t buf[1024];
stat_cmd* cmd = (stat_cmd*)buf;
cmd->opcode = STAT;
cmd->flen = filename.length();
strcpy((char*)cmd->filename, filename.c_str());
// set payload to be shorter than actual message lengh
// and send a shortened message.
if (connectionTest)
cmd->flen -= 2;
size_t result = ::write(sessionSock, cmd, sizeof(*cmd) + cmd->flen);
assert(result == (sizeof(*cmd) + cmd->flen));
// set payload to be correct length again
if (connectionTest)
cmd->flen += 2;
// process task will look for the full length and
// will wait on the rest of the message.
ProcessTask pt(clientSock, sizeof(*cmd) + cmd->flen);
boost::thread t(pt);
if (connectionTest)
{
// make sure the thread is waiting for the rest of the data
// then kill the connection. This will trigger the task thread
// to exit on an error handling path
sleep(1);
close(sessionSock);
close(clientSock);
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else
{
t.join();
// read the response
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
sm_response* resp = (sm_response*)buf;
assert(err == sizeof(struct stat) + sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.flags == 0);
assert(resp->header.payloadLen == sizeof(struct stat) + sizeof(ssize_t));
assert(resp->returnCode == 0);
struct stat* _stat = (struct stat*)resp->payload;
// what can we verify about the stat...
assert(_stat->st_uid == getuid());
assert(_stat->st_gid == getgid());
assert(_stat->st_size == 8192);
}
::unlink(fullFilename.c_str());
cout << "stattask OK" << endl;
return true;
}
bool IOCTruncate()
{
IOCoordinator* ioc = IOCoordinator::get();
CloudStorage* cs = CloudStorage::get();
Synchronizer* sync = Synchronizer::get();
LocalStorage* ls = dynamic_cast<LocalStorage*>(cs);
if (!ls)
{
cout << "IOCTruncate() currently requires using Local storage" << endl;
return true;
}
Cache* cache = Cache::get();
cache->reset();
bf::path cachePath = ioc->getCachePath();
bf::path journalPath = ioc->getJournalPath();
bf::path metaPath = ioc->getMetadataPath();
bf::path cloudPath = ls->getPrefix();
// metaPath doesn't necessarily exist until a MetadataFile instance is created
bf::create_directories(metaPath);
/* start with one object in cloud storage
truncate past the end of the object
verify nothing changed & got success
truncate at 4000 bytes
verify everything sees the 'file' as 4000 bytes
- IOC + meta
truncate at 0 bytes
verify file now looks empty
verify the object was deleted
add 2 8k test objects and a journal against the second one
truncate @ 10000 bytes
verify all files still exist
truncate @ 6000 bytes, 2nd object & journal were deleted
truncate @ 0 bytes, verify no files are left
*/
bf::path metadataFile = metaPath / prefix / "test-file.meta";
bf::path objectPath = cloudPath / testObjKey;
bf::path cachedObjectPath = cachePath / prefix / testObjKey;
makeTestMetadata(metadataFile.string().c_str(), testObjKey);
makeTestObject(objectPath.string().c_str());
int err;
uint8_t buf[1 << 14];
int* buf32 = (int*)buf;
/* Need to enable this later.
// Extend the test file to 10000 bytes
err = ioc->truncate(testFile, 10000);
assert(!err);
err = ioc->read(testFile, buf, 0, 10000);
assert(err == 10000);
// verity the data is what it should be
for (int i = 0; i < 2048; i++)
assert(buf32[i] == i);
for (int i = 2048; i < 2500; i++)
assert(buf32[i] == 0);
*/
err = ioc->truncate(testFile, 4000);
assert(!err);
MetadataFile meta(metaTestFile);
assert(meta.getLength() == 4000);
// read the data, make sure there are only 4000 bytes & the object still exists
err = ioc->read(testFile, buf, 0, 8192);
assert(err == 4000);
err = ioc->read(testFile, buf, 4000, 1);
assert(err == 0);
err = ioc->read(testFile, buf, 4005, 1);
assert(err == 0);
assert(bf::exists(objectPath));
// truncate to 0 bytes, make sure everything is consistent with that, and the object no longer exists
err = ioc->truncate(testFile, 0);
assert(!err);
meta = MetadataFile(metaTestFile);
assert(meta.getLength() == 0);
err = ioc->read(testFile, buf, 0, 1);
assert(err == 0);
err = ioc->read(testFile, buf, 4000, 1);
assert(err == 0);
sync->forceFlush();
sleep(1); // give Sync a chance to delete the object from the cloud
assert(!bf::exists(objectPath));
// recreate the meta file, make a 2-object version
ioc->unlink(testFile);
makeTestMetadata(metadataFile.string().c_str(), testObjKey);
makeTestObject(objectPath.string().c_str());
meta = MetadataFile(metaTestFile);
bf::path secondObjectPath = cloudPath / meta.addMetadataObject(testFile, 8192).key;
bf::path cachedSecondObject = cachePath / prefix / secondObjectPath.filename();
makeTestObject(secondObjectPath.string().c_str());
meta.writeMetadata();
// make sure there are 16k bytes, and the data is valid before going forward
memset(buf, 0, sizeof(buf));
err = ioc->read(testFile, buf, 0, sizeof(buf));
assert(err == sizeof(buf));
for (int i = 0; i < (int)sizeof(buf) / 4; i++)
assert(buf32[i] == (i % 2048));
assert(bf::exists(cachedSecondObject));
assert(bf::exists(cachedObjectPath));
// truncate to 10k, make sure everything looks right
err = ioc->truncate(testFile, 10240);
assert(!err);
meta = MetadataFile(metaTestFile);
assert(meta.getLength() == 10240);
memset(buf, 0, sizeof(buf));
err = ioc->read(testFile, buf, 0, 10240);
for (int i = 0; i < 10240 / 4; i++)
assert(buf32[i] == (i % 2048));
err = ioc->read(testFile, buf, 10239, 10);
assert(err == 1);
// truncate to 6000 bytes, make sure second object got deleted
err = ioc->truncate(testFile, 6000);
meta = MetadataFile(metaTestFile);
assert(meta.getLength() == 6000);
err = ioc->read(testFile, buf, 0, 8192);
assert(err == 6000);
sync->forceFlush();
sleep(1); // give Synchronizer a chance to delete the file from the 'cloud'
assert(!bf::exists(secondObjectPath));
assert(!bf::exists(cachedSecondObject));
cache->reset();
ioc->unlink(testFile);
cout << "IOCTruncate OK" << endl;
return true;
}
bool truncatetask(bool connectionTest = false)
{
IOCoordinator* ioc = IOCoordinator::get();
Cache* cache = Cache::get();
bf::path metaPath = ioc->getMetadataPath();
bf::path fullPath = homepath / prefix / "trunctest1";
string metaStr = prefix + "/trunctest1";
const char* filename = fullPath.string().c_str();
const char* Metafilename = metaStr.c_str();
int err = 0;
// get the metafile created
string metaFullName = (metaPath / Metafilename).string() + ".meta";
::unlink(metaFullName.c_str());
MetadataFile meta(Metafilename);
uint8_t buf[1024];
truncate_cmd* cmd = (truncate_cmd*)buf;
cmd->opcode = TRUNCATE;
cmd->length = 1000;
cmd->flen = strlen(filename);
strcpy((char*)cmd->filename, filename);
// set payload to be shorter than actual message lengh
// and send a shortened message.
if (connectionTest)
cmd->flen -= 2;
size_t result = ::write(sessionSock, cmd, sizeof(*cmd) + cmd->flen);
assert(result == (sizeof(*cmd) + cmd->flen));
// set payload to be correct length again
if (connectionTest)
cmd->flen += 2;
// process task will look for the full length and
// will wait on the rest of the message.
ProcessTask pt(clientSock, sizeof(*cmd) + cmd->flen);
boost::thread t(pt);
if (connectionTest)
{
// make sure the thread is waiting for the rest of the data
// then kill the connection. This will trigger the task thread
// to exit on an error handling path
sleep(1);
close(sessionSock);
close(clientSock);
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else
{
t.join();
// read the response
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
sm_response* resp = (sm_response*)buf;
assert(err == sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.flags == 0);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->returnCode == 0);
// reload the metadata, check that it is 1000 bytes
meta = MetadataFile(Metafilename);
assert(meta.getLength() == 1000);
}
cache->reset();
::unlink(metaFullName.c_str());
cout << "truncate task OK" << endl;
return true;
}
bool listdirtask(bool connectionTest = false)
{
IOCoordinator* ioc = IOCoordinator::get();
const bf::path metaPath = ioc->getMetadataPath();
bf::path fullPath = homepath / prefix / "listdirtask";
string metaStr = prefix + "/listdirtask";
const char* relPath = fullPath.string().c_str();
const char* MetarelPath = metaStr.c_str();
bf::path tmpPath = metaPath / MetarelPath;
// make some dummy files, make sure they are in the list returned.
set<string> files;
int err;
vector<SharedCloser> fdMinders;
bf::create_directories(tmpPath);
for (int i = 0; i < 10; i++)
{
string file(tmpPath.string() + "/dummy" + to_string(i));
files.insert(file);
file += ".meta";
err = ::open(file.c_str(), O_CREAT | O_WRONLY, 0600);
assert(err >= 0);
fdMinders.push_back(err);
}
uint8_t buf[8192];
memset(buf, 0, sizeof(buf));
listdir_cmd* cmd = (listdir_cmd*)buf;
cmd->opcode = LIST_DIRECTORY;
cmd->plen = strlen(relPath);
memcpy(cmd->path, relPath, cmd->plen);
// set payload to be shorter than actual message lengh
// and send a shortened message.
if (connectionTest)
cmd->plen -= 2;
size_t result = ::write(sessionSock, cmd, sizeof(*cmd) + cmd->plen);
assert(result == (sizeof(*cmd) + cmd->plen));
// set payload to be correct length again
if (connectionTest)
cmd->plen += 2;
// process task will look for the full length and
// will wait on the rest of the message.
ProcessTask pt(clientSock, sizeof(*cmd) + cmd->plen);
boost::thread t(pt);
if (connectionTest)
{
// make sure the thread is waiting for the rest of the data
// then kill the connection. This will trigger the task thread
// to exit on an error handling path
sleep(1);
close(sessionSock);
close(clientSock);
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else
{
t.join();
/* going to keep this simple. Don't run this in a big dir. */
/* maybe later I'll make a dir, put a file in it, and etc. For now run it in a small dir. */
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
sm_response* resp = (sm_response*)buf;
assert(err > 0);
assert(resp->header.type == SM_MSG_START);
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
listdir_resp* r = (listdir_resp*)resp->payload;
assert(r->elements == 10);
int off = sizeof(sm_response) + sizeof(listdir_resp);
uint fileCounter = 0;
while (off < err)
{
listdir_resp_entry* e = (listdir_resp_entry*)&buf[off];
// cout << "len = " << e->flen << endl;
assert(off + e->flen + sizeof(listdir_resp_entry) < 8192);
string file(e->filename, e->flen);
assert(files.find((tmpPath / file).string()) != files.end());
fileCounter++;
// cout << "name = " << file << endl;
off += e->flen + sizeof(listdir_resp_entry);
}
assert(fileCounter == r->elements);
}
bf::remove_all(tmpPath);
cout << "listdir task OK" << endl;
return true;
}
void pingtask()
{
int err = 0;
uint8_t buf[1024];
ping_cmd* cmd = (ping_cmd*)buf;
cmd->opcode = PING;
size_t len = sizeof(*cmd);
ssize_t result = ::write(sessionSock, cmd, sizeof(*cmd));
assert(result == (sizeof(*cmd)));
// process task will look for the full length and
// will wait on the rest of the message.
// don't test connection loss here since this message is only 1 byte
ProcessTask pt(clientSock, len);
boost::thread t(pt);
t.join();
// read the response
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
sm_response* resp = (sm_response*)buf;
assert(err == sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
cout << "pingtask OK" << endl;
}
bool copytask(bool connectionTest = false)
{
/*
make a file
copy it
verify it exists
*/
bf::path fullPathSrc = homepath / prefix / "dummy1";
string metaStrSrc = prefix + "/dummy1";
const char* source = fullPathSrc.string().c_str();
const char* Metasource = metaStrSrc.c_str();
bf::path fullPathDest = homepath / prefix / "dummy2";
string metaStrDest = prefix + "/dummy2";
const char* dest = fullPathDest.string().c_str();
const char* Metadest = metaStrDest.c_str();
MetadataFile meta1(Metasource);
uint8_t buf[1024];
copy_cmd* cmd = (copy_cmd*)buf;
cmd->opcode = COPY;
cmd->file1.flen = strlen(source);
memcpy(cmd->file1.filename, source, cmd->file1.flen);
f_name* file2 = (f_name*)&cmd->file1.filename[cmd->file1.flen];
file2->flen = strlen(dest);
memcpy(file2->filename, dest, file2->flen);
uint len = (uint64_t)&file2->filename[file2->flen] - (uint64_t)buf;
// set payload to be shorter than actual message lengh
// and send a shortened message.
if (connectionTest)
len -= 2;
ssize_t result = ::write(sessionSock, buf, len);
assert(result == static_cast<ssize_t>(len));
int err = 0;
// set payload to be correct length again
if (connectionTest)
len += 2;
// process task will look for the full length and
// will wait on the rest of the message.
ProcessTask pt(clientSock, len);
boost::thread t(pt);
if (connectionTest)
{
// make sure the thread is waiting for the rest of the data
// then kill the connection. This will trigger the task thread
// to exit on an error handling path
sleep(1);
close(sessionSock);
close(clientSock);
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
assert(err == -1);
t.join();
}
else
{
t.join();
// read the response
err = ::recv(sessionSock, buf, sizeof(buf), MSG_DONTWAIT);
sm_response* resp = (sm_response*)buf;
assert(err == sizeof(sm_response));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 0);
// verify copytest2 is there
MetadataFile meta2(Metadest, MetadataFile::no_create_t(), true);
assert(meta2.exists());
}
bf::path metaPath = IOCoordinator::get()->getMetadataPath();
bf::remove(metaPath / (metaStrSrc + ".meta"));
bf::remove(metaPath / (metaStrDest + ".meta"));
cout << "copytask OK " << endl;
return true;
}
bool localstorageTest1()
{
LocalStorage ls;
/* TODO: Some stuff */
cout << "local storage test 1 OK" << endl;
return true;
}
bool cacheTest1()
{
Cache* cache = Cache::get();
CloudStorage* cs = CloudStorage::get();
LocalStorage* ls = dynamic_cast<LocalStorage*>(cs);
if (ls == NULL)
{
cout << "Cache test 1 requires using local storage" << endl;
return false;
}
cache->reset();
assert(cache->getCurrentCacheSize() == 0);
bf::path storagePath = ls->getPrefix();
bf::path cachePath = cache->getCachePath() / prefix;
vector<string> v_bogus;
vector<bool> exists;
// make sure nothing shows up in the cache path for files that don't exist
v_bogus.push_back("does-not-exist");
cache->read(prefix, v_bogus);
assert(!bf::exists(cachePath / "does-not-exist"));
cache->exists(prefix, v_bogus, &exists);
assert(exists.size() == 1);
assert(!exists[0]);
// make sure a file that does exist does show up in the cache path
makeTestObject((storagePath / testObjKey).string().c_str());
v_bogus[0] = testObjKey.c_str();
cache->read(prefix, v_bogus);
assert(bf::exists(cachePath / testObjKey));
exists.clear();
cache->exists(prefix, v_bogus, &exists);
assert(exists.size() == 1);
assert(exists[0]);
size_t currentSize = cache->getCurrentCacheSize();
assert(currentSize == bf::file_size(cachePath / testObjKey));
// lie about the file being deleted and then replaced
cache->deletedObject(prefix, testObjKey, currentSize);
assert(cache->getCurrentCacheSize() == 0);
cache->newObject(prefix, testObjKey, currentSize);
assert(cache->getCurrentCacheSize() == currentSize);
cache->exists(prefix, v_bogus, &exists);
assert(exists.size() == 1);
assert(exists[0]);
// cleanup
bf::remove(cachePath / testObjKey);
bf::remove(storagePath / testObjKey);
cout << "cache test 1 OK" << endl;
return true;
}
bool mergeJournalTest()
{
/*
create a dummy object and a dummy journal
call mergeJournal to process it with various params
verify the expected values
*/
makeTestObject("test-object");
makeTestJournal("test-journal");
int i;
IOCoordinator* ioc = IOCoordinator::get();
size_t len = 8192, tmp;
std::shared_ptr<uint8_t[]> data = ioc->mergeJournal("test-object", "test-journal", 0, len, &tmp);
assert(data);
int* idata = (int*)data.get();
for (i = 0; i < 5; i++)
assert(idata[i] == i);
for (; i < 10; i++)
assert(idata[i] == i - 5);
for (; i < 2048; i++)
assert(idata[i] == i);
// try different range parameters
// read at the beginning of the change
len = 40;
data = ioc->mergeJournal("test-object", "test-journal", 20, len, &tmp);
assert(data);
idata = (int*)data.get();
for (i = 0; i < 5; i++)
assert(idata[i] == i);
for (; i < 10; i++)
assert(idata[i] == i + 5);
// read s.t. beginning of the change is in the middle of the range
len = 24;
data = ioc->mergeJournal("test-object", "test-journal", 8, len, &tmp);
assert(data);
idata = (int*)data.get();
for (i = 0; i < 3; i++)
assert(idata[i] == i + 2);
for (; i < 6; i++)
assert(idata[i] == i - 3);
// read s.t. end of the change is in the middle of the range
len = 20;
data = ioc->mergeJournal("test-object", "test-journal", 28, len, &tmp);
assert(data);
idata = (int*)data.get();
for (i = 0; i < 3; i++)
assert(idata[i] == i + 2);
for (; i < 3; i++)
assert(idata[i] == i + 7);
// cleanup
bf::remove("test-object");
bf::remove("test-journal");
cout << "mergeJournalTest OK" << endl;
return true;
}
bool syncTest1()
{
IOCoordinator* ioc = IOCoordinator::get();
Config* config = Config::get();
Synchronizer* sync = Synchronizer::get();
Cache* cache = Cache::get();
CloudStorage* cs = CloudStorage::get();
LocalStorage* ls = dynamic_cast<LocalStorage*>(cs);
if (!ls)
{
cout << "syncTest1() requires using local storage at the moment." << endl;
return true;
}
cache->reset();
// delete everything in the fake cloud to make it easier to list later
bf::path fakeCloudPath = ls->getPrefix();
cout << "fakeCLoudPath = " << fakeCloudPath << endl;
for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator(); ++dir)
bf::remove(dir->path());
bf::path cachePath = sync->getCachePath();
bf::path journalPath = sync->getJournalPath();
string stmp = config->getValue("ObjectStorage", "metadata_path");
assert(!stmp.empty());
bf::path metaPath = stmp;
// nothing creates the dir yet
bf::create_directories(metaPath);
// make the test obj, journal, and metadata
string journalName = prefix + "/" + testObjKey + ".journal";
makeTestObject((cachePath / prefix / testObjKey).string().c_str());
makeTestJournal((journalPath / journalName).string().c_str());
makeTestMetadata((metaPath / string(metaTestFile + ".meta")).string().c_str(), testObjKey);
cache->newObject(prefix, testObjKey, bf::file_size(cachePath / prefix / testObjKey));
cache->newJournalEntry(prefix, bf::file_size(journalPath / journalName));
vector<string> vObj;
vObj.push_back(testObjKey);
sync->newObjects(prefix, vObj);
sync->forceFlush();
sleep(2); // wait for the job to run
// make sure that it made it to the cloud
bool exists = false;
int err = cs->exists(testObjKey, &exists);
assert(!err);
assert(exists);
sync->newJournalEntry(prefix, testObjKey, 0);
sync->forceFlush();
sleep(1); // let it do what it does
// check that the original objects no longer exist
assert(!cache->exists(prefix, testObjKey));
assert(!bf::exists(journalPath / journalName));
// Replicator doesn't implement all of its functionality yet, need to delete key from the cache manually for
// now
bf::remove(cachePath / testObjKey);
// check that a new version of object exists in cloud storage
// D'oh, this would have to list the objects to find it, not going to implement
// that everywhere just now. For now, making this test require LocalStorage.
bool foundIt = false;
string newKey;
for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator() && !foundIt; ++dir)
{
newKey = dir->path().filename().string();
foundIt = (MetadataFile::getSourceFromKey(newKey) == metaTestFile);
if (foundIt)
{
assert(cache->exists(prefix, newKey));
cs->deleteObject(newKey);
break;
}
}
assert(foundIt);
cache->makeSpace(prefix, cache->getMaxCacheSize()); // clear the cache & make it call sync->flushObject()
// the key should now be back in cloud storage and deleted from the cache
assert(!cache->exists(prefix, newKey));
err = cs->exists(newKey, &exists);
assert(!err && exists);
// make the journal again, call sync->newJournalObject()
makeTestJournal((journalPath / prefix / (newKey + ".journal")).string().c_str());
cache->newJournalEntry(prefix, bf::file_size(journalPath / prefix / (newKey + ".journal")));
sync->newJournalEntry(prefix, newKey, 0);
sync->forceFlush();
sleep(1);
// verify that newkey is no longer in cloud storage, and that another permutation is
err = cs->exists(newKey, &exists);
assert(!err && !exists);
foundIt = false;
for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator() && !foundIt; ++dir)
{
testObjKey = dir->path().filename().string();
foundIt = (MetadataFile::getSourceFromKey(testObjKey) == metaTestFile);
}
assert(foundIt);
// TODO test error paths, pass in some junk
// cleanup, just blow away everything for now
cache->reset();
vector<string> keys;
for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator(); ++dir)
keys.push_back(dir->path().filename().string());
sync->deletedObjects(prefix, keys);
sync->forceFlush();
sleep(1);
ioc->unlink(testFile);
cout << "Sync test 1 OK" << endl;
return true;
}
void metadataUpdateTest()
{
Config* config = Config::get();
string metaPath = config->getValue("ObjectStorage", "metadata_path");
MetadataFile mdfTest("metadataUpdateTest");
mdfTest.addMetadataObject("metadataUpdateTest", 100);
mdfTest.printObjects();
mdfTest.updateEntryLength(0, 200);
mdfTest.printObjects();
// mdfTest.updateEntryLength(0,100);
// mdfTest.printObjects();
string metaFilePath = metaPath + "/" + "metadataUpdateTest.meta";
::unlink(metaFilePath.c_str());
}
void s3storageTest1()
{
try
{
S3Storage s3;
bool exists;
int err;
string testFile = "storagemanager.cnf";
string testFile2 = testFile + "2";
exists = bf::exists(testFile);
if (!exists)
{
cout << "s3storageTest1() requires having " << testFile << " in the current directory.";
return;
}
try
{
err = s3.exists(testFile, &exists);
assert(!err);
if (exists)
s3.deleteObject(testFile);
err = s3.exists(testFile2, &exists);
assert(!err);
if (exists)
s3.deleteObject(testFile2);
// put it & get it
err = s3.putObject(testFile, testFile);
assert(!err);
err = s3.exists(testFile, &exists);
assert(!err);
assert(exists);
err = s3.getObject(testFile, testFile2);
assert(!err);
exists = bf::exists(testFile2);
assert(bf::file_size(testFile) == bf::file_size(testFile2));
// do a deep compare testFile vs testFile2
size_t len = bf::file_size(testFile);
int fd1 = open(testFile.c_str(), O_RDONLY);
assert(fd1 >= 0);
int fd2 = open(testFile2.c_str(), O_RDONLY);
assert(fd2 >= 0);
uint8_t* data1 = new uint8_t[len];
uint8_t* data2 = new uint8_t[len];
err = read(fd1, data1, len);
assert(err == (int)len);
err = read(fd2, data2, len);
assert(err == (int)len);
assert(!memcmp(data1, data2, len));
close(fd1);
close(fd2);
delete[] data1;
delete[] data2;
err = s3.copyObject(testFile, testFile2);
assert(!err);
err = s3.exists(testFile2, &exists);
assert(!err);
assert(exists);
s3.deleteObject(testFile);
s3.deleteObject(testFile2);
err = s3.copyObject("this-does-not-exist", testFile2);
assert(err < 0);
assert(errno == ENOENT);
}
catch (exception& e)
{
cout << __FUNCTION__ << " caught " << e.what() << endl;
assert(0);
}
cout << "S3Storage Test 1 OK" << endl;
}
catch (exception& e)
{
cout << e.what() << endl;
}
}
void IOCReadTest1()
{
/* Generate the test object & metadata
read it, verify result
Generate the journal object
read it, verify the merged result
TODO: do partial reads with an offset similar to what the mergeJournal tests do
TODO: some error path testing
*/
Cache* cache = Cache::get();
CloudStorage* cs = CloudStorage::get();
IOCoordinator* ioc = IOCoordinator::get();
Config* config = Config::get();
LocalStorage* ls = dynamic_cast<LocalStorage*>(cs);
if (!ls)
{
cout << "IOC read test 1 requires LocalStorage for now." << endl;
return;
}
testObjKey = "12345_0_8192_" + prefix + "~test-file";
cache->reset();
bf::path storagePath = ls->getPrefix();
bf::path cachePath = cache->getCachePath();
bf::path journalPath = cache->getJournalPath();
bf::path metaPath = config->getValue("ObjectStorage", "metadata_path");
assert(!metaPath.empty());
bf::create_directories(metaPath / prefix);
string objFilename = (storagePath / testObjKey).string();
string journalFilename = (journalPath / prefix / testObjKey).string() + ".journal";
string metaFilename = (metaPath / metaTestFile).string() + ".meta";
boost::scoped_array<uint8_t> data(new uint8_t[1 << 20]);
memset(data.get(), 0, 1 << 20);
int err;
err = ioc->read(testFile, data.get(), 0, 1 << 20);
assert(err < 0);
assert(errno == ENOENT);
makeTestObject(objFilename.c_str());
makeTestMetadata(metaFilename.c_str(), testObjKey);
size_t objSize = bf::file_size(objFilename);
err = ioc->read(testFile, data.get(), 0, 1 << 20);
assert(err == (int)objSize);
// verify the data
int* data32 = (int*)data.get();
int i;
for (i = 0; i < 2048; i++)
assert(data32[i] == i);
for (; i < (1 << 20) / 4; i++)
assert(data32[i] == 0);
makeTestJournal(journalFilename.c_str());
err = ioc->read(testFile, data.get(), 0, 1 << 20);
assert(err == (int)objSize);
for (i = 0; i < 5; i++)
assert(data32[i] == i);
for (; i < 10; i++)
assert(data32[i] == i - 5);
for (; i < 2048; i++)
assert(data32[i] == i);
for (; i < (1 << 20) / 4; i++)
assert(data32[i] == 0);
err = ioc->read(testFile, data.get(), 9000, 4000);
assert(err == 0);
cache->reset();
err = ioc->unlink(testFile);
assert(err >= 0);
cout << "IOC read test 1 OK" << endl;
}
void IOCUnlink()
{
IOCoordinator* ioc = IOCoordinator::get();
CloudStorage* cs = CloudStorage::get();
Cache* cache = Cache::get();
Synchronizer* sync = Synchronizer::get();
cache->reset();
/*
Make a metadata file with a complex path
make the test object and test journal
delete it at the parent dir level
make sure the parent dir was deleted
make sure the object and journal were deleted
*/
bf::path metaPath = ioc->getMetadataPath();
bf::path cachePath = ioc->getCachePath();
bf::path journalPath = ioc->getJournalPath();
bf::path cachedObjPath = cachePath / prefix / testObjKey;
bf::path cachedJournalPath = journalPath / prefix / (string(testObjKey) + ".journal");
bf::path metadataFile = metaPath / (string(metaTestFile) + ".meta");
makeTestMetadata(metadataFile.string().c_str(), testObjKey);
makeTestObject(cachedObjPath.string().c_str());
makeTestJournal(cachedJournalPath.string().c_str());
cache->newObject(prefix, cachedObjPath.filename().string(), bf::file_size(cachedObjPath));
cache->newJournalEntry(prefix, bf::file_size(cachedJournalPath));
vector<string> keys;
keys.push_back(cachedObjPath.filename().string());
sync->newObjects(prefix, keys);
// sync->newJournalEntry(keys[0]); don't want to end up renaming it
sync->forceFlush();
sleep(1);
// ok, they should be fully 'in the system' now.
// verify that they are
assert(bf::exists(metadataFile));
assert(bf::exists(cachedObjPath));
assert(bf::exists(cachedJournalPath));
bool exists;
cs->exists(cachedObjPath.filename().string(), &exists);
assert(exists);
int err = ioc->unlink(testFile);
assert(err == 0);
assert(!bf::exists(metadataFile));
assert(!bf::exists(cachedObjPath));
assert(!bf::exists(cachedJournalPath));
sync->forceFlush();
sleep(1); // stall for sync
cs->exists(cachedObjPath.filename().string(), &exists);
assert(!exists);
assert(cache->getCurrentCacheSize() == 0);
cout << "IOC unlink test OK" << endl;
}
void IOCCopyFile1()
{
/*
Make our usual test files
with metadata in a subdir
with object in cloud storage
call ioc::copyFile()
with dest in a different subdir
verify the contents
*/
IOCoordinator* ioc = IOCoordinator::get();
Cache* cache = Cache::get();
CloudStorage* cs = CloudStorage::get();
LocalStorage* ls = dynamic_cast<LocalStorage*>(cs);
Synchronizer* sync = Synchronizer::get();
if (!ls)
{
cout << "IOCCopyFile1 requires local storage at the moment" << endl;
return;
}
bf::path metaPath = ioc->getMetadataPath();
bf::path cachePath = ioc->getCachePath();
bf::path csPath = ls->getPrefix();
bf::path journalPath = ioc->getJournalPath();
bf::path cachedObjPath = cachePath / prefix / copyfileObjKey;
bf::path sourcePath = metaPath / prefix / "source.meta";
bf::path destPath = metaPath / prefix / "dest.meta";
bf::path l_sourceFile = homepath / prefix / string("source");
bf::path l_destFile = homepath / prefix / string("dest");
cache->reset();
bf::create_directories(sourcePath.parent_path());
makeTestMetadata(sourcePath.string().c_str(), copyfileObjKey);
makeTestObject((csPath / copyfileObjKey).string().c_str());
makeTestJournal((journalPath / prefix / (string(copyfileObjKey) + ".journal")).string().c_str());
cache->newJournalEntry(prefix, bf::file_size(journalPath / prefix / (string(copyfileObjKey) + ".journal")));
int err = ioc->copyFile(l_sourceFile.string().c_str(), l_destFile.string().c_str());
assert(!err);
uint8_t buf1[8192], buf2[8192];
err = ioc->read(l_sourceFile.string().c_str(), buf1, 0, sizeof(buf1));
assert(err == sizeof(buf1));
err = ioc->read(l_destFile.string().c_str(), buf2, 0, sizeof(buf2));
assert(err == sizeof(buf2));
assert(memcmp(buf1, buf2, 8192) == 0);
assert(ioc->unlink(l_sourceFile.string().c_str()) == 0);
assert(ioc->unlink(l_destFile.string().c_str()) == 0);
sync->forceFlush();
assert(cache->getCurrentCacheSize() == 0);
cout << "IOC copy file 1 OK" << endl;
}
void IOCCopyFile2()
{
// call IOC::copyFile() with non-existant file
IOCoordinator* ioc = IOCoordinator::get();
bf::path fullPath = homepath / prefix / "not-there";
const char* source = fullPath.string().c_str();
bf::path fullPath2 = homepath / prefix / "not-there2";
const char* dest = fullPath2.string().c_str();
bf::path metaPath = ioc->getMetadataPath();
bf::remove(metaPath / prefix / "not-there.meta");
bf::remove(metaPath / prefix / "not-there2.meta");
int err = ioc->copyFile(source, dest);
assert(err);
assert(errno == ENOENT);
assert(!bf::exists(metaPath / "not-there.meta"));
assert(!bf::exists(metaPath / "not-there2.meta"));
cout << "IOC copy file 2 OK" << endl;
}
void IOCCopyFile3()
{
/*
Make our usual test files
with object in the cache not in CS
call ioc::copyFile()
verify dest file exists
*/
IOCoordinator* ioc = IOCoordinator::get();
Cache* cache = Cache::get();
Synchronizer* sync = Synchronizer::get();
bf::path metaPath = ioc->getMetadataPath();
bf::path journalPath = ioc->getJournalPath();
bf::path cachePath = ioc->getCachePath();
bf::path sourcePath = metaPath / prefix / "source.meta";
bf::path destPath = metaPath / prefix / "dest.meta";
bf::path l_sourceFile = homepath / prefix / string("source");
bf::path l_destFile = homepath / prefix / string("dest");
cache->reset();
makeTestObject((cachePath / prefix / copyfileObjKey).string().c_str());
makeTestJournal((journalPath / prefix / (string(copyfileObjKey) + ".journal")).string().c_str());
makeTestMetadata(sourcePath.string().c_str(), copyfileObjKey);
cache->newObject(prefix, copyfileObjKey, bf::file_size(cachePath / prefix / copyfileObjKey));
cache->newJournalEntry(prefix, bf::file_size(journalPath / prefix / (string(copyfileObjKey) + ".journal")));
int err = ioc->copyFile(l_sourceFile.string().c_str(), l_destFile.string().c_str());
assert(!err);
uint8_t buf1[8192], buf2[8192];
err = ioc->read(l_sourceFile.string().c_str(), buf1, 0, sizeof(buf1));
assert(err == sizeof(buf1));
err = ioc->read(l_destFile.string().c_str(), buf2, 0, sizeof(buf2));
assert(err == sizeof(buf2));
assert(memcmp(buf1, buf2, 8192) == 0);
assert(ioc->unlink(l_sourceFile.string().c_str()) == 0);
assert(ioc->unlink(l_destFile.string().c_str()) == 0);
sync->forceFlush();
assert(cache->getCurrentCacheSize() == 0);
cout << "IOC copy file 3 OK" << endl;
}
void IOCCopyFile()
{
IOCCopyFile1();
IOCCopyFile2();
IOCCopyFile3();
}
/* Correctness was tested by inspecting debugging outputs in mergeJournal(). With a little more work
we could capture the merged output and use that to confirm the expected result. Later.
*/
void bigMergeJournal1()
{
const char* jName =
"test_data/e7a81ca3-0af8-48cc-b224-0f59c187e0c1_0_3436_~home~patrick~"
"mariadb~columnstore~data1~systemFiles~dbrm~BRM_saves_em.journal";
const char* fName =
"test_data/e7a81ca3-0af8-48cc-b224-0f59c187e0c1_0_3436_~home~patrick~"
"mariadb~columnstore~data1~systemFiles~dbrm~BRM_saves_em";
bf::path jNamePath = testDirPath / jName;
bf::path fNamePath = testDirPath / fName;
if (!bf::is_directory(testDirPath / "test_data"))
{
cout << "bigMergeJournal1 test_data directory not found at " << (testDirPath / "test_data") << endl
<< " Check if -d option needs to be provided or changed." << endl;
return;
}
IOCoordinator* ioc = IOCoordinator::get();
std::shared_ptr<uint8_t[]> buf;
size_t tmp;
buf = ioc->mergeJournal(fNamePath.string().c_str(), jNamePath.string().c_str(), 0, 68332, &tmp);
assert(buf);
buf = ioc->mergeJournal(fNamePath.string().c_str(), jNamePath.string().c_str(), 100, 68232, &tmp);
assert(buf);
buf = ioc->mergeJournal(fNamePath.string().c_str(), jNamePath.string().c_str(), 0, 68232, &tmp);
assert(buf);
buf = ioc->mergeJournal(fNamePath.string().c_str(), jNamePath.string().c_str(), 100, 68132, &tmp);
assert(buf);
buf = ioc->mergeJournal(fNamePath.string().c_str(), jNamePath.string().c_str(), 100, 10, &tmp);
assert(buf);
}
// This should write an incomplete msg(s) to make sure SM does the right thing. Not
// done yet, handing this off to Ben.
void shortMsg()
{
IOCoordinator* ioc = IOCoordinator::get();
struct stat _stat;
bf::path fullPath = homepath / prefix / "writetest1";
const char* filename = fullPath.string().c_str();
::unlink(filename);
ioc->open(filename, O_WRONLY | O_CREAT, &_stat);
size_t size = 27;
std::vector<uint8_t> bufWrite(sizeof(write_cmd) + std::strlen(filename) + size + sizeof(sm_msg_header));
sm_msg_header* hdrWrite = (sm_msg_header*)bufWrite.data();
write_cmd* cmdWrite = (write_cmd*)&hdrWrite[1];
uint8_t* dataWrite;
cmdWrite->opcode = WRITE;
cmdWrite->offset = 0;
cmdWrite->count = size;
cmdWrite->flen = std::strlen(filename);
memcpy(&cmdWrite->filename, filename, cmdWrite->flen);
dataWrite = (uint8_t*)&cmdWrite->filename[cmdWrite->flen];
memcpy(dataWrite, "123456789123456789123456789", cmdWrite->count);
hdrWrite->type = SM_MSG_START;
hdrWrite->payloadLen = sizeof(*cmdWrite) + cmdWrite->flen + 9;
WriteTask w(clientSock, hdrWrite->payloadLen);
ssize_t result = ::write(sessionSock, cmdWrite, hdrWrite->payloadLen);
assert(result == static_cast<ssize_t>(hdrWrite->payloadLen));
w.run();
// verify response
uint8_t bufRead[1024];
int err = ::recv(sessionSock, bufRead, sizeof(bufRead), MSG_DONTWAIT);
sm_response* resp = (sm_response*)bufRead;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 9);
std::vector<uint8_t> bufAppend(sizeof(append_cmd) + std::strlen(filename) + size + sizeof(sm_msg_header));
uint8_t* dataAppend;
sm_msg_header* hdrAppend = (sm_msg_header*)bufAppend.data();
append_cmd* cmdAppend = (append_cmd*)&hdrAppend[1];
cmdAppend->opcode = APPEND;
cmdAppend->count = size;
cmdAppend->flen = std::strlen(filename);
memcpy(&cmdAppend->filename, filename, cmdAppend->flen);
dataAppend = (uint8_t*)&cmdAppend->filename[cmdAppend->flen];
memcpy(dataAppend, "123456789123456789123456789", cmdAppend->count);
hdrAppend->type = SM_MSG_START;
hdrAppend->payloadLen = sizeof(*cmdAppend) + cmdAppend->flen + 9;
AppendTask a(clientSock, hdrAppend->payloadLen);
err = ::write(sessionSock, cmdAppend, hdrAppend->payloadLen);
a.run();
// verify response
err = ::recv(sessionSock, bufRead, sizeof(bufRead), MSG_DONTWAIT);
resp = (sm_response*)bufRead;
assert(err == sizeof(*resp));
assert(resp->header.type == SM_MSG_START);
assert(resp->header.payloadLen == sizeof(ssize_t));
assert(resp->header.flags == 0);
assert(resp->returnCode == 9);
ioc->unlink(fullPath.string().c_str());
cout << "shortWriteMsg Test OK" << endl;
}
// write and append are the biggest vulnerabilities here b/c those msgs could be sent in multiple
// pieces, are much larger, and thus if there is a crash mid-message it's most likely to happen
// during a call to write/append().
// it may not even be possible for CS to write a partial open/stat/read/etc msg, but that should be
// tested as well.
void shortMsgTests()
{
shortMsg();
}
int main(int argc, char* argv[])
{
std::size_t sizeKB = 1024;
int option;
while ((option = getopt(argc, argv, "d:p:h:")) != EOF)
{
switch (option)
{
case 'd':
testDirPath = optarg;
cout << "test_dir path is " << testDirPath << endl;
break;
case 'p':
prefix = optarg;
testObjKey = "12345_0_8192_" + prefix + "~test-file";
copyfileObjKey = "12345_0_8192_" + prefix + "~source";
metaTestFile = prefix + "/test-file";
testFilePath = homepath / metaTestFile;
testFile = testFilePath.string().c_str();
cout << "TestFile is " << testFile << endl;
break;
case 'h':
default:
printUsage();
return 0;
break;
}
}
if (!bf::is_regular_file("test_data/storagemanager.cnf"))
{
cerr << "This should be run in a dir where ./test_data/storagemanager.cnf exists" << endl;
exit(1);
}
Config* config = Config::get("test_data/storagemanager.cnf");
cout << "Cleaning out debris from previous runs" << endl;
bf::remove_all(config->getValue("ObjectStorage", "metadata_path"));
bf::remove_all(config->getValue("ObjectStorage", "journal_path"));
bf::remove_all(config->getValue("LocalStorage", "path"));
bf::remove_all(config->getValue("Cache", "path"));
cout << "connecting" << endl;
makeConnection();
cout << "connected" << endl;
scoped_closer sc1(serverSock), sc2(sessionSock), sc3(clientSock);
opentask();
metadataUpdateTest();
// create the metadatafile to use
// requires 8K object size to test boundries
// Case 1 new write that spans full object
metadataJournalTest((10 * sizeKB), 0);
// Case 2 write data beyond end of data in object 2 that still ends in object 2
metadataJournalTest((4 * sizeKB), (8 * sizeKB));
// Case 3 write spans 2 journal objects
metadataJournalTest((8 * sizeKB), (4 * sizeKB));
// Case 4 write starts object1 ends object3
metadataJournalTest((16 * sizeKB), (7 * sizeKB));
// Case 5 write starts in new object at 0 offset after null objects
metadataJournalTest((8 * sizeKB), (32 * sizeKB));
// overwrite null objects
metadataJournalTest((10 * sizeKB), (40 * sizeKB));
metadataJournalTest((8 * sizeKB), (24 * sizeKB));
// overwrite whole file and create new objects
metadataJournalTest((96 * sizeKB), (0));
// append test
// first one appends file to end of 8K object
metadataJournalTest_append((7 * sizeKB));
// this apppends that starts on new object
metadataJournalTest_append((7 * sizeKB));
// this starts in one object and crosses into new object
metadataJournalTest_append((7 * sizeKB));
// broken
// writetask();
// broken
// appendtask();
unlinktask();
stattask();
truncatetask();
listdirtask();
pingtask();
copytask();
localstorageTest1();
cacheTest1();
mergeJournalTest();
replicatorTest();
syncTest1();
IOCReadTest1();
// broken
//IOCTruncate();
//IOCUnlink();
IOCCopyFile();
shortMsgTests();
// For the moment, this next one just verifies no error happens as reported by the fcns called.
// It doesn't verify the result yet.
// bigMergeJournal1();
// skip the s3 test if s3 is not configured
if (config->getValue("S3", "region") != "")
{
s3storageTest1();
}
else
cout << "To run the S3Storage unit tests, configure the S3 section of test-data/storagemanager.cnf"
<< endl;
cout << "Cleanup";
metadataJournalTestCleanup();
cout << " DONE" << endl;
cout << "Testing connection loss..." << endl << endl;
cout << "Check log files for lines:" << endl;
cout << "[NameTask read] caught an error: Bad file descriptor." << endl;
cout << "****** socket error!" << endl;
cout << "PosixTask::consumeMsg(): Discarding the tail end of a partial msg." << endl << endl;
opentask();
// broken
// cout << "OpenTask read2 connection test " << endl;
//opentask(true);
// makeConnection();
// cout << "UnlinkTask read connection test " << endl;
// unlinktask(true);
// makeConnection();
// cout << "StatTask read connection test " << endl;
// stattask(true);
// makeConnection();
// cout << "TruncateTask read connection test " << endl;
// truncatetask(true);
// makeConnection();
// cout << "ListDirextoryTask read connection test " << endl;
// listdirtask(true);
// makeConnection();
// cout << "CopyTask read connection test " << endl;
// copytask(true);
(Cache::get())->shutdown();
delete (Synchronizer::get());
delete (Cache::get());
delete (IOCoordinator::get());
return 0;
}