You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-13 23:02:14 +03:00
More fixes around sync, error handling, and logging.
This commit is contained in:
@@ -387,8 +387,8 @@ void Cache::rename(const string &oldKey, const string &newKey, ssize_t sizediff)
|
||||
//assert(it != m_lru.end());
|
||||
if (it == m_lru.end())
|
||||
{
|
||||
logger->log(LOG_ERR, "Cache: was told to rename %s, but it does not exist", oldKey.string().c_str());
|
||||
assert(0);
|
||||
logger->log(LOG_WARNING, "Cache: was told to rename %s, but it is not in the cache", oldKey.c_str());
|
||||
return;
|
||||
}
|
||||
|
||||
auto lit = it->lit;
|
||||
|
||||
@@ -579,15 +579,14 @@ void IOCoordinator::deleteMetaFile(const bf::path &file)
|
||||
|
||||
Synchronizer *synchronizer = Synchronizer::get();
|
||||
|
||||
|
||||
// this is kind of ugly. We need to lock on file relative to metaPath
|
||||
string lockKey = file.string().substr(metaPath.string().length() + 1);
|
||||
cout << "Deletemetafile locking on " << lockKey << endl;
|
||||
ScopedWriteLock lock(this, lockKey);
|
||||
// this is kind of ugly. We need to lock on 'file' relative to metaPath, and without the .meta extension
|
||||
string pita = file.string();
|
||||
pita = pita.substr(metaPath.string().length() + 1);
|
||||
pita = pita.substr(0, pita.find_last_of('.'));
|
||||
ScopedWriteLock lock(this, pita);
|
||||
|
||||
MetadataFile meta(file);
|
||||
replicator->remove(file);
|
||||
lock.unlock();
|
||||
|
||||
vector<metadataObject> objects = meta.metadataRead(0, meta.getLength());
|
||||
vector<string> deletedObjects;
|
||||
@@ -1045,6 +1044,7 @@ void IOCoordinator::readLock(const string &filename)
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(lockMutex);
|
||||
|
||||
//cout << "read-locking " << filename << endl;
|
||||
auto ins = locks.insert(pair<string, RWLock *>(filename, NULL));
|
||||
if (ins.second)
|
||||
ins.first->second = new RWLock();
|
||||
@@ -1068,6 +1068,7 @@ void IOCoordinator::writeLock(const string &filename)
|
||||
{
|
||||
boost::unique_lock<boost::mutex> s(lockMutex);
|
||||
|
||||
//cout << "write-locking " << filename << endl;
|
||||
auto ins = locks.insert(pair<string, RWLock *>(filename, NULL));
|
||||
if (ins.second)
|
||||
ins.first->second = new RWLock();
|
||||
|
||||
@@ -74,12 +74,15 @@ int LocalStorage::getObject(const std::string &sourceKey, boost::shared_array<ui
|
||||
{
|
||||
bf::path source = prefix / sourceKey;
|
||||
const char *c_source = source.string().c_str();
|
||||
char buf[80];
|
||||
//char buf[80];
|
||||
int l_errno;
|
||||
|
||||
int fd = ::open(c_source, O_RDONLY);
|
||||
if (fd < 0)
|
||||
{
|
||||
logger->log(LOG_CRIT, "LocalStorage::getObject() failed to open %s, got '%s'", c_source, strerror_r(errno, buf, 80));
|
||||
l_errno = errno;
|
||||
//logger->log(LOG_WARNING, "LocalStorage::getObject() failed to open %s, got '%s'", c_source, strerror_r(errno, buf, 80));
|
||||
errno = l_errno;
|
||||
return fd;
|
||||
}
|
||||
|
||||
@@ -91,8 +94,10 @@ int LocalStorage::getObject(const std::string &sourceKey, boost::shared_array<ui
|
||||
int err = ::read(fd, &(*data)[count], l_size - count);
|
||||
if (err < 0)
|
||||
{
|
||||
logger->log(LOG_CRIT, "LocalStorage::getObject() failed to read %s, got '%s'", c_source, strerror_r(errno, buf, 80));
|
||||
l_errno = errno;
|
||||
//logger->log(LOG_WARNING, "LocalStorage::getObject() failed to read %s, got '%s'", c_source, strerror_r(errno, buf, 80));
|
||||
close(fd);
|
||||
errno = l_errno;
|
||||
return err;
|
||||
}
|
||||
count += err;
|
||||
@@ -112,12 +117,15 @@ int LocalStorage::putObject(boost::shared_array<uint8_t> data, size_t len, const
|
||||
{
|
||||
bf::path destPath = prefix / dest;
|
||||
const char *c_dest = destPath.string().c_str();
|
||||
char buf[80];
|
||||
//char buf[80];
|
||||
int l_errno;
|
||||
|
||||
int fd = ::open(c_dest, O_WRONLY | O_CREAT | O_TRUNC, 0600);
|
||||
if (fd < 0)
|
||||
{
|
||||
logger->log(LOG_CRIT, "LocalStorage::putObject(): Failed to open %s, got '%s'", c_dest, strerror_r(errno, buf, 80));
|
||||
l_errno = errno;
|
||||
//logger->log(LOG_CRIT, "LocalStorage::putObject(): Failed to open %s, got '%s'", c_dest, strerror_r(errno, buf, 80));
|
||||
errno = l_errno;
|
||||
return fd;
|
||||
}
|
||||
|
||||
@@ -128,8 +136,10 @@ int LocalStorage::putObject(boost::shared_array<uint8_t> data, size_t len, const
|
||||
err = ::write(fd, &data[count], len - count);
|
||||
if (err < 0)
|
||||
{
|
||||
logger->log(LOG_CRIT, "LocalStorage::putObject(): Failed to write to %s, got '%s'", c_dest, strerror_r(errno, buf, 80));
|
||||
l_errno = errno;
|
||||
//logger->log(LOG_CRIT, "LocalStorage::putObject(): Failed to write to %s, got '%s'", c_dest, strerror_r(errno, buf, 80));
|
||||
close(fd);
|
||||
errno = l_errno;
|
||||
return err;
|
||||
}
|
||||
count += err;
|
||||
|
||||
@@ -147,16 +147,16 @@ int S3Storage::getObject(const string &sourceKey, boost::shared_array<uint8_t> *
|
||||
|
||||
do {
|
||||
err = ms3_get(creds, bucket.c_str(), sourceKey.c_str(), &_data, &len);
|
||||
if (err)
|
||||
if (err && retryable_error(err))
|
||||
{
|
||||
logger->log(LOG_CRIT, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
|
||||
sleep(5);
|
||||
}
|
||||
} while (err && retryable_error(err));
|
||||
if (err)
|
||||
{
|
||||
logger->log(LOG_CRIT, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s.",
|
||||
logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
|
||||
data->reset();
|
||||
errno = s3err_to_errno[err];
|
||||
@@ -200,7 +200,7 @@ int S3Storage::putObject(const string &sourceFile, const string &destKey)
|
||||
if (err < 0)
|
||||
{
|
||||
int l_errno = errno;
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): Failed to read %s @ position %s, got %s", sourceFile.c_str(),
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): Failed to read %s @ position %ld, got %s", sourceFile.c_str(),
|
||||
count, strerror_r(l_errno, buf, 80));
|
||||
errno = l_errno;
|
||||
return -1;
|
||||
@@ -208,7 +208,7 @@ int S3Storage::putObject(const string &sourceFile, const string &destKey)
|
||||
else if (err == 0)
|
||||
{
|
||||
// this shouldn't happen, we just checked the size
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): Got early EOF reading %s @ position %s", sourceFile.c_str(),
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): Got early EOF reading %s @ position %ld", sourceFile.c_str(),
|
||||
count);
|
||||
errno = ENODATA; // is there a better errno for early eof?
|
||||
return -1;
|
||||
@@ -227,16 +227,16 @@ int S3Storage::putObject(const boost::shared_array<uint8_t> data, size_t len, co
|
||||
|
||||
do {
|
||||
s3err = ms3_put(creds, bucket.c_str(), destKey.c_str(), data.get(), len);
|
||||
if (s3err)
|
||||
if (s3err && retryable_error(s3err))
|
||||
{
|
||||
logger->log(LOG_CRIT, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s."
|
||||
logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s."
|
||||
" Retrying...", s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
|
||||
sleep(5);
|
||||
}
|
||||
} while (s3err && retryable_error(s3err));
|
||||
if (s3err)
|
||||
{
|
||||
logger->log(LOG_CRIT, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s.",
|
||||
logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s.",
|
||||
s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
|
||||
errno = s3err_to_errno[s3err];
|
||||
return -1;
|
||||
@@ -252,7 +252,7 @@ void S3Storage::deleteObject(const string &key)
|
||||
|
||||
do {
|
||||
s3err = ms3_delete(creds, bucket.c_str(), key.c_str());
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND)
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && retryable_error(s3err))
|
||||
{
|
||||
logger->log(LOG_CRIT, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
@@ -287,7 +287,7 @@ int S3Storage::exists(const string &key, bool *out)
|
||||
|
||||
do {
|
||||
s3err = ms3_status(creds, bucket.c_str(), key.c_str(), &status);
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND)
|
||||
if (s3err && s3err != MS3_ERR_NOT_FOUND && retryable_error(s3err))
|
||||
{
|
||||
logger->log(LOG_CRIT, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...",
|
||||
s3err_msgs[s3err], bucket.c_str(), key.c_str());
|
||||
|
||||
@@ -346,11 +346,29 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80));
|
||||
}
|
||||
err = ioc->mergeJournalInMem(data, &size, journalName.c_str());
|
||||
assert(!err);
|
||||
if (err)
|
||||
{
|
||||
if (!bf::exists(journalName))
|
||||
logger->log(LOG_DEBUG, "synchronizeWithJournal(): journal %s was deleted mid-operation, check locking",
|
||||
journalName.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "synchronizeWithJournal(): unexpected error merging journal for %s", key.c_str());
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, &size);
|
||||
assert(data);
|
||||
if (!data)
|
||||
{
|
||||
if (!bf::exists(journalName))
|
||||
logger->log(LOG_DEBUG, "synchronizeWithJournal(): journal %s was deleted mid-operation, check locking",
|
||||
journalName.c_str());
|
||||
else
|
||||
logger->log(LOG_ERR, "synchronizeWithJournal(): unexpected error merging journal for %s", key.c_str());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// get a new key for the resolved version & upload it
|
||||
string newKey = MetadataFile::getNewKeyFromOldKey(key, size);
|
||||
|
||||
@@ -1329,8 +1329,8 @@ void IOCCopyFile1()
|
||||
assert(err == 8192);
|
||||
assert(memcmp(buf1, buf2, 8192) == 0);
|
||||
|
||||
ioc->unlink("copyfile1");
|
||||
ioc->unlink("copyfile2");
|
||||
assert(ioc->unlink("copyfile1") == 0);
|
||||
assert(ioc->unlink("copyfile2") == 0);
|
||||
assert(cache->getCurrentCacheSize() == 0);
|
||||
cout << "IOC copy file 1 OK" << endl;
|
||||
}
|
||||
@@ -1390,8 +1390,8 @@ void IOCCopyFile3()
|
||||
assert(err == 8192);
|
||||
assert(memcmp(buf1, buf2, 8192) == 0);
|
||||
|
||||
ioc->unlink("copyfile3");
|
||||
ioc->unlink("copyfile4");
|
||||
assert(ioc->unlink("copyfile3") == 0);
|
||||
assert(ioc->unlink("copyfile4") == 0);
|
||||
assert(cache->getCurrentCacheSize() == 0);
|
||||
cout << "IOC copy file 3 OK" << endl;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user