1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Tentative commit, hunting down a source of misbehavior.

Conflicts:
	storage-manager/src/IOCoordinator.cpp
This commit is contained in:
Patrick LeBlanc
2020-05-22 17:55:19 -04:00
parent 359beb9c96
commit faa35ebeeb
4 changed files with 88 additions and 10 deletions

View File

@@ -18,6 +18,7 @@
 #include "Downloader.h"
 #include "Config.h"
 #include "SMLogging.h"
+#include "MetadataFile.h"
 #include <string>
 #include <errno.h>
 #include <iostream>
@@ -166,6 +167,13 @@ void Downloader::Download::operator()()
 bf::remove(tmpFile);
 size = 0;
 }
+if (size != MetadataFile::getLengthFromKey(key))
+{
+ostringstream oss;
+SMLogging *logr = SMLogging::get();
+oss << "Downloader: got a file with a bad length field. key = " << key << " actual size = " << size;
+logr->log(LOG_ERR, oss.str().c_str());
+}
 // move it to its proper place
 boost::system::error_code berr;

View File

@@ -488,6 +488,8 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin
 //log error and abort
 l_errno = errno;
 logger->log(LOG_ERR,"IOCoordinator::write(): Failed newObject.");
+metadata.removeEntry(newObject.offset);
+replicator->remove(firstDir/newObject.key);
 errno = l_errno;
 if (count == 0) // if no data has been written yet, it's safe to return -1 here.
 return -1;
@@ -509,14 +511,17 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin
 // get a new name for the object
 string oldKey = newObject.key;
 newObject.key = metadata.getNewKeyFromOldKey(newObject.key, err + objectOffset);
-int renameErr = ::rename((firstDir/oldKey).string().c_str(), (firstDir/newObject.key).string().c_str());
+ostringstream os;
+os << "IOCoordinator::write(): renaming " << oldKey << " to " << newObject.key;
+logger->log(LOG_DEBUG, os.str().c_str());
+int renameErr = ::rename((cachePath/firstDir/oldKey).string().c_str(), (cachePath/firstDir/newObject.key).string().c_str());
 int renameErrno = errno;
 if (renameErr < 0)
 {
 ostringstream oss;
 char buf[80];
-oss << "IOCoordinator::write(): Failed to rename " << (firstDir/oldKey).string() << " to " <<
-(firstDir/newObject.key).string() << "! Got " << strerror_r(renameErrno, buf, 80);
+oss << "IOCoordinator::write(): Failed to rename " << (cachePath/firstDir/oldKey).string() << " to " <<
+(cachePath/firstDir/newObject.key).string() << "! Got " << strerror_r(renameErrno, buf, 80);
 logger->log(LOG_ERR, oss.str().c_str());
 newObject.key = oldKey;
 }
@@ -527,6 +532,14 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin
 newObjectKeys.push_back(newObject.key);
 goto out;
 }
+if (bf::file_size(cachePath/firstDir/newObject.key) != MetadataFile::getLengthFromKey(newObject.key))
+{
+ostringstream oss;
+oss << "IOCoordinator::write(): detected bad length field in " << newObject.key
+<< " real size = " << bf::file_size(cachePath/firstDir/newObject.key);
+logger->log(LOG_ERR, oss.str().c_str());
+}
 cache->newObject(firstDir, newObject.key,writeLength + objectOffset);
 newObjectKeys.push_back(newObject.key);
@@ -635,6 +648,8 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
 l_errno = errno;
 //log error and abort
 logger->log(LOG_ERR,"IOCoordinator::append(): Failed newObject.");
+metadata.removeEntry(newObject.offset);
+replicator->remove(firstDir/newObject.key);
 errno = l_errno;
 // if no data was written successfully yet, it's safe to return -1 here.
 if (count == 0)
@@ -655,20 +670,32 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
 {
 string oldKey = newObject.key;
 newObject.key = metadata.getNewKeyFromOldKey(newObject.key, err + newObject.offset);
-int renameErr = ::rename((firstDir/oldKey).string().c_str(), (firstDir/newObject.key).string().c_str());
+ostringstream os;
+os << "IOCoordinator::append(): renaming " << oldKey << " to " << newObject.key;
+logger->log(LOG_DEBUG, os.str().c_str());
+int renameErr = ::rename((cachePath/firstDir/oldKey).string().c_str(), (cachePath/firstDir/newObject.key).string().c_str());
 int renameErrno = errno;
 if (renameErr < 0)
 {
 ostringstream oss;
 char buf[80];
-oss << "IOCoordinator::write(): Failed to rename " << (firstDir/oldKey).string() << " to " <<
-(firstDir/newObject.key).string() << "! Got " << strerror_r(renameErrno, buf, 80);
+oss << "IOCoordinator::write(): Failed to rename " << (cachePath/firstDir/oldKey).string() << " to " <<
+(cachePath/firstDir/newObject.key).string() << "! Got " << strerror_r(renameErrno, buf, 80);
 logger->log(LOG_ERR, oss.str().c_str());
 newObject.key = oldKey;
 }
 metadata.updateEntry(newObject.offset, newObject.key, err);
 }
+if (bf::file_size(cachePath/firstDir/newObject.key) != MetadataFile::getLengthFromKey(newObject.key))
+{
+ostringstream oss;
+oss << "IOCoordinator::write(): detected bad length field in " << newObject.key
+<< " real size = " << bf::file_size(cachePath/firstDir/newObject.key);
+logger->log(LOG_ERR, oss.str().c_str());
+}
 cache->newObject(firstDir, newObject.key,err);
 newObjectKeys.push_back(newObject.key);
@@ -1034,7 +1061,7 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
 for (const auto &object : objects)
 {
 bf::path journalFile = journalPath/firstDir1/(object.key + ".journal");
-metadataObject newObj = meta2.addMetadataObject(filename2, object.length);
+metadataObject newObj = meta2.addMetadataObject(filename2, MetadataFile::getLengthFromKey(object.key));
 assert(newObj.offset == object.offset);
 err = cs->copyObject(object.key, newObj.key);
 if (err)
@@ -1049,6 +1076,22 @@ int IOCoordinator::copyFile(const char *_filename1, const char *_filename2)
 ", dest = " + filename2 + ". Object " + object.key + " does not exist in either "
 "cloud storage or the cache!");
+if (bf::file_size(cachedObjPath) != MetadataFile::getLengthFromKey(object.key))
+{
+ostringstream oss;
+oss << "CopyFile: found a size mismatch in " << cachedObjPath <<
+" real size = " << bf::file_size(cachedObjPath);
+logger->log(LOG_ERR, oss.str().c_str());
+}
+if (MetadataFile::getLengthFromKey(object.key) != MetadataFile::getLengthFromKey(newObj.key))
+{
+ostringstream oss;
+oss << "CopyFile: found a size mismatch in src and dest keys src = " << object.key <<
+" dest = " << newObj.key;
+logger->log(LOG_ERR, oss.str().c_str());
+}
 // put the copy in cloudstorage
 err = cs->putObject(cachedObjPath.string(), newObj.key);
 if (err)

View File

@@ -101,8 +101,11 @@ int LocalStorage::copy(const bf::path &source, const bf::path &dest)
 if (err)
 {
 errno = err.value();
+::unlink(dest.string().c_str());
 return -1;
 }
+if (bf::file_size(source) != bf::file_size(dest))
+logger->log(LOG_ERR, "LocalStorage::copy: partially copied a file somehow");
 return 0;
 }
@@ -216,6 +219,7 @@ int LocalStorage::putObject(boost::shared_array<uint8_t> data, size_t len, const
 l_errno = errno;
 //logger->log(LOG_CRIT, "LocalStorage::putObject(): Failed to write to %s, got '%s'", c_dest, strerror_r(errno, buf, 80));
 close(fd);
+::unlink(c_dest);
 errno = l_errno;
 bytesWritten += count;
 return err;
@@ -240,6 +244,8 @@ int LocalStorage::copyObject(const string &source, const string &dest)
 size_t _size = bf::file_size(prefix/source);
 bytesRead += _size;
 bytesWritten += _size;
+if (bf::file_size(prefix/source) != bf::file_size(prefix/dest))
+logger->log(LOG_ERR, "LocalStorage::copyObject(): partially copied a file somehow");
 }
 return ret;
 }

View File

@@ -437,7 +437,7 @@ void Synchronizer::process(list<string>::iterator name)
 catch(exception &e) {
 // these are often self-resolving, so we will suppress logging it for 10 iterations, then escalate
 // to error, then to crit
-if (++retryCount >= 10)
+//if (++retryCount >= 10)
 logger->log((retryCount < 20 ? LOG_ERR : LOG_CRIT), "Synchronizer::process(): error sync'ing %s opFlags=%d, got '%s'. Retrying...", key.c_str(),
 pending->opFlags, e.what());
 success = false;
@@ -467,7 +467,7 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
 {
 ScopedReadLock s(ioc, sourceFile);
-string &key = *it;
+string key = *it;
 size_t pos = key.find_first_of('/');
 bf::path prefix = key.substr(0, pos);
 string cloudKey = key.substr(pos + 1);
@@ -499,7 +499,6 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
 if (exists)
 return;
-// TODO: should be safe to check with Cache instead of a file existence check
 exists = cache->exists(prefix, cloudKey);
 if (!exists)
 {
@@ -507,9 +506,17 @@ void Synchronizer::synchronize(const string &sourceFile, list<string>::iterator
 return;
 }
+if (bf::file_size(cachePath/key) != MetadataFile::getLengthFromKey(cloudKey))
+{
+ostringstream oss;
+oss << "Synchronizer::synchronize(): found a size mismatch in key = " << cloudKey <<
+" real size = " << bf::file_size(cachePath/key);
+logger->log(LOG_ERR, oss.str().c_str());
+}
 err = cs->putObject((cachePath / key).string(), cloudKey);
 if (err)
 throw runtime_error(string("synchronize(): uploading ") + key + ", got " + strerror_r(errno, buf, 80));
 numBytesRead += mdEntry.length;
 bytesReadBySync += mdEntry.length;
 numBytesUploaded += mdEntry.length;
@@ -658,6 +665,20 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
 // get a new key for the resolved version & upload it
 string newCloudKey = MetadataFile::getNewKeyFromOldKey(cloudKey, size);
 string newKey = (prefix/newCloudKey).string();
+try {
+if (size != MetadataFile::getLengthFromKey(newCloudKey))
+{
+ostringstream oss;
+oss << "SyncWithJournal: detected the file size mismatch on the merged object somehow. " <<
+"key = " << newCloudKey << "real size = " << bf::file_size(prefix/newCloudKey);
+logger->log(LOG_ERR, oss.str().c_str());
+}
+} catch(exception &e)
+{
+logger->log(LOG_ERR, "DEB4");
+}
 err = cs->putObject(data, size, newCloudKey);
 if (err)
 {