1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Wrote a version of mergejournal that uses more mem but reduces the io op

requirement.  Added a safety valve; if the journal file is > 100MB, it
will fall back to the previous IO op heavy but mem friendly version.

This also includes a compiler warning fix for smls & smput.
This commit is contained in:
Patrick LeBlanc
2019-08-22 16:38:27 -05:00
parent fa0da4b9bb
commit 281443cb0a
5 changed files with 129 additions and 39 deletions

View File

@ -1083,19 +1083,6 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
if (objFD < 0)
return ret;
ScopedCloser s1(objFD);
journalFD = ::open(journal, O_RDONLY);
if (journalFD < 0)
return ret;
ScopedCloser s2(journalFD);
// grab the journal header, make sure the version is 1, and get the max offset
boost::shared_array<char> headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead);
stringstream ss;
ss << headertxt.get();
boost::property_tree::ptree header;
boost::property_tree::json_parser::read_json(ss, header);
assert(header.get<int>("version") == 1);
ret.reset(new uint8_t[len]);
@ -1117,14 +1104,39 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
else if (err == 0)
{
// at the EOF of the object. The journal may contain entries that append to the data,
// so 0-fill the remaining bytes.
memset(&ret[count], 0, len-count);
break;
}
count += err;
}
l_bytesRead += count;
// mergeJournalInMem has lower a IOPS requirement than the fully general code in this fcn. Use
// that if the caller requested the whole object to be merged
if (offset == 0 && (ssize_t) len >= ::lseek(objFD, 0, SEEK_END))
{
size_t mjimBytesRead = 0;
int mjimerr = mergeJournalInMem(ret, len, journal, &mjimBytesRead);
if (mjimerr)
{
ret.reset();
return ret;
}
l_bytesRead += mjimBytesRead;
return ret;
}
journalFD = ::open(journal, O_RDONLY);
if (journalFD < 0)
return ret;
ScopedCloser s2(journalFD);
boost::shared_array<char> headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead);
stringstream ss;
ss << headertxt.get();
boost::property_tree::ptree header;
boost::property_tree::json_parser::read_json(ss, header);
assert(header.get<int>("version") == 1);
// start processing the entries
while (1)
{
@ -1134,8 +1146,7 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
break;
assert(err == 16);
l_bytesRead += 16;
//cout << "MJ: got offset " << offlen[0] << " length " << offlen[1] << endl;
// if this entry overlaps, read the overlapping section
uint64_t lastJournalOffset = offlen[0] + offlen[1];
uint64_t lastBufOffset = offset + len;
@ -1144,8 +1155,6 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
uint64_t startReadingAt = max(offlen[0], (uint64_t) offset);
uint64_t lengthOfRead = min(lastBufOffset, lastJournalOffset) - startReadingAt;
//cout << "MJ: startReadingAt = " << startReadingAt << " offlen[0] = " << offlen[0] << endl;
// seek to the portion of the entry to start reading at
if (startReadingAt != offlen[0])
::lseek(journalFD, startReadingAt - offlen[0], SEEK_CUR);
@ -1184,9 +1193,95 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
return ret;
}
// MergeJournalInMem is a specialized version of mergeJournal(). TODO: refactor if possible.
// MergeJournalInMem is a specialized version of mergeJournal(). This is currently only used by Synchronizer
// and mergeJournal(), and only for merging the whole object with the whole journal.
int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size_t len, const char *journalPath,
size_t *_bytesReadOut) const
{
// if the journal is over some size threshold (100MB for now why not),
// use the original low-mem-usage version
if (len > (100 << 20))
return mergeJournalInMem_bigJ(objData, len, journalPath, _bytesReadOut);
size_t l_bytesRead = 0;
int journalFD = ::open(journalPath, O_RDONLY);
if (journalFD < 0)
return -1;
ScopedCloser s(journalFD);
// grab the journal header and make sure the version is 1
boost::shared_array<char> headertxt = seekToEndOfHeader1(journalFD, &l_bytesRead);
stringstream ss;
ss << headertxt.get();
boost::property_tree::ptree header;
boost::property_tree::json_parser::read_json(ss, header);
assert(header.get<int>("version") == 1);
// read the journal file into memory
size_t journalBytes = ::lseek(journalFD, 0, SEEK_END) - l_bytesRead;
::lseek(journalFD, l_bytesRead, SEEK_SET);
boost::scoped_array<uint8_t> journalData(new uint8_t[journalBytes]);
size_t readCount = 0;
while (readCount < journalBytes)
{
ssize_t err = ::read(journalFD, &journalData[readCount], journalBytes - readCount);
if (err < 0)
{
char buf[80];
int l_errno = errno;
logger->log(LOG_ERR, "mergeJournalInMem: got %s", strerror_r(errno, buf, 80));
errno = l_errno;
return -1;
}
else if (err == 0)
{
logger->log(LOG_ERR, "mergeJournalInMem: got early EOF");
errno = ENODATA; // is there a better errno for early EOF?
return -1;
}
readCount += err;
l_bytesRead += err;
}
// start processing the entries
size_t offset = 0;
while (offset < journalBytes)
{
if (offset + 16 >= journalBytes)
{
logger->log(LOG_ERR, "mergeJournalInMem: got early EOF");
errno = ENODATA; // is there a better errno for early EOF?
return -1;
}
uint64_t *offlen = (uint64_t *) &journalData[offset];
offset += 16;
uint64_t startReadingAt = offlen[0];
uint64_t lengthOfRead = offlen[1];
if (startReadingAt > len)
{
offset += offlen[1];
continue;
}
if (startReadingAt + lengthOfRead > len)
lengthOfRead = len - startReadingAt;
if (offset + lengthOfRead > journalBytes)
{
logger->log(LOG_ERR, "mergeJournalInMem: got early EOF");
errno = ENODATA; // is there a better errno for early EOF?
return -1;
}
memcpy(&objData[startReadingAt], &journalData[offset], lengthOfRead);
offset += offlen[1];
}
*_bytesReadOut = l_bytesRead;
return 0;
}
int IOCoordinator::mergeJournalInMem_bigJ(boost::shared_array<uint8_t> &objData, size_t len, const char *journalPath,
size_t *_bytesReadOut) const
{
size_t l_bytesRead = 0;
int journalFD = ::open(journalPath, O_RDONLY);
@ -1201,7 +1296,6 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
boost::property_tree::ptree header;
boost::property_tree::json_parser::read_json(ss, header);
assert(header.get<int>("version") == 1);
//size_t maxJournalOffset = header.get<size_t>("max_offset");
// start processing the entries
while (1)
@ -1222,23 +1316,15 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
uint64_t startReadingAt = offlen[0];
uint64_t lengthOfRead = offlen[1];
// XXXPAT: Speculative change. Got mem errors from writing past the end of objData. The length
// in the metadata is shorter than this journal entry, and not because it got crazy values.
// I think the explanation is a truncation.
if (startReadingAt > len)
{
//logger->log(LOG_CRIT, "mergeJournalInMem: skipping a theoretically irrelevant journal entry in %s. "
// "jstart = %llu, max = %llu", journalPath, startReadingAt, len);
::lseek(journalFD, offlen[1], SEEK_CUR);
continue;
}
if (startReadingAt + lengthOfRead > len)
{
//logger->log(LOG_CRIT, "mergeJournalInMem: possibly bad journal entry in %s. jStart = %llu, jEnd = %llu, max = %llu",
// journalPath, startReadingAt, startReadingAt + lengthOfRead, len);
lengthOfRead = len - startReadingAt;
}
uint count = 0;
while (count < lengthOfRead)
{
@ -1267,8 +1353,6 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
return 0;
}
void IOCoordinator::readLock(const string &filename)
{
boost::unique_lock<boost::mutex> s(lockMutex);

View File

@ -63,10 +63,15 @@ class IOCoordinator : public boost::noncopyable
boost::shared_array<uint8_t> mergeJournal(const char *objectPath, const char *journalPath, off_t offset,
size_t len, size_t *sizeRead) const;
// this version modifies object data in memory, given the journal filename
// this version modifies object data in memory, given the journal filename. Processes the whole object
// and whole journal file.
int mergeJournalInMem(boost::shared_array<uint8_t> &objData, size_t len, const char *journalPath,
size_t *sizeRead) const;
// this version of MJIM has a higher IOPS requirement and lower mem usage.
int mergeJournalInMem_bigJ(boost::shared_array<uint8_t> &objData, size_t len, const char *journalPath,
size_t *sizeRead) const;
// this version takes already-open file descriptors, and an already-allocated buffer as input.
// file descriptor are positioned, eh, best not to assume anything about their positions
// on return.

View File

@ -669,7 +669,8 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
count += err;
}
numBytesWritten += size;
cache->rename(prefix, cloudKey, newCloudKey, size - bf::file_size(oldCachePath));
assert(bf::file_size(oldCachePath) == MetadataFile:getLengthFromKey(cloudKey));
cache->rename(prefix, cloudKey, newCloudKey, size - MetadataFile::getLengthFromKey(cloudKey));
replicator->remove(oldCachePath);
}

View File

@ -157,12 +157,12 @@ int main(int argc, char **argv)
}
char prefix[8192];
makePathPrefix(prefix, 8192);
int prefixlen = makePathPrefix(prefix, 8192);
if (SMOnline())
lsOnline(strncat(prefix, argv[1], 8192));
lsOnline(strncat(prefix, argv[1], 8192 - prefixlen));
else
lsOffline(strncat(prefix, argv[1], 8192));
lsOffline(strncat(prefix, argv[1], 8192 - prefixlen));
return 0;
}

View File

@ -172,9 +172,9 @@ int main(int argc, char **argv)
int prefixlen = makePathPrefix(prefix, 8192);
if (SMOnline())
putOnline(strncat(prefix, argv[1], 8192), prefixlen);
putOnline(strncat(prefix, argv[1], 8192 - prefixlen), prefixlen);
else
putOffline(strncat(prefix, argv[1], 8192), prefixlen);
putOffline(strncat(prefix, argv[1], 8192 - prefixlen), prefixlen);
return 0;
}