You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-12-13 23:02:14 +03:00
Changed the mergeJournal* usage under the assumption that the
metadata contains the merged length, not the object's unmerged length.
This commit is contained in:
@@ -110,7 +110,7 @@ int IOCoordinator::loadObjectAndJournal(const char *objFilename, const char *jou
|
||||
{
|
||||
boost::shared_array<uint8_t> argh;
|
||||
|
||||
argh = mergeJournal(objFilename, journalFilename, offset, &length);
|
||||
argh = mergeJournal(objFilename, journalFilename, offset, length);
|
||||
if (!argh)
|
||||
return -1;
|
||||
else
|
||||
@@ -874,7 +874,7 @@ int IOCoordinator::mergeJournal(int objFD, int journalFD, uint8_t *buf, off_t of
|
||||
}
|
||||
|
||||
boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, const char *journal, off_t offset,
|
||||
size_t *len) const
|
||||
size_t len) const
|
||||
{
|
||||
int objFD, journalFD;
|
||||
boost::shared_array<uint8_t> ret;
|
||||
@@ -896,24 +896,28 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
|
||||
boost::property_tree::ptree header;
|
||||
boost::property_tree::json_parser::read_json(ss, header);
|
||||
assert(header.get<int>("version") == 1);
|
||||
size_t maxJournalOffset = header.get<size_t>("max_offset");
|
||||
//size_t maxJournalOffset = header.get<size_t>("max_offset");
|
||||
|
||||
#if 0
|
||||
struct stat objStat;
|
||||
fstat(objFD, &objStat);
|
||||
|
||||
if (*len == 0)
|
||||
if (len == 0)
|
||||
// read to the end of the file
|
||||
*len = max(maxJournalOffset + 1, (size_t) objStat.st_size) - offset;
|
||||
len = max(maxJournalOffset + 1, (size_t) objStat.st_size) - offset;
|
||||
else
|
||||
// make sure len is within the bounds of the data
|
||||
*len = min(*len, (max(maxJournalOffset + 1, (size_t) objStat.st_size) - offset));
|
||||
len = min(*len, (max(maxJournalOffset + 1, (size_t) objStat.st_size) - offset));
|
||||
ret.reset(new uint8_t[*len]);
|
||||
#endif
|
||||
|
||||
ret.reset(new uint8_t[len]);
|
||||
|
||||
// read the object into memory
|
||||
size_t count = 0;
|
||||
::lseek(objFD, offset, SEEK_SET);
|
||||
while (count < *len) {
|
||||
int err = ::read(objFD, &ret[count], *len - count);
|
||||
while (count < len) {
|
||||
int err = ::read(objFD, &ret[count], len - count);
|
||||
if (err < 0)
|
||||
{
|
||||
char buf[80];
|
||||
@@ -927,7 +931,7 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
|
||||
{
|
||||
// at the EOF of the object. The journal may contain entries that append to the data,
|
||||
// so 0-fill the remaining bytes.
|
||||
memset(&ret[count], 0, *len-count);
|
||||
memset(&ret[count], 0, len-count);
|
||||
break;
|
||||
}
|
||||
count += err;
|
||||
@@ -943,7 +947,7 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
|
||||
|
||||
// if this entry overlaps, read the overlapping section
|
||||
uint64_t lastJournalOffset = offlen[0] + offlen[1];
|
||||
uint64_t lastBufOffset = offset + *len;
|
||||
uint64_t lastBufOffset = offset + len;
|
||||
if (offlen[0] <= lastBufOffset && lastJournalOffset >= (uint64_t) offset)
|
||||
{
|
||||
uint64_t startReadingAt = max(offlen[0], (uint64_t) offset);
|
||||
@@ -984,7 +988,7 @@ boost::shared_array<uint8_t> IOCoordinator::mergeJournal(const char *object, con
|
||||
}
|
||||
|
||||
// MergeJournalInMem is a specialized version of mergeJournal(). TODO: refactor if possible.
|
||||
int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size_t *len, const char *journalPath) const
|
||||
int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size_t len, const char *journalPath) const
|
||||
{
|
||||
int journalFD = ::open(journalPath, O_RDONLY);
|
||||
if (journalFD < 0)
|
||||
@@ -998,8 +1002,9 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
|
||||
boost::property_tree::ptree header;
|
||||
boost::property_tree::json_parser::read_json(ss, header);
|
||||
assert(header.get<int>("version") == 1);
|
||||
size_t maxJournalOffset = header.get<size_t>("max_offset");
|
||||
//size_t maxJournalOffset = header.get<size_t>("max_offset");
|
||||
|
||||
#if 0
|
||||
if (maxJournalOffset > *len)
|
||||
{
|
||||
uint8_t *newbuf = new uint8_t[maxJournalOffset + 1];
|
||||
@@ -1008,6 +1013,7 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
|
||||
objData.reset(newbuf);
|
||||
*len = maxJournalOffset + 1;
|
||||
}
|
||||
#endif
|
||||
|
||||
// start processing the entries
|
||||
while (1)
|
||||
@@ -1018,7 +1024,11 @@ int IOCoordinator::mergeJournalInMem(boost::shared_array<uint8_t> &objData, size
|
||||
break;
|
||||
|
||||
uint64_t startReadingAt = offlen[0];
|
||||
uint64_t lengthOfRead = offlen[1];
|
||||
uint64_t lengthOfRead;
|
||||
if (offlen[0] + offlen[1] > len)
|
||||
lengthOfRead = len - offlen[0];
|
||||
else
|
||||
lengthOfRead = offlen[1];
|
||||
|
||||
uint count = 0;
|
||||
while (count < lengthOfRead)
|
||||
|
||||
@@ -42,12 +42,11 @@ class IOCoordinator : public boost::noncopyable
|
||||
int copyFile(const char *filename1, const char *filename2);
|
||||
|
||||
// The shared logic for merging a journal file with its base file.
|
||||
// *len should be set to the length of the data requested (0 means read the whole file),
|
||||
// on return *len will be the actual length returned.
|
||||
boost::shared_array<uint8_t> mergeJournal(const char *objectPath, const char *journalPath, off_t offset, size_t *len) const;
|
||||
// len should be set to the length of the data requested
|
||||
boost::shared_array<uint8_t> mergeJournal(const char *objectPath, const char *journalPath, off_t offset, size_t len) const;
|
||||
|
||||
// this version modifies object data in memory, given the journal filename
|
||||
int mergeJournalInMem(boost::shared_array<uint8_t> &objData, size_t *len, const char *journalPath) const;
|
||||
int mergeJournalInMem(boost::shared_array<uint8_t> &objData, size_t len, const char *journalPath) const;
|
||||
|
||||
// this version takes already-open file descriptors, and an already-allocated buffer as input.
|
||||
// file descriptor are positioned, eh, best not to assume anything about their positions
|
||||
|
||||
@@ -301,6 +301,11 @@ int MetadataFile::writeMetadata(const char *filename)
|
||||
return error;
|
||||
}
|
||||
|
||||
const metadataObject & MetadataFile::getEntry(off_t offset)
|
||||
{
|
||||
return *(mObjects.find(offset));
|
||||
}
|
||||
|
||||
void MetadataFile::removeEntry(off_t offset)
|
||||
{
|
||||
mObjects.erase(offset);
|
||||
|
||||
@@ -54,6 +54,7 @@ class MetadataFile
|
||||
void updateEntry(off_t offset, const std::string &newName, size_t newLength);
|
||||
void updateEntryLength(off_t offset, size_t newLength);
|
||||
metadataObject addMetadataObject(const char *filename, size_t length);
|
||||
const metadataObject &getEntry(off_t offset);
|
||||
void removeEntry(off_t offset);
|
||||
|
||||
// TBD: this may have to go; there may be no use case where only the uuid needs to change.
|
||||
|
||||
@@ -325,6 +325,14 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
ScopedWriteLock s(ioc, sourceFile);
|
||||
|
||||
string &key = *lit;
|
||||
MetadataFile md(sourceFile.c_str(), MetadataFile::no_create_t());
|
||||
if (!md.exists())
|
||||
{
|
||||
logger->log(LOG_DEBUG, "synchronizeWithJournal(): no metadata found for %s", sourceFile.c_str());
|
||||
return;
|
||||
}
|
||||
const metadataObject &mdEntry = md.getEntry(MetadataFile::getOffsetFromKey(key));
|
||||
|
||||
bf::path oldCachePath = cachePath / key;
|
||||
string journalName = (journalPath/ (key + ".journal")).string();
|
||||
|
||||
@@ -338,7 +346,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
|
||||
int err;
|
||||
boost::shared_array<uint8_t> data;
|
||||
size_t count = 0, size = 0;
|
||||
size_t count = 0, size = mdEntry.length;
|
||||
char buf[80];
|
||||
bool oldObjIsCached = cache->exists(key);
|
||||
|
||||
@@ -356,7 +364,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
}
|
||||
throw runtime_error(string("Synchronizer: getObject() failed: ") + strerror_r(errno, buf, 80));
|
||||
}
|
||||
err = ioc->mergeJournalInMem(data, &size, journalName.c_str());
|
||||
err = ioc->mergeJournalInMem(data, size, journalName.c_str());
|
||||
if (err)
|
||||
{
|
||||
if (!bf::exists(journalName))
|
||||
@@ -369,7 +377,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
}
|
||||
else
|
||||
{
|
||||
data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, &size);
|
||||
data = ioc->mergeJournal(oldCachePath.string().c_str(), journalName.c_str(), 0, size);
|
||||
if (!data)
|
||||
{
|
||||
if (!bf::exists(journalName))
|
||||
@@ -424,7 +432,7 @@ void Synchronizer::synchronizeWithJournal(const string &sourceFile, list<string>
|
||||
}
|
||||
|
||||
// update the metadata for the source file
|
||||
MetadataFile md(sourceFile.c_str());
|
||||
|
||||
md.updateEntry(MetadataFile::getOffsetFromKey(key), newKey, size);
|
||||
replicator->updateMetadata(sourceFile.c_str(), md);
|
||||
|
||||
|
||||
@@ -932,8 +932,8 @@ bool mergeJournalTest()
|
||||
|
||||
int i;
|
||||
IOCoordinator *ioc = IOCoordinator::get();
|
||||
size_t len = 0;
|
||||
boost::shared_array<uint8_t> data = ioc->mergeJournal("test-object", "test-journal", 0, &len);
|
||||
size_t len = 8192;
|
||||
boost::shared_array<uint8_t> data = ioc->mergeJournal("test-object", "test-journal", 0, len);
|
||||
assert(data);
|
||||
int *idata = (int *) data.get();
|
||||
for (i = 0; i < 5; i++)
|
||||
@@ -946,7 +946,7 @@ bool mergeJournalTest()
|
||||
// try different range parameters
|
||||
// read at the beginning of the change
|
||||
len = 40;
|
||||
data = ioc->mergeJournal("test-object", "test-journal", 20, &len);
|
||||
data = ioc->mergeJournal("test-object", "test-journal", 20, len);
|
||||
assert(data);
|
||||
idata = (int *) data.get();
|
||||
for (i = 0; i < 5; i++)
|
||||
@@ -956,7 +956,7 @@ bool mergeJournalTest()
|
||||
|
||||
// read s.t. beginning of the change is in the middle of the range
|
||||
len = 24;
|
||||
data = ioc->mergeJournal("test-object", "test-journal", 8, &len);
|
||||
data = ioc->mergeJournal("test-object", "test-journal", 8, len);
|
||||
assert(data);
|
||||
idata = (int *) data.get();
|
||||
for (i = 0; i < 3; i++)
|
||||
@@ -966,7 +966,7 @@ bool mergeJournalTest()
|
||||
|
||||
// read s.t. end of the change is in the middle of the range
|
||||
len = 20;
|
||||
data = ioc->mergeJournal("test-object", "test-journal", 28, &len);
|
||||
data = ioc->mergeJournal("test-object", "test-journal", 28, len);
|
||||
assert(data);
|
||||
idata = (int *) data.get();
|
||||
for (i = 0; i < 3; i++)
|
||||
|
||||
Reference in New Issue
Block a user