You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-13 23:02:14 +03:00
Fixed up the Sync unit test, fixed a couple things in S3Storage,
it's working now.
This commit is contained in:
@@ -243,7 +243,10 @@ void MetadataFile::updateEntry(off_t offset, const string &newName, size_t newLe
|
||||
set<metadataObject>::iterator updateObj = mObjects.find(lookup);
|
||||
if (updateObj == mObjects.end())
|
||||
{
|
||||
//throw
|
||||
stringstream ss;
|
||||
ss << "MetadataFile::updateEntry(): failed to find object at offset " << offset;
|
||||
mpLogger->log(LOG_ERR, ss.str().c_str());
|
||||
throw logic_error(ss.str());
|
||||
}
|
||||
updateObj->key = newName;
|
||||
updateObj->length = newLength;
|
||||
@@ -256,7 +259,10 @@ void MetadataFile::updateEntryLength(off_t offset, size_t newLength)
|
||||
set<metadataObject>::iterator updateObj = mObjects.find(lookup);
|
||||
if (updateObj == mObjects.end())
|
||||
{
|
||||
//throw
|
||||
stringstream ss;
|
||||
ss << "MetadataFile::updateEntryLength(): failed to find object at offset " << offset;
|
||||
mpLogger->log(LOG_ERR, ss.str().c_str());
|
||||
throw logic_error(ss.str());
|
||||
}
|
||||
updateObj->length = newLength;
|
||||
}
|
||||
|
||||
@@ -50,9 +50,9 @@ class MetadataFile
|
||||
static void setOffsetInKey(std::string &key, off_t newOffset);
|
||||
static void setLengthInKey(std::string &key, size_t newLength);
|
||||
|
||||
|
||||
private:
|
||||
Config *mpConfig;
|
||||
std::string prefix;
|
||||
SMLogging *mpLogger;
|
||||
int mVersion;
|
||||
int mRevision;
|
||||
|
||||
@@ -144,12 +144,14 @@ int S3Storage::getObject(const string &sourceKey, boost::shared_array<uint8_t> *
|
||||
} while (err && retryable_error(err));
|
||||
if (err)
|
||||
{
|
||||
logger->log(LOG_CRIT, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
|
||||
data->reset();
|
||||
errno = s3err_to_errno[err];
|
||||
return -1;
|
||||
}
|
||||
|
||||
data->reset(_data);
|
||||
data->reset(_data, free);
|
||||
if (size)
|
||||
*size = len;
|
||||
return 0;
|
||||
@@ -220,6 +222,8 @@ int S3Storage::putObject(const boost::shared_array<uint8_t> data, size_t len, co
|
||||
} while (s3err && retryable_error(s3err));
|
||||
if (s3err)
|
||||
{
|
||||
logger->log(LOG_CRIT, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
|
||||
errno = s3err_to_errno[s3err];
|
||||
return -1;
|
||||
}
|
||||
@@ -239,6 +243,10 @@ void S3Storage::deleteObject(const string &key)
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && s3err != MS3_ERR_NOT_FOUND && retryable_error(s3err));
|
||||
|
||||
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
||||
logger->log(LOG_CRIT, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
}
|
||||
|
||||
int S3Storage::copyObject(const string &sourceKey, const string &destKey)
|
||||
@@ -251,9 +259,7 @@ int S3Storage::copyObject(const string &sourceKey, const string &destKey)
|
||||
err = getObject(sourceKey, &data, &len);
|
||||
if (err)
|
||||
return err;
|
||||
err = putObject(data, len, destKey);
|
||||
if (err)
|
||||
return err;
|
||||
return putObject(data, len, destKey);
|
||||
}
|
||||
|
||||
int S3Storage::exists(const string &key, bool *out)
|
||||
@@ -271,8 +277,10 @@ int S3Storage::exists(const string &key, bool *out)
|
||||
}
|
||||
} while (s3err && s3err != MS3_ERR_NOT_FOUND && retryable_error(s3err));
|
||||
|
||||
if (s3err)
|
||||
if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
|
||||
{
|
||||
logger->log(LOG_CRIT, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
errno = s3err_to_errno[s3err];
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -277,6 +277,8 @@ void Synchronizer::process(list<string>::iterator name, bool callerHoldsLock)
|
||||
mutex.unlock();
|
||||
|
||||
bool success = false;
|
||||
|
||||
// in testing, this seems to only run once when an exception is caught. Not obvious why yet.... ??
|
||||
while (!success)
|
||||
{
|
||||
try {
|
||||
@@ -296,6 +298,7 @@ void Synchronizer::process(list<string>::iterator name, bool callerHoldsLock)
|
||||
catch(exception &e) {
|
||||
logger->log(LOG_CRIT, "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Retrying...", key.c_str(),
|
||||
pending->opFlags, e.what());
|
||||
success = false;
|
||||
sleep(1);
|
||||
// TODO: requeue the job instead of looping infinitely
|
||||
}
|
||||
@@ -419,8 +422,10 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
}
|
||||
|
||||
// update the metadata for the source file
|
||||
|
||||
MetadataFile md(sourceFile.c_str());
|
||||
// note: a temporary fix. Metadatafile needs the full path to the metadata atm. Fix this
|
||||
// once MDF knows where to look.
|
||||
string metaPrefix = Config::get()->getValue("ObjectStorage", "metadata_path");
|
||||
MetadataFile md((metaPrefix + "/" + sourceFile).c_str());
|
||||
md.updateEntry(MetadataFile::getOffsetFromKey(key), newKey, size);
|
||||
replicator->updateMetadata(sourceFile.c_str(), md);
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include "LocalStorage.h"
|
||||
#include "MetadataFile.h"
|
||||
#include "Replicator.h"
|
||||
#include "S3Storage.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <stdlib.h>
|
||||
@@ -94,7 +95,6 @@ void makeConnection()
|
||||
assert(err == 0);
|
||||
t.join();
|
||||
}
|
||||
|
||||
|
||||
bool opentask()
|
||||
{
|
||||
@@ -458,7 +458,7 @@ bool listdirtask()
|
||||
assert(fd > 0);
|
||||
scoped_closer f(fd);
|
||||
|
||||
uint8_t buf[1024];
|
||||
uint8_t buf[8192];
|
||||
listdir_cmd *cmd = (listdir_cmd *) buf;
|
||||
|
||||
cmd->opcode = LIST_DIRECTORY;
|
||||
@@ -471,7 +471,7 @@ bool listdirtask()
|
||||
|
||||
/* going to keep this simple. Don't run this in a big dir. */
|
||||
/* maybe later I'll make a dir, put a file in it, and etc. For now run it in a small dir. */
|
||||
int err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
|
||||
int err = ::recv(sessionSock, buf, 8192, MSG_DONTWAIT);
|
||||
sm_response *resp = (sm_response *) buf;
|
||||
assert(err > 0);
|
||||
assert(resp->header.type == SM_MSG_START);
|
||||
@@ -587,6 +587,9 @@ bool cacheTest1()
|
||||
return false;
|
||||
}
|
||||
|
||||
cache->reset();
|
||||
assert(cache->getCurrentCacheSize() == 0);
|
||||
|
||||
bf::path storagePath = ls->getPrefix();
|
||||
bf::path cachePath = cache->getCachePath();
|
||||
vector<string> v_bogus;
|
||||
@@ -663,19 +666,20 @@ void makeTestMetadata(const char *dest)
|
||||
assert(metaFD >= 0);
|
||||
scoped_closer sc(metaFD);
|
||||
|
||||
// need to parameterize the object name in the objects list
|
||||
const char *metadata =
|
||||
"{ \
|
||||
\"version\" : 1, \
|
||||
\"revision\" : 1, \
|
||||
\"objects\" : \
|
||||
[ \
|
||||
{ \
|
||||
\"offset\" : 0, \
|
||||
\"length\" : 8192, \
|
||||
\"name\" : \"12345_0_8192_test-object\" \
|
||||
} \
|
||||
] \
|
||||
}";
|
||||
"{ \n\
|
||||
\"version\" : 1, \n\
|
||||
\"revision\" : 1, \n\
|
||||
\"objects\" : \n\
|
||||
[ \n\
|
||||
{ \n\
|
||||
\"offset\" : 0, \n\
|
||||
\"length\" : 8192, \n\
|
||||
\"key\" : \"12345_0_8192_test-file\" \n\
|
||||
} \n\
|
||||
] \n\
|
||||
}\n";
|
||||
write(metaFD, metadata, strlen(metadata));
|
||||
}
|
||||
|
||||
@@ -771,7 +775,7 @@ bool syncTest1()
|
||||
bf::create_directories(metaPath);
|
||||
|
||||
// make the test obj, journal, and metadata
|
||||
string key = "12345_0_8192_test-object";
|
||||
string key = "12345_0_8192_test-file";
|
||||
string journalName = key + ".journal";
|
||||
|
||||
makeTestObject((cachePath/key).string().c_str());
|
||||
@@ -811,7 +815,7 @@ bool syncTest1()
|
||||
for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator() && !foundIt; ++dir)
|
||||
{
|
||||
newKey = dir->path().filename().string();
|
||||
foundIt = (MetadataFile::getSourceFromKey(newKey) == "test-object");
|
||||
foundIt = (MetadataFile::getSourceFromKey(newKey) == "test-file");
|
||||
if (foundIt)
|
||||
{
|
||||
size_t fsize = bf::file_size(dir->path());
|
||||
@@ -841,7 +845,7 @@ bool syncTest1()
|
||||
for (bf::directory_iterator dir(fakeCloudPath); dir != bf::directory_iterator() && !foundIt; ++dir)
|
||||
{
|
||||
key = dir->path().filename().string();
|
||||
foundIt = (MetadataFile::getSourceFromKey(key) == "test-object");
|
||||
foundIt = (MetadataFile::getSourceFromKey(key) == "test-file");
|
||||
}
|
||||
assert(foundIt);
|
||||
|
||||
@@ -869,7 +873,79 @@ void metadataUpdateTest()
|
||||
//mdfTest.printObjects();
|
||||
::unlink("metadataUpdateTest.meta");
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
void s3storageTest1()
|
||||
{
|
||||
S3Storage s3;
|
||||
bool exists;
|
||||
int err;
|
||||
string testFile = "storagemanager.cnf";
|
||||
string testFile2 = testFile + "2";
|
||||
|
||||
exists = bf::exists(testFile);
|
||||
if (!exists)
|
||||
{
|
||||
cout << "s3storageTest1() requires having " << testFile << " in the current directory.";
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
err = s3.exists(testFile, &exists);
|
||||
assert(!err);
|
||||
if (exists)
|
||||
s3.deleteObject(testFile);
|
||||
|
||||
err = s3.exists(testFile2, &exists);
|
||||
assert(!err);
|
||||
if (exists)
|
||||
s3.deleteObject(testFile2);
|
||||
|
||||
// put it & get it
|
||||
err = s3.putObject(testFile, testFile);
|
||||
assert(!err);
|
||||
err = s3.exists(testFile, &exists);
|
||||
assert(!err);
|
||||
assert(exists);
|
||||
err = s3.getObject(testFile, testFile2);
|
||||
assert(!err);
|
||||
exists = bf::exists(testFile2);
|
||||
assert(bf::file_size(testFile) == bf::file_size(testFile2));
|
||||
|
||||
// do a deep compare testFile vs testFile2
|
||||
size_t len = bf::file_size(testFile);
|
||||
int fd1 = open(testFile.c_str(), O_RDONLY);
|
||||
assert(fd1 >= 0);
|
||||
int fd2 = open(testFile2.c_str(), O_RDONLY);
|
||||
assert(fd2 >= 0);
|
||||
|
||||
uint8_t *data1 = new uint8_t[len];
|
||||
uint8_t *data2 = new uint8_t[len];
|
||||
err = read(fd1, data1, len);
|
||||
assert(err == len);
|
||||
err = read(fd2, data2, len);
|
||||
assert(err == len);
|
||||
assert(!memcmp(data1, data2, len));
|
||||
close(fd1);
|
||||
close(fd2);
|
||||
delete [] data1;
|
||||
delete [] data2;
|
||||
|
||||
err = s3.copyObject(testFile, testFile2);
|
||||
assert(!err);
|
||||
err = s3.exists(testFile2, &exists);
|
||||
assert(!err);
|
||||
assert(exists);
|
||||
s3.deleteObject(testFile);
|
||||
s3.deleteObject(testFile2);
|
||||
}
|
||||
catch(exception &e)
|
||||
{
|
||||
cout << __FUNCTION__ << " caught " << e.what() << endl;
|
||||
assert(0);
|
||||
}
|
||||
cout << "S3Storage Test 1 OK" << endl;
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
@@ -908,6 +984,8 @@ int main()
|
||||
mergeJournalTest();
|
||||
replicatorTest();
|
||||
syncTest1();
|
||||
|
||||
s3storageTest1();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user