You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-08-07 03:22:57 +03:00
MCOL-3426: Fix writes that start past current EOF.
This commit is contained in:
@@ -402,15 +402,75 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin
|
|||||||
// there is no overlapping data, or data goes beyond end of last object
|
// there is no overlapping data, or data goes beyond end of last object
|
||||||
while (dataRemaining > 0)
|
while (dataRemaining > 0)
|
||||||
{
|
{
|
||||||
off_t newObjectOffset = metadata.getMetadataNewObjectOffset();
|
off_t currentEndofData = metadata.getMetadataNewObjectOffset();
|
||||||
|
|
||||||
// count is 0 so first write is beyond current end of file.
|
// count is 0 so first write is beyond current end of file.
|
||||||
// offset is > than newObject.offset so we need to adjust offset for object
|
// offset is > than newObject.offset so we need to adjust offset for object
|
||||||
// unless offset is beyond newObject.offset + objectSize then we need to write null data to this object
|
// unless offset is beyond newObject.offset + objectSize then we need to write null data to this object
|
||||||
if (count == 0 && offset > newObjectOffset)
|
if (count == 0 && offset > currentEndofData)
|
||||||
{
|
{
|
||||||
|
// First lets fill the last object with null data
|
||||||
|
objects = metadata.metadataRead(currentEndofData,1);
|
||||||
|
if (objects.size() == 1)
|
||||||
|
{
|
||||||
|
// last objeect needs data
|
||||||
|
metadataObject lastObject = objects[0];
|
||||||
|
uint64_t nullJournalSize = (objectSize - lastObject.length);
|
||||||
|
uint8_t nullData[nullJournalSize];
|
||||||
|
memset(nullData,0,nullJournalSize);
|
||||||
|
err = replicator->addJournalEntry((firstDir/lastObject.key),nullData,lastObject.length,nullJournalSize);
|
||||||
|
if (err < 0)
|
||||||
|
{
|
||||||
|
l_errno = errno;
|
||||||
|
replicator->updateMetadata(metadata);
|
||||||
|
//log error and abort
|
||||||
|
logger->log(LOG_ERR,"IOCoordinator::write(): Failed addJournalEntry -- NullData. Journal file likely has partially written data and incorrect metadata.");
|
||||||
|
errno = l_errno;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
else if ((uint)err < nullJournalSize)
|
||||||
|
{
|
||||||
|
dataRemaining -= err;
|
||||||
|
count += err;
|
||||||
|
iocBytesWritten += err;
|
||||||
|
if ((err + lastObject.length) > lastObject.length)
|
||||||
|
metadata.updateEntryLength(lastObject.offset, (err + lastObject.length));
|
||||||
|
cache->newJournalEntry(firstDir, err+JOURNAL_ENTRY_HEADER_SIZE);
|
||||||
|
synchronizer->newJournalEntry(firstDir, lastObject.key, err+JOURNAL_ENTRY_HEADER_SIZE);
|
||||||
|
replicator->updateMetadata(metadata);
|
||||||
|
//logger->log(LOG_ERR,"IOCoordinator::write(): addJournalEntry incomplete write, %u of %u bytes written.",count,length);
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
metadata.updateEntryLength(lastObject.offset, (nullJournalSize + lastObject.length));
|
||||||
|
cache->newJournalEntry(firstDir, nullJournalSize+JOURNAL_ENTRY_HEADER_SIZE);
|
||||||
|
synchronizer->newJournalEntry(firstDir, lastObject.key, nullJournalSize+JOURNAL_ENTRY_HEADER_SIZE);
|
||||||
|
currentEndofData += nullJournalSize;
|
||||||
|
iocBytesWritten += nullJournalSize + JOURNAL_ENTRY_HEADER_SIZE;
|
||||||
|
}
|
||||||
|
// dd we need to do some full null data objects
|
||||||
|
while ((currentEndofData + (off_t)objectSize) <= offset)
|
||||||
|
{
|
||||||
|
metadataObject nullObject = metadata.addMetadataObject(filename,objectSize);
|
||||||
|
err = replicator->newNullObject((firstDir/nullObject.key),objectSize);
|
||||||
|
if (err < 0)
|
||||||
|
{
|
||||||
|
//log error and abort
|
||||||
|
l_errno = errno;
|
||||||
|
logger->log(LOG_ERR,"IOCoordinator::write(): Failed newNullObject.");
|
||||||
|
errno = l_errno;
|
||||||
|
if (count == 0) // if no data has been written yet, it's safe to return -1 here.
|
||||||
|
return -1;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
cache->newObject(firstDir, nullObject.key,objectSize);
|
||||||
|
newObjectKeys.push_back(nullObject.key);
|
||||||
|
iocBytesWritten += objectSize;
|
||||||
|
currentEndofData += objectSize;
|
||||||
|
}
|
||||||
|
|
||||||
//this is starting beyond last object in metadata
|
//this is starting beyond last object in metadata
|
||||||
//figure out if the offset is in this object
|
//figure out if the offset is in this object
|
||||||
objectOffset = offset - newObjectOffset;
|
objectOffset = offset - currentEndofData;
|
||||||
writeLength = min((objectSize - objectOffset),dataRemaining);
|
writeLength = min((objectSize - objectOffset),dataRemaining);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -423,9 +483,7 @@ ssize_t IOCoordinator::_write(const boost::filesystem::path &filename, const uin
|
|||||||
// objectOffset is 0 unless the write starts beyond the end of data
|
// objectOffset is 0 unless the write starts beyond the end of data
|
||||||
// in that case need to add the null data to cachespace
|
// in that case need to add the null data to cachespace
|
||||||
//cache->makeSpace(writeLength + objectOffset);
|
//cache->makeSpace(writeLength + objectOffset);
|
||||||
|
|
||||||
metadataObject newObject = metadata.addMetadataObject(filename,(writeLength + objectOffset));
|
metadataObject newObject = metadata.addMetadataObject(filename,(writeLength + objectOffset));
|
||||||
|
|
||||||
// send to replicator
|
// send to replicator
|
||||||
err = replicator->newObject((firstDir/newObject.key),&data[count],objectOffset,writeLength);
|
err = replicator->newObject((firstDir/newObject.key),&data[count],objectOffset,writeLength);
|
||||||
//assert((uint) err == writeLength);
|
//assert((uint) err == writeLength);
|
||||||
|
@@ -295,7 +295,7 @@ metadataObject MetadataFile::addMetadataObject(const boost::filesystem::path &fi
|
|||||||
if (!objects.empty())
|
if (!objects.empty())
|
||||||
{
|
{
|
||||||
auto &lastObject = objects.back().second;
|
auto &lastObject = objects.back().second;
|
||||||
addObject.offset = lastObject.get<off_t>("offset") + lastObject.get<size_t>("length");
|
addObject.offset = lastObject.get<off_t>("offset") + mpConfig->mObjectSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
addObject.length = length;
|
addObject.length = length;
|
||||||
|
@@ -143,6 +143,17 @@ int Replicator::newObject(const boost::filesystem::path &filename, const uint8_t
|
|||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int Replicator::newNullObject(const boost::filesystem::path &filename,size_t length )
|
||||||
|
{
|
||||||
|
int fd, err;
|
||||||
|
string objectFilename = msCachePath + "/" + filename.string();
|
||||||
|
|
||||||
|
OPEN(objectFilename.c_str(), O_WRONLY | O_CREAT);
|
||||||
|
err = ftruncate(fd,length);
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
int Replicator::addJournalEntry(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length)
|
int Replicator::addJournalEntry(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length)
|
||||||
{
|
{
|
||||||
int fd, err;
|
int fd, err;
|
||||||
|
@@ -48,6 +48,8 @@ class Replicator
|
|||||||
|
|
||||||
int addJournalEntry(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length);
|
int addJournalEntry(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length);
|
||||||
int newObject(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length);
|
int newObject(const boost::filesystem::path &filename, const uint8_t *data, off_t offset, size_t length);
|
||||||
|
int newNullObject(const boost::filesystem::path &filename,size_t length );
|
||||||
|
|
||||||
int remove(const boost::filesystem::path &file, Flags flags = NONE);
|
int remove(const boost::filesystem::path &file, Flags flags = NONE);
|
||||||
|
|
||||||
int updateMetadata(MetadataFile &meta);
|
int updateMetadata(MetadataFile &meta);
|
||||||
|
@@ -1316,7 +1316,9 @@ void IOCReadTest1()
|
|||||||
string metaFilename = (metaPath/metaTestFile).string() + ".meta";
|
string metaFilename = (metaPath/metaTestFile).string() + ".meta";
|
||||||
|
|
||||||
boost::scoped_array<uint8_t> data(new uint8_t[1<<20]);
|
boost::scoped_array<uint8_t> data(new uint8_t[1<<20]);
|
||||||
|
|
||||||
memset(data.get(), 0, 1<<20);
|
memset(data.get(), 0, 1<<20);
|
||||||
|
|
||||||
int err;
|
int err;
|
||||||
err = ioc->read(testFile, data.get(), 0, 1<<20);
|
err = ioc->read(testFile, data.get(), 0, 1<<20);
|
||||||
assert(err < 0);
|
assert(err < 0);
|
||||||
@@ -1348,6 +1350,9 @@ void IOCReadTest1()
|
|||||||
for (; i < (1<<20)/4; i++)
|
for (; i < (1<<20)/4; i++)
|
||||||
assert(data32[i] == 0);
|
assert(data32[i] == 0);
|
||||||
|
|
||||||
|
err = ioc->read(testFile, data.get(), 9000, 4000);
|
||||||
|
assert(err==0);
|
||||||
|
|
||||||
cache->reset();
|
cache->reset();
|
||||||
err = ioc->unlink(testFile);
|
err = ioc->unlink(testFile);
|
||||||
assert(err>= 0);
|
assert(err>= 0);
|
||||||
@@ -1628,8 +1633,14 @@ int main(int argc, char* argv[])
|
|||||||
//Case 3 write spans 2 journal objects
|
//Case 3 write spans 2 journal objects
|
||||||
metadataJournalTest((8*sizeKB),(4*sizeKB));
|
metadataJournalTest((8*sizeKB),(4*sizeKB));
|
||||||
//Case 4 write starts object1 ends object3
|
//Case 4 write starts object1 ends object3
|
||||||
metadataJournalTest((10*sizeKB),(7*sizeKB));
|
metadataJournalTest((16*sizeKB),(7*sizeKB));
|
||||||
//Case 5 write starts in new object at offset >0
|
//Case 5 write starts in new object at 0 offset after null objects
|
||||||
|
metadataJournalTest((8*sizeKB),(32*sizeKB));
|
||||||
|
// overwrite null objects
|
||||||
|
metadataJournalTest((10*sizeKB),(40*sizeKB));
|
||||||
|
metadataJournalTest((8*sizeKB),(24*sizeKB));
|
||||||
|
// overwrite whole file and create new objects
|
||||||
|
metadataJournalTest((96*sizeKB),(0));
|
||||||
|
|
||||||
//append test
|
//append test
|
||||||
// first one appends file to end of 8K object
|
// first one appends file to end of 8K object
|
||||||
@@ -1668,8 +1679,9 @@ int main(int argc, char* argv[])
|
|||||||
metadataJournalTestCleanup();
|
metadataJournalTestCleanup();
|
||||||
|
|
||||||
(Cache::get())->shutdown();
|
(Cache::get())->shutdown();
|
||||||
delete (IOCoordinator::get());
|
delete (Synchronizer::get());
|
||||||
delete (Cache::get());
|
delete (Cache::get());
|
||||||
|
delete (IOCoordinator::get());
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user