1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Cleaned up a little code of previous commit, added retry loops and

a little better error handling to the code that writes journal entries.
This commit is contained in:
Patrick LeBlanc
2020-05-21 16:28:32 -04:00
parent aeec468814
commit 4b9c1d9169
3 changed files with 100 additions and 17 deletions

View File

@ -154,6 +154,67 @@ int Replicator::newNullObject(const boost::filesystem::path &filename,size_t len
return err; return err;
} }
ssize_t Replicator::_pwrite(int fd, const void *data, size_t length, off_t offset)
{
ssize_t err;
size_t count = 0;
uint8_t *bData = (uint8_t *) data;
do
{
err = ::pwrite(fd, &bData[count], length - count, offset + count);
if (err < 0 || (err == 0 && errno != EINTR))
{
if (count > 0)
return count;
else
return err;
}
err += count;
} while (count < length);
return count;
}
ssize_t Replicator::_write(int fd, const void *data, size_t length)
{
ssize_t err;
size_t count = 0;
uint8_t *bData = (uint8_t *) data;
do
{
err = ::write(fd, &bData[count], length - count);
if (err < 0 || (err == 0 && errno != EINTR))
{
if (count > 0)
return count;
else
return err;
}
err += count;
} while (count < length);
return count;
}
/* XXXPAT: I think we'll have to rewrite this function some; we'll have to at least clearly define
what happens in the various error scenarios.
To be more resilent in the face of hard errors, we may also want to redefine what a journal file is.
If/when we cannot fix the journal file in the face of an error, there are scenarios that the read code
will not be able to cope with. Ex, a journal entry that says it's 200 bytes long, but there are only
really 100 bytes. The read code has no way to tell the difference if there is an entry that follows
the bad entry, and that will cause an unrecoverable error.
Initial thought on a sol'n. Make each journal entry its own file in a tmp dir, ordered by a sequence
number in the filename. Then, one entry cannot affect the others, and the end of the file is unambiguously
the end of the data. On successful write, move the file to where it should be. This would also prevent
the readers from ever seeing bad data, and possibly reduce the size of some critical sections.
Benefits would be data integrity, and possibly add'l parallelism. The downside is of course, a higher
number of IO ops for the same operation.
*/
int Replicator::addJournalEntry(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length) int Replicator::addJournalEntry(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length)
{ {
int fd, err; int fd, err;
@ -177,7 +238,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
bHeaderChanged = true; bHeaderChanged = true;
// create new journal file with header // create new journal file with header
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
err = ::write(fd, header.c_str(), header.length() + 1); err = _write(fd, header.c_str(), header.length() + 1);
l_errno = errno; l_errno = errno;
repHeaderDataWritten += (header.length() + 1); repHeaderDataWritten += (header.length() + 1);
if ((uint)err != (header.length() + 1)) if ((uint)err != (header.length() + 1))
@ -238,26 +299,32 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
{ {
bHeaderChanged = true; bHeaderChanged = true;
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
err = ::pwrite(fd, header.c_str(), header.length() + 1,0); err = _pwrite(fd, header.c_str(), header.length() + 1,0);
l_errno = errno;
repHeaderDataWritten += (header.length() + 1); repHeaderDataWritten += (header.length() + 1);
if ((uint)err != (header.length() + 1)) if ((uint)err != (header.length() + 1))
{ {
// only the header was possibly changed rollback attempt // only the header was possibly changed rollback attempt
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Updating journal header failed. " mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Updating journal header failed. "
"Attempting to rollback and continue."); "Attempting to rollback and continue.");
int rollbackErr = ::pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0); int rollbackErr = _pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0);
if ((uint)rollbackErr == (headerRollback.length() + 1)) if ((uint)rollbackErr == (headerRollback.length() + 1))
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header success."); mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header success.");
else else
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed!"); mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed!");
return err; errno = l_errno;
if (err < 0)
return err;
else
return 0;
} }
} }
} }
off_t entryHeaderOffset = ::lseek(fd, 0, SEEK_END); off_t entryHeaderOffset = ::lseek(fd, 0, SEEK_END);
err = ::write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE); err = _write(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE);
l_errno = errno;
repHeaderDataWritten += JOURNAL_ENTRY_HEADER_SIZE; repHeaderDataWritten += JOURNAL_ENTRY_HEADER_SIZE;
if (err != JOURNAL_ENTRY_HEADER_SIZE) if (err != JOURNAL_ENTRY_HEADER_SIZE)
{ {
@ -266,12 +333,16 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
{ {
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: write journal entry header failed. Attempting to rollback and continue."); mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: write journal entry header failed. Attempting to rollback and continue.");
//attempt to rollback top level header //attempt to rollback top level header
int rollbackErr = ::pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0); int rollbackErr = _pwrite(fd, headerRollback.c_str(), headerRollback.length() + 1,0);
if ((uint)rollbackErr != (headerRollback.length() + 1)) if ((uint)rollbackErr != (headerRollback.length() + 1))
{ {
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed! (%s)", mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed! (%s)",
strerror_r(errno, errbuf, 80)); strerror_r(errno, errbuf, 80));
return err; errno = l_errno;
if (err < 0)
return err;
else
return 0;
} }
} }
int rollbackErr = ::ftruncate(fd,entryHeaderOffset); int rollbackErr = ::ftruncate(fd,entryHeaderOffset);
@ -279,13 +350,16 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
{ {
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Truncate to previous EOF failed! (%s)", mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Truncate to previous EOF failed! (%s)",
strerror_r(errno, errbuf, 80)); strerror_r(errno, errbuf, 80));
return err; if (err < 0)
return err;
else
return 0;
} }
l_errno = errno;
return err; return err;
} }
while (count < length) { while (count < length) {
err = ::write(fd, &data[count], length - count); err = _write(fd, &data[count], length - count);
if (err < 0 ) if (err < 0 )
{ {
l_errno = errno; l_errno = errno;
@ -301,7 +375,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
if (thisEntryMaxOffset > currentMaxOffset) if (thisEntryMaxOffset > currentMaxOffset)
{ {
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str(); string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % thisEntryMaxOffset).str();
int rollbackErr = ::pwrite(fd, header.c_str(), header.length() + 1,0); int rollbackErr = _pwrite(fd, header.c_str(), header.length() + 1,0);
if ((uint)rollbackErr != (header.length() + 1)) if ((uint)rollbackErr != (header.length() + 1))
{ {
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal header failed! (%s)", mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal header failed! (%s)",
@ -312,7 +386,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
} }
// Update the journal entry header // Update the journal entry header
offlen[1] = count; offlen[1] = count;
int rollbackErr = ::pwrite(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE,entryHeaderOffset); int rollbackErr = _pwrite(fd, offlen, JOURNAL_ENTRY_HEADER_SIZE,entryHeaderOffset);
if ((uint)rollbackErr != JOURNAL_ENTRY_HEADER_SIZE) if ((uint)rollbackErr != JOURNAL_ENTRY_HEADER_SIZE)
{ {
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal entry header failed! (%s)", mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Update of journal entry header failed! (%s)",
@ -337,7 +411,7 @@ int Replicator::addJournalEntry(const boost::filesystem::path &filename, const u
"Attempting to rollback and continue.", strerror_r(l_errno, errbuf, 80)); "Attempting to rollback and continue.", strerror_r(l_errno, errbuf, 80));
//attempt to rollback top level header //attempt to rollback top level header
string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % 0).str(); string header = (boost::format("{ \"version\" : \"%03i\", \"max_offset\" : \"%011u\" }") % version % 0).str();
int rollbackErr = ::pwrite(fd, header.c_str(), header.length() + 1,0); int rollbackErr = _pwrite(fd, header.c_str(), header.length() + 1,0);
if ((uint)rollbackErr != (header.length() + 1)) if ((uint)rollbackErr != (header.length() + 1))
{ {
mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed (%s)!", mpLogger->log(LOG_CRIT, "Replicator::addJournalEntry: Rollback of journal header failed (%s)!",

View File

@ -58,6 +58,11 @@ class Replicator
private: private:
Replicator(); Replicator();
// a couple helpers
ssize_t _write(int fd, const void *data, size_t len);
ssize_t _pwrite(int fd, const void *data, size_t len, off_t offset);
Config *mpConfig; Config *mpConfig;
SMLogging *mpLogger; SMLogging *mpLogger;
std::string msJournalPath; std::string msJournalPath;

View File

@ -694,9 +694,14 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
} }
numBytesWritten += size; numBytesWritten += size;
//assert(bf::file_size(oldCachePath) == MetadataFile::getLengthFromKey(cloudKey)); size_t oldSize = bf::file_size(oldCachePath);
cache->rename(prefix, cloudKey, newCloudKey, size - MetadataFile::getLengthFromKey(cloudKey));
if (bf::file_size(oldCachePath) != MetadataFile::getLengthFromKey(cloudKey)) cache->rename(prefix, cloudKey, newCloudKey, size - oldSize);
replicator->remove(oldCachePath);
// This condition is probably irrelevant for correct functioning now,
// but it should be very rare so what the hell.
if (oldSize != MetadataFile::getLengthFromKey(cloudKey))
{ {
ostringstream oss; ostringstream oss;
oss << "Synchronizer::synchronizeWithJournal(): detected a mismatch between file size and " << oss << "Synchronizer::synchronizeWithJournal(): detected a mismatch between file size and " <<
@ -706,7 +711,6 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
logger->log(LOG_WARNING, oss.str().c_str()); logger->log(LOG_WARNING, oss.str().c_str());
cache->repopulate(prefix); cache->repopulate(prefix);
} }
replicator->remove(oldCachePath);
} }
mergeDiff += size - originalSize; mergeDiff += size - originalSize;