mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
fixes for write and finish append. add append unit_test cases.
@@ -247,7 +247,7 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
     uint64_t count = 0;
     uint64_t writeLength = 0;
     uint64_t dataRemaining = length;
-    uint64_t journalOffset = 0;
+    uint64_t objectOffset = 0;
     vector<metadataObject> objects;
     vector<string> newObjectKeys;
     Synchronizer *synchronizer = Synchronizer::get();   // need to init sync here to break circular dependency...
@@ -269,86 +269,77 @@ int IOCoordinator::write(const char *filename, const uint8_t *data, off_t offset
         {
             // first object in the list so start at offset and
             // write to end of oject or all the data
-            journalOffset = offset - i->offset;
-            writeLength = min((objectSize - journalOffset),dataRemaining);
+            objectOffset = offset - i->offset;
+            writeLength = min((objectSize - objectOffset),dataRemaining);
         }
         else
         {
             // starting at beginning of next object write the rest of data
             // or until object length is reached
             writeLength = min(objectSize,dataRemaining);
-            journalOffset = 0;
+            objectOffset = 0;
         }
+        cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
 
-        err = replicator->addJournalEntry(i->key.c_str(),&data[count],journalOffset,writeLength);
+        err = replicator->addJournalEntry(i->key.c_str(),&data[count],objectOffset,writeLength);
 
         if (err <= 0)
         {
-            if ((count + journalOffset) > i->length)
-                metadata.updateEntryLength(i->offset, (count + journalOffset));
+            if ((count + objectOffset) > i->length)
+                metadata.updateEntryLength(i->offset, (count + objectOffset));
             metadata.writeMetadata(filename);
             logger->log(LOG_ERR,"IOCoordinator::write(): object failed to complete write, %u of %u bytes written.",count,length);
             return count;
         }
-        if ((writeLength + journalOffset) > i->length)
-            metadata.updateEntryLength(i->offset, (writeLength + journalOffset));
+        if ((writeLength + objectOffset) > i->length)
+            metadata.updateEntryLength(i->offset, (writeLength + objectOffset));
 
+        cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
 
         synchronizer->newJournalEntry(i->key);
-        count += err;
-        dataRemaining -= err;
+        count += writeLength;
+        dataRemaining -= writeLength;
         }
     }
     // there is no overlapping data, or data goes beyond end of last object
-    while (dataRemaining > 0 && err >= 0)
+    while (dataRemaining > 0)
     {
         //add a new metaDataObject
         if (count == 0)
+            cache->makeSpace(dataRemaining);
+        metadataObject newObject = metadata.addMetadataObject(filename,0);
+        if (count == 0 && (uint64_t) offset > newObject.offset)
         {
             //this is starting beyond last object in metadata
             //figure out if the offset is in this object
-            if ((uint64_t) offset < objectSize)
-            {
-                journalOffset = offset;
-                writeLength = min((objectSize - journalOffset),dataRemaining);
-            }
-            else
-            {
-                //we need to create an object that is only padding
-            }
+            objectOffset = offset - newObject.offset;
+            writeLength = min((objectSize - objectOffset),dataRemaining);
         }
         else
        {
             // count != 0 we've already started writing and are going to new object
             // start at beginning of the new object
             writeLength = min(objectSize,dataRemaining);
-            journalOffset = 0;
+            objectOffset = 0;
         }
-        cache->makeSpace(journalOffset+writeLength);
-        // add a new metadata object, this will get a new objectKey
-        metadataObject newObject = metadata.addMetadataObject(filename,writeLength);
-        // write the new object
-        err = replicator->newObject(newObject.key.c_str(),data,journalOffset,writeLength);
+        if ((writeLength + objectOffset) > newObject.length)
+            metadata.updateEntryLength(newObject.offset, (writeLength + objectOffset));
+        // send to replicator
+        err = replicator->newObject(newObject.key.c_str(),data,objectOffset,writeLength);
         if (err <= 0)
         {
             // update metadataObject length to reflect what awas actually written
-            if ((count + journalOffset) > newObject.length)
-                metadata.updateEntryLength(newObject.offset, (count + journalOffset));
+            if ((count + objectOffset) > newObject.length)
+                metadata.updateEntryLength(newObject.offset, (count + objectOffset));
             metadata.writeMetadata(filename);
             logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length);
             return count;
             //log error and abort
         }
-        if ((writeLength + journalOffset) > newObject.length)
-            metadata.updateEntryLength(newObject.offset, (writeLength + journalOffset));
-
-        cache->newObject(newObject.key,(writeLength + journalOffset));
+        cache->newObject(newObject.key,(writeLength + objectOffset));
         newObjectKeys.push_back(newObject.key);
 
-        count += err;
-        dataRemaining -= err;
+        count += writeLength;
+        dataRemaining -= writeLength;
     }
 
     synchronizer->newObjects(newObjectKeys);
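
For reference, the chunking arithmetic the loop above relies on can be sketched in isolation. The snippet below is illustrative only (the fixed objectSize, the back-to-back object layout, and sizeKB = 1024 are assumptions taken from the unit tests further down, not part of the IOCoordinator API); it reproduces how the tests' 10 KB write at offset 7 KB splits across three objects:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    // Illustrative sketch: split a write of 'dataRemaining' bytes starting at
    // 'offset' into per-object (objectOffset, writeLength) pieces, assuming
    // objects are laid out back-to-back with a fixed maximum size.
    int main()
    {
        const uint64_t objectSize = 8 * 1024;                  // assumed, matches the 8K objects in the tests
        uint64_t offset = 7 * 1024, dataRemaining = 10 * 1024, count = 0;

        while (dataRemaining > 0)
        {
            uint64_t objectBase = ((offset + count) / objectSize) * objectSize;   // start of the target object
            uint64_t objectOffset = (offset + count) - objectBase;                // position inside that object
            uint64_t writeLength = std::min(objectSize - objectOffset, dataRemaining);
            std::printf("object at %llu: objectOffset=%llu writeLength=%llu\n",
                        (unsigned long long) objectBase, (unsigned long long) objectOffset,
                        (unsigned long long) writeLength);
            count += writeLength;
            dataRemaining -= writeLength;
        }
        return 0;
    }
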
@@ -368,15 +359,16 @@ int IOCoordinator::append(const char *filename, const uint8_t *data, size_t leng
     size_t count = 0;
     uint64_t writeLength = 0;
     uint64_t dataRemaining = length;
-    uint64_t journalOffset = 0;
     vector<metadataObject> objects;
-
-    //writeLock(filename);
     vector<string> newObjectKeys;
     Synchronizer *synchronizer = Synchronizer::get();   // need to init sync here to break circular dependency...
 
     ScopedWriteLock lock(this, filename);
 
+    MetadataFile metadata = MetadataFile(filename);
+
+    uint64_t offset = metadata.getLength();
+
 
     //read metadata determine if this fits in the last object
     objects = metadata.metadataRead(offset,length);
@@ -384,65 +376,71 @@ int IOCoordinator::append(const char *filename, const uint8_t *data, size_t leng
     {
         std::vector<metadataObject>::const_iterator i = objects.begin();
 
         if ((objectSize - i->length) > 0)   // if this is zero then we can't put anything else in this object
         {
             // figure out how much data to write to this object
-            if (offset > i->offset)
-            {
-                journalOffset = offset - i->offset;
-                writeLength = min((objectSize - journalOffset),dataRemaining);
-            }
-            err = replicator->addJournalEntry(i->key.c_str(),&data[count],journalOffset,writeLength);
+            writeLength = min((objectSize - i->length),dataRemaining);
+
+            cache->makeSpace(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
+
+            err = replicator->addJournalEntry(i->key.c_str(),&data[count],i->length,writeLength);
             if (err <= 0)
             {
-                if ((count + journalOffset) > i->length)
-                    metadata.updateEntryLength(i->offset, (count + journalOffset));
+                metadata.updateEntryLength(i->offset, (count + i->length));
                 metadata.writeMetadata(filename);
-                logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length);
+                logger->log(LOG_ERR,"IOCoordinator::append(): journal failed to complete write, %u of %u bytes written.",count,length);
                 return count;
             }
-            if ((writeLength + journalOffset) > i->length)
-                metadata.updateEntryLength(i->offset, (writeLength + journalOffset));
+            metadata.updateEntryLength(i->offset, (writeLength + i->length));
 
-            count += err;
-            dataRemaining -= err;
-            //cache->makeSpace(journal_data_size)
-            //Synchronizer::newJournalData(journal_file);
+            cache->newJournalEntry(writeLength+JOURNAL_ENTRY_HEADER_SIZE);
 
             synchronizer->newJournalEntry(i->key);
+
+            count += writeLength;
+            dataRemaining -= writeLength;
         }
     }
-    else
+    else if (objects.size() > 1)
     {
+        //Something went wrong this shouldn't overlap objects
+        logger->log(LOG_ERR,"IOCoordinator::append(): overlapping objects found on append.",count,length);
+        return -1;
     }
-    // there is no overlapping data, or data goes beyond end of last object
-    while (dataRemaining > 0 && err >= 0)
+    // append is starting or adding to a new object
+    while (dataRemaining > 0)
     {
         //add a new metaDataObject
         writeLength = min(objectSize,dataRemaining);
-        if (count == 0)
-        {
-            //this is append and it starting beyond last object in metadata
-            //figure out if the offset is in this object
-        }
-        //cache->makeSpace(size)
 
+        cache->makeSpace(writeLength);
         // add a new metadata object, this will get a new objectKey NOTE: probably needs offset too
         metadataObject newObject = metadata.addMetadataObject(filename,writeLength);
 
         // write the new object
-        err = replicator->newObject(newObject.key.c_str(),data,journalOffset,writeLength);
+        err = replicator->newObject(newObject.key.c_str(),data,0,writeLength);
         if (err <= 0)
        {
             // update metadataObject length to reflect what awas actually written
-            if ((count + journalOffset) > newObject.length)
-                metadata.updateEntryLength(newObject.offset, (count + journalOffset));
+            metadata.updateEntryLength(newObject.offset, (count));
             metadata.writeMetadata(filename);
-            logger->log(LOG_ERR,"IOCoordinator::write(): newObject failed to complete write, %u of %u bytes written.",count,length);
+            logger->log(LOG_ERR,"IOCoordinator::append(): newObject failed to complete write, %u of %u bytes written.",count,length);
             return count;
             //log error and abort
         }
-        // sync
-        //Synchronizer::newObject(newname)
-        count += err;
-        dataRemaining -= err;
+        cache->newObject(newObject.key,writeLength);
         newObjectKeys.push_back(newObject.key);
 
+        count += writeLength;
+        dataRemaining -= writeLength;
     }
 
     synchronizer->newObjects(newObjectKeys);
 
     metadata.writeMetadata(filename);
 
     lock.unlock();
 
     return count;
 }
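
The split that append() performs can likewise be sketched on its own. This is a simplified stand-in rather than the storagemanager API: it assumes a fixed objectSize and only shows how an append first tops up the last object through the journal and then spills the remainder into new objects, mirroring the two stages above:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    // Illustrative sketch of the append split: fill the remaining room in the
    // last object via a journal entry, then create new objects for the rest.
    int main()
    {
        const uint64_t objectSize = 8 * 1024;   // assumed object size
        uint64_t lastLength = 5 * 1024;         // bytes already in the last object (example value)
        uint64_t dataRemaining = 7 * 1024;      // bytes being appended (example value)

        uint64_t journaled = std::min(objectSize - lastLength, dataRemaining);
        dataRemaining -= journaled;
        std::printf("journal entry into last object: %llu bytes\n", (unsigned long long) journaled);

        while (dataRemaining > 0)
        {
            uint64_t writeLength = std::min(objectSize, dataRemaining);
            dataRemaining -= writeLength;
            std::printf("new object: %llu bytes\n", (unsigned long long) writeLength);
        }
        return 0;
    }
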
@@ -287,6 +287,43 @@ void metadataJournalTest(std::size_t size, off_t offset)
     assert(resp->returnCode == (int) size);
 }
 
+void metadataJournalTest_append(std::size_t size)
+{
+    // make an empty file to write to
+    const char *filename = "metadataJournalTest";
+    uint8_t buf[(sizeof(write_cmd)+std::strlen(filename)+size)];
+    uint64_t *data;
+
+    sm_msg_header *hdr = (sm_msg_header *) buf;
+    append_cmd *cmd = (append_cmd *) &hdr[1];
+    cmd->opcode = APPEND;
+    cmd->count = size;
+    cmd->flen = std::strlen(filename);
+    memcpy(&cmd->filename, filename, cmd->flen);
+    data = (uint64_t *) &cmd->filename[cmd->flen];
+    int count = 0;
+    for (uint64_t i = 0; i < (size/sizeof(uint64_t)); i++)
+    {
+        data[i] = i;
+        count++;
+    }
+    hdr->type = SM_MSG_START;
+    hdr->payloadLen = sizeof(*cmd) + cmd->flen + cmd->count;
+    AppendTask a(clientSock, hdr->payloadLen);
+    int err = ::write(sessionSock, cmd, hdr->payloadLen);
+
+    a.run();
+
+    // verify response
+    err = ::recv(sessionSock, buf, 1024, MSG_DONTWAIT);
+    sm_response *resp = (sm_response *) buf;
+    assert(err == sizeof(*resp));
+    assert(resp->header.type == SM_MSG_START);
+    assert(resp->header.payloadLen == 4);
+    assert(resp->header.flags == 0);
+    assert(resp->returnCode == (int) size);
+}
+
 void metadataJournalTestCleanup(std::size_t size)
 {
     Config* config = Config::get();
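
The new test packs its request the same way the existing write test does: one buffer holding the message header, the command struct, the unterminated filename, and then the payload, with payloadLen covering everything after the header. A compressed sketch of that framing follows, using stand-in types rather than the real sm_msg_header/append_cmd definitions from the storagemanager headers; field widths and opcode values here are placeholders only:

    #include <cstdint>
    #include <cstring>
    #include <vector>

    // Stand-in types only; the real layouts live in the storagemanager headers.
    struct StubHeader { uint32_t type; uint32_t payloadLen; };
    struct StubAppend { uint8_t opcode; uint32_t count; uint32_t flen; };

    // Pack [header][command][filename][data] into one buffer, mirroring
    // metadataJournalTest_append() above.
    std::vector<uint8_t> buildAppendMessage(const char *filename, const uint8_t *payload, size_t size)
    {
        size_t flen = std::strlen(filename);
        std::vector<uint8_t> buf(sizeof(StubHeader) + sizeof(StubAppend) + flen + size);

        StubHeader *hdr = reinterpret_cast<StubHeader *>(buf.data());
        StubAppend *cmd = reinterpret_cast<StubAppend *>(hdr + 1);
        cmd->opcode = 0;                                  // placeholder for the APPEND opcode
        cmd->count = static_cast<uint32_t>(size);
        cmd->flen = static_cast<uint32_t>(flen);

        uint8_t *p = reinterpret_cast<uint8_t *>(cmd + 1);
        std::memcpy(p, filename, flen);                   // filename is not null-terminated on the wire
        std::memcpy(p + flen, payload, size);

        hdr->type = 0;                                    // placeholder for SM_MSG_START
        hdr->payloadLen = static_cast<uint32_t>(sizeof(*cmd) + flen + size);
        return buf;
    }
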
@@ -1424,8 +1461,15 @@ int main()
     //Case 4 write starts object1 ends object3
     metadataJournalTest((10*sizeKB),(7*sizeKB));
     //Case 5 write starts in new object at offset >0
     //TODO add zero padding to writes in this scenario
     //metadataJournalTest((8*sizeKB),4*sizeKB);
 
+    //append test
+    // first one appends file to end of 8K object
+    metadataJournalTest_append((7*sizeKB));
+    // this apppends that starts on new object
+    metadataJournalTest_append((7*sizeKB));
+    // this starts in one object and crosses into new object
+    metadataJournalTest_append((7*sizeKB));
+
 
     //writetask();
     //appendtask();