1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Fixed a number of bugs in storage manager, and added code to detect

and recover from being killed while writing new objects.

Conflicts:
	storage-manager/src/Synchronizer.cpp
This commit is contained in:
Patrick LeBlanc
2020-05-20 18:37:17 -04:00
parent 5b6f1290d7
commit aeec468814
7 changed files with 107 additions and 23 deletions

View File

@ -355,6 +355,20 @@ void Cache::configListener()
logger->log(LOG_CRIT, "Cache/cache_size is not a number. Using current value = %zi",maxCacheSize);
}
}
void Cache::repopulate()
{
boost::unique_lock<boost::mutex> sl(lru_mutex);
for (auto &pcache : prefixCaches)
pcache.second->repopulate();
}
// Rebuild the accounting structures of the single PrefixCache that owns
// the given prefix.
void Cache::repopulate(const boost::filesystem::path &p)
{
    auto &pcache = getPCache(p);
    pcache.repopulate();
}
}

View File

@ -92,6 +92,11 @@ class Cache : public boost::noncopyable , public ConfigListener
void shutdown();
void printKPIs() const;
// Used to update accounting variables in the PrefixCaches when a potential error
// is detected.
void repopulate();
void repopulate(const boost::filesystem::path &prefix);
// test helpers
const boost::filesystem::path &getCachePath() const;
const boost::filesystem::path &getJournalPath() const;

View File

@ -505,6 +505,8 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin
dataRemaining -= err;
count += err;
iocBytesWritten += err;
// get a new name for the object
newObject.key = metadata.getNewKeyFromOldKey(newObject.key, err + objectOffset);
metadata.updateEntryLength(newObject.offset, (err + objectOffset));
cache->newObject(firstDir, newObject.key,err + objectOffset);
newObjectKeys.push_back(newObject.key);
@ -634,14 +636,17 @@ ssize_t IOCoordinator::append(const char *_filename, const uint8_t *data, size_t
count += err;
dataRemaining -= err;
iocBytesWritten += err;
if (err < (int64_t) writeLength)
{
newObject.key = metadata.getNewKeyFromOldKey(newObject.key, err + newObject.offset);
metadata.updateEntry(newObject.offset, newObject.key, err + newObject.offset);
}
cache->newObject(firstDir, newObject.key,err);
newObjectKeys.push_back(newObject.key);
if (err < (int64_t) writeLength)
{
//logger->log(LOG_ERR,"IOCoordinator::append(): newObject failed to complete write, %u of %u bytes written.",count,length);
// make the object reflect length actually written
metadata.updateEntryLength(newObject.offset, err);
goto out;
}
}
@ -1135,7 +1140,10 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
objFD = ::open(object, O_RDONLY);
if (objFD < 0)
{
*_bytesReadOut = 0;
return ret;
}
ScopedCloser s1(objFD);
ret.reset(new uint8_t[len]);
@ -1148,11 +1156,12 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
int err = ::read(objFD, &ret[count], len - count);
if (err < 0)
{
char buf[80];
logger->log(LOG_CRIT, "IOC::mergeJournal(): failed to read %s, got '%s'", object, strerror_r(errno, buf, 80));
int l_errno = errno;
char buf[80];
logger->log(LOG_CRIT, "IOC::mergeJournal(): failed to read %s, got '%s'", object, strerror_r(l_errno, buf, 80));
ret.reset();
errno = l_errno;
*_bytesReadOut = count;
return ret;
}
else if (err == 0)
@ -1171,17 +1180,18 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
size_t mjimBytesRead = 0;
int mjimerr = mergeJournalInMem(ret, len, journal, &mjimBytesRead);
if (mjimerr)
{
ret.reset();
return ret;
}
l_bytesRead += mjimBytesRead;
*_bytesReadOut = l_bytesRead;
return ret;
}
journalFD = ::open(journal, O_RDONLY);
if (journalFD < 0)
{
*_bytesReadOut = l_bytesRead;
return ret;
}
ScopedCloser s2(journalFD);
boost::shared_array<char> headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead);
@ -1219,17 +1229,21 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
err = ::read(journalFD, &ret[startReadingAt - offset + count], lengthOfRead - count);
if (err < 0)
{
int l_errno = errno;
char buf[80];
logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(errno, buf, 80));
logger->log(LOG_ERR, "mergeJournal: got %s", strerror_r(l_errno, buf, 80));
ret.reset();
return ret;
errno = l_errno;
l_bytesRead += count;
goto out;
}
else if (err == 0)
{
logger->log(LOG_ERR, "mergeJournal: got early EOF. offset=%ld, len=%ld, jOffset=%ld, jLen=%ld,"
" startReadingAt=%ld, lengthOfRead=%ld", offset, len, offlen[0], offlen[1], startReadingAt, lengthOfRead);
ret.reset();
return ret;
l_bytesRead += count;
goto out;
}
count += err;
}
@ -1243,6 +1257,7 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
// skip over this journal entry
::lseek(journalFD, offlen[1], SEEK_CUR);
}
out:
*_bytesReadOut = l_bytesRead;
return ret;
}

View File

@ -127,12 +127,20 @@ PrefixCache::~PrefixCache()
*/
}
void PrefixCache::populate()
// Public entry point to rebuild this cache's accounting data from disk.
// NOTE(review): locking is deliberately asymmetric — this takes lru_mutex
// with a bare lock() and does NOT unlock it; populate() releases it via
// lru_mutex.unlock() at the end of its body (visible further down in this
// file). Confirm populate(false) cannot throw while the mutex is held,
// otherwise the lock would leak.
void PrefixCache::repopulate()
{
lru_mutex.lock();
populate(false);
}
void PrefixCache::populate(bool useSync)
{
Synchronizer *sync = Synchronizer::get();
bf::directory_iterator dir(cachePrefix);
bf::directory_iterator dend;
vector<string> newObjects;
lru.clear();
m_lru.clear();
while (dir != dend)
{
// put everything in lru & m_lru
@ -143,12 +151,14 @@ void PrefixCache::populate()
auto last = lru.end();
m_lru.insert(--last);
currentCacheSize += bf::file_size(*dir);
if (useSync)
newObjects.push_back(p.filename().string());
}
else if (p != cachePrefix/downloader->getTmpPath())
logger->log(LOG_WARNING, "Cache: found something in the cache that does not belong '%s'", p.string().c_str());
++dir;
}
if (useSync)
sync->newObjects(firstDir, newObjects);
newObjects.clear();
@ -164,6 +174,7 @@ void PrefixCache::populate()
{
size_t s = bf::file_size(*dir);
currentCacheSize += s;
if (useSync)
newJournals.push_back(pair<string, size_t>(p.stem().string(), s));
}
else
@ -174,6 +185,7 @@ void PrefixCache::populate()
++dir;
}
lru_mutex.unlock();
if (useSync)
sync->newJournalEntries(firstDir, newJournals);
}
@ -380,14 +392,25 @@ void PrefixCache::newJournalEntry(size_t size)
// A journal file of 'size' bytes was deleted; subtract it from the cache
// size accounting.
//
// If the books claim less space in use than what was just deleted, the
// accounting is corrupt (e.g. the process was killed mid-write), so instead
// of asserting we log a warning and rebuild the accounting from the on-disk
// directory contents via populate(false).
//
// Fixes applied:
//  - Removed the stray still-active assert(currentCacheSize >= size) that
//    survived next to its commented-out copy; it would abort debug builds on
//    exactly the condition this function now recovers from.
//  - populate() releases lru_mutex itself, so the unique_lock guard must be
//    detached afterwards to avoid a second unlock (undefined behavior).
void PrefixCache::deletedJournal(size_t size)
{
    boost::unique_lock<boost::mutex> s(lru_mutex);

    if (currentCacheSize >= size)
        currentCacheSize -= size;
    else
    {
        ostringstream oss;
        oss << "PrefixCache::deletedJournal(): Detected an accounting error." <<
            " Reloading cache metadata, this will pause IO activity briefly.";
        logger->log(LOG_WARNING, oss.str().c_str());
        populate(false);
        // populate() has already unlocked lru_mutex; release the guard so its
        // destructor does not unlock the mutex a second time.
        s.release();
    }
}
void PrefixCache::deletedObject(const string &key, size_t size)
{
boost::unique_lock<boost::mutex> s(lru_mutex);
assert(currentCacheSize >= size);
//assert(currentCacheSize >= size);
M_LRU_t::iterator mit = m_lru.find(key);
assert(mit != m_lru.end());
@ -397,7 +420,16 @@ void PrefixCache::deletedObject(const string &key, size_t size)
doNotEvict.erase(mit->lit);
lru.erase(mit->lit);
m_lru.erase(mit);
if (currentCacheSize >= size)
currentCacheSize -= size;
else
{
ostringstream oss;
oss << "PrefixCache::deletedObject(): Detected an accounting error." <<
" Reloading cache metadata, this will pause IO activity briefly.";
logger->log(LOG_WARNING, oss.str().c_str());
populate(false);
}
}
}

View File

@ -77,6 +77,11 @@ class PrefixCache : public boost::noncopyable
size_t getMaxCacheSize() const;
void shutdown();
// clears out cache structures and reloads them from cache/journal dir contents
// needed to potentially repair the cache's accounting error after detecting
// an error.
void repopulate();
// test helpers
const boost::filesystem::path &getCachePath();
const boost::filesystem::path &getJournalPath();
@ -97,7 +102,9 @@ class PrefixCache : public boost::noncopyable
SMLogging *logger;
Downloader *downloader;
void populate();
// useSync makes populate() tell Synchronizer about what it finds.
// set it to false when the system is already fully up.
void populate(bool useSync = true);
void _makeSpace(size_t size);
/* The main PrefixCache structures */

View File

@ -128,7 +128,7 @@ int Replicator::newObject(const boost::filesystem::path &filename, const uint8_t
OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT);
size_t count = 0;
while (count < length) {
err = ::pwrite(fd, &data[count], length - count, offset);
err = ::pwrite(fd, &data[count], length - count, offset + count);
if (err <= 0)
{
if (count > 0) // return what was successfully written

View File

@ -683,7 +683,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
while (count < size)
{
err = ::write(newFD, data.get(), size - count);
err = ::write(newFD, &data[count], size - count);
if (err < 0)
{
::unlink(newCachePath.string().c_str());
@ -693,8 +693,19 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
count += err;
}
numBytesWritten += size;
assert(bf::file_size(oldCachePath) == MetadataFile::getLengthFromKey(cloudKey));
//assert(bf::file_size(oldCachePath) == MetadataFile::getLengthFromKey(cloudKey));
cache->rename(prefix, cloudKey, newCloudKey, size - MetadataFile::getLengthFromKey(cloudKey));
if (bf::file_size(oldCachePath) != MetadataFile::getLengthFromKey(cloudKey))
{
ostringstream oss;
oss << "Synchronizer::synchronizeWithJournal(): detected a mismatch between file size and " <<
"length stored in the object name. object name = " << cloudKey << " length-in-name = " <<
MetadataFile::getLengthFromKey(cloudKey) << " real-length = " << bf::file_size(oldCachePath)
<< ". Reloading cache metadata, this will pause IO activity briefly.";
logger->log(LOG_WARNING, oss.str().c_str());
cache->repopulate(prefix);
}
replicator->remove(oldCachePath);
}