You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-27 21:01:50 +03:00
Revert "feat(BRM) MCOL-5555 Reduce a number of direct writes to BRM journal/tablelocks files" (#2972)
This reverts commit 02114b5b7c
.
This commit is contained in:
@ -1,37 +0,0 @@
|
|||||||
|
|
||||||
`Create meta version.`
|
|
||||||
```mermaid
|
|
||||||
sequenceDiagram
|
|
||||||
participant MetaVersionNew
|
|
||||||
participant StorageManager
|
|
||||||
participant S3Bucket
|
|
||||||
participant ControllerNode
|
|
||||||
StorageManager->>ControllerNode: Send command to freeze. (similar to readonly)
|
|
||||||
StorageManager->>MetaVersionNew: Create new meta version
|
|
||||||
StorageManager->>S3Bucket: Pack and put file with meta version on S3.
|
|
||||||
StorageManager->>ControllerNode: Send command to unfreeze.
|
|
||||||
```
|
|
||||||
|
|
||||||
`Regular work flow.`
|
|
||||||
```mermaid
|
|
||||||
sequenceDiagram
|
|
||||||
participant MetaVersion
|
|
||||||
participant StorageManager
|
|
||||||
participant S3Bucket
|
|
||||||
StorageManager->>S3Bucket: Request object from S3
|
|
||||||
S3Bucket->>StorageManager: Send object to worker node
|
|
||||||
StorageManager->>S3Bucket: Update and put new object on S3
|
|
||||||
StorageManager->>MetaVersion: Check object in meta version
|
|
||||||
StorageManager->>S3Bucket: Delete object if not in MetaVersion
|
|
||||||
```
|
|
||||||
|
|
||||||
`On rollback to version.`
|
|
||||||
```mermaid
|
|
||||||
sequenceDiagram
|
|
||||||
participant MetaVersion
|
|
||||||
participant StorageManager
|
|
||||||
participant S3Bucket
|
|
||||||
StorageManager->>S3Bucket: Request meta version.
|
|
||||||
S3Bucket->>StorageManager: Send meta version to worker node.
|
|
||||||
StorageManager->>MetaVersion: Unpack metaversion.
|
|
||||||
```
|
|
@ -444,40 +444,7 @@ void TableLockInfo::deserialize(istream& i)
|
|||||||
dbrootList.resize(dbrootListSize);
|
dbrootList.resize(dbrootListSize);
|
||||||
|
|
||||||
for (uint32_t j = 0; j < dbrootListSize; j++)
|
for (uint32_t j = 0; j < dbrootListSize; j++)
|
||||||
i.read((char*)&dbrootList[j], sizeof(uint32_t));
|
i.read((char*)&dbrootList[j], 4);
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t TableLockInfo::getInternalSize() const
|
|
||||||
{
|
|
||||||
const uint32_t nameLen = ownerName.length();
|
|
||||||
const uint32_t dbrootListSize = dbrootList.size();
|
|
||||||
return (32 + sizeof(time_t) + nameLen + (dbrootListSize * sizeof(uint32_t)));
|
|
||||||
}
|
|
||||||
|
|
||||||
void TableLockInfo::serializeElement(char* buffer, const char* src, const uint32_t size, uint32_t& offset)
|
|
||||||
{
|
|
||||||
std::memcpy(&buffer[offset], src, size);
|
|
||||||
offset += size;
|
|
||||||
}
|
|
||||||
|
|
||||||
void TableLockInfo::serialize(char* buffer, uint32_t& offset)
|
|
||||||
{
|
|
||||||
const uint32_t nameLen = ownerName.length();
|
|
||||||
const uint32_t dbrootListSize = dbrootList.size();
|
|
||||||
|
|
||||||
serializeElement(buffer, (char*)&id, 8, offset);
|
|
||||||
serializeElement(buffer, (char*)&tableOID, 4, offset);
|
|
||||||
serializeElement(buffer, (char*)&ownerPID, 4, offset);
|
|
||||||
serializeElement(buffer, (char*)&state, 4, offset);
|
|
||||||
serializeElement(buffer, (char*)&ownerSessionID, 4, offset);
|
|
||||||
serializeElement(buffer, (char*)&ownerTxnID, 4, offset);
|
|
||||||
serializeElement(buffer, (char*)&creationTime, sizeof(time_t), offset);
|
|
||||||
serializeElement(buffer, (char*)&nameLen, 2, offset);
|
|
||||||
serializeElement(buffer, (char*)ownerName.c_str(), nameLen, offset);
|
|
||||||
serializeElement(buffer, (char*)&dbrootListSize, 2, offset);
|
|
||||||
|
|
||||||
for (uint32_t j = 0; j < dbrootListSize; j++)
|
|
||||||
serializeElement(buffer, (char*)&dbrootList[j], sizeof(uint32_t), offset);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void TableLockInfo::serialize(IDBDataFile* o) const
|
void TableLockInfo::serialize(IDBDataFile* o) const
|
||||||
|
@ -254,12 +254,7 @@ struct TableLockInfo : public messageqcpp::Serializeable
|
|||||||
EXPORT void deserialize(messageqcpp::ByteStream& bs);
|
EXPORT void deserialize(messageqcpp::ByteStream& bs);
|
||||||
EXPORT void serialize(idbdatafile::IDBDataFile*) const;
|
EXPORT void serialize(idbdatafile::IDBDataFile*) const;
|
||||||
EXPORT void deserialize(idbdatafile::IDBDataFile*);
|
EXPORT void deserialize(idbdatafile::IDBDataFile*);
|
||||||
EXPORT void serialize(char* buffer, uint32_t& offset);
|
|
||||||
EXPORT uint32_t getInternalSize() const;
|
|
||||||
bool operator<(const TableLockInfo&) const;
|
bool operator<(const TableLockInfo&) const;
|
||||||
|
|
||||||
private:
|
|
||||||
void serializeElement(char* buffer, const char* src, const uint32_t size, uint32_t& offset);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A Serializeable version of InlineLBIDRange
|
/// A Serializeable version of InlineLBIDRange
|
||||||
|
@ -191,6 +191,7 @@ SlaveComm::SlaveComm()
|
|||||||
slave = std::make_unique<SlaveDBRMNode>();
|
slave = std::make_unique<SlaveDBRMNode>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void SlaveComm::stop()
|
void SlaveComm::stop()
|
||||||
{
|
{
|
||||||
die = true;
|
die = true;
|
||||||
@ -1927,8 +1928,7 @@ void SlaveComm::do_confirm()
|
|||||||
{
|
{
|
||||||
if (!currentSaveFile)
|
if (!currentSaveFile)
|
||||||
{
|
{
|
||||||
currentSaveFile.reset(
|
currentSaveFile.reset(IDBDataFile::open(IDBPolicy::getType(tmp.c_str(), IDBPolicy::WRITEENG), tmp.c_str(), "wb", 0));
|
||||||
IDBDataFile::open(IDBPolicy::getType(tmp.c_str(), IDBPolicy::WRITEENG), tmp.c_str(), "wb", 0));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (currentSaveFile == NULL)
|
if (currentSaveFile == NULL)
|
||||||
@ -1951,8 +1951,7 @@ void SlaveComm::do_confirm()
|
|||||||
if (err < (int)relative.length())
|
if (err < (int)relative.length())
|
||||||
{
|
{
|
||||||
ostringstream os;
|
ostringstream os;
|
||||||
os << "WorkerComm: currentfile write() returned " << err << " file pointer is "
|
os << "WorkerComm: currentfile write() returned " << err << " file pointer is " << currentSaveFile.get();
|
||||||
<< currentSaveFile.get();
|
|
||||||
|
|
||||||
if (err < 0)
|
if (err < 0)
|
||||||
os << " errno: " << strerror(errno);
|
os << " errno: " << strerror(errno);
|
||||||
@ -1967,7 +1966,7 @@ void SlaveComm::do_confirm()
|
|||||||
|
|
||||||
;
|
;
|
||||||
journalh.reset(IDBDataFile::open(IDBPolicy::getType(journalName.c_str(), IDBPolicy::WRITEENG),
|
journalh.reset(IDBDataFile::open(IDBPolicy::getType(journalName.c_str(), IDBPolicy::WRITEENG),
|
||||||
journalName.c_str(), "w+b", 0));
|
journalName.c_str(), "w+b", 0));
|
||||||
|
|
||||||
if (!journalh)
|
if (!journalh)
|
||||||
throw runtime_error("Could not open the BRM journal for writing!");
|
throw runtime_error("Could not open the BRM journal for writing!");
|
||||||
@ -2183,16 +2182,11 @@ void SlaveComm::saveDelta()
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
const uint32_t deltaLen = delta.length();
|
uint32_t len = delta.length();
|
||||||
const uint32_t bufferSize = sizeof(deltaLen) + deltaLen;
|
|
||||||
std::unique_ptr<char[]> buffer(new char[bufferSize]);
|
|
||||||
uint32_t offset = 0;
|
|
||||||
std::memcpy(&buffer[offset], (char*)&deltaLen, sizeof(deltaLen));
|
|
||||||
offset += sizeof(deltaLen);
|
|
||||||
std::memcpy(&buffer[offset], (char*)delta.buf(), deltaLen);
|
|
||||||
|
|
||||||
journalh->seek(0, SEEK_END);
|
journalh->seek(0, SEEK_END);
|
||||||
journalh->write((const char*)buffer.get(), bufferSize);
|
journalh->write((const char*)&len, sizeof(len));
|
||||||
|
journalh->write((const char*)delta.buf(), delta.length());
|
||||||
journalh->flush();
|
journalh->flush();
|
||||||
journalCount++;
|
journalCount++;
|
||||||
}
|
}
|
||||||
@ -2332,3 +2326,4 @@ void SlaveComm::do_dmlReleaseLBIDRanges(ByteStream& msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
} // namespace BRM
|
} // namespace BRM
|
||||||
|
|
||||||
|
@ -70,21 +70,14 @@ void TableLockServer::save()
|
|||||||
if (!out)
|
if (!out)
|
||||||
throw runtime_error("TableLockServer::save(): could not open save file");
|
throw runtime_error("TableLockServer::save(): could not open save file");
|
||||||
|
|
||||||
uint32_t bufferSize = 4;
|
out->write((char*)&count, 4);
|
||||||
for (const auto& lock : locks)
|
|
||||||
bufferSize += lock.second.getInternalSize();
|
|
||||||
|
|
||||||
std::unique_ptr<char[]> buffer(new char[bufferSize]);
|
|
||||||
uint32_t offset = 0;
|
|
||||||
std::memcpy(&buffer[offset], (char*)&count, 4);
|
|
||||||
offset += 4;
|
|
||||||
|
|
||||||
for (it = locks.begin(); it != locks.end(); ++it)
|
for (it = locks.begin(); it != locks.end(); ++it)
|
||||||
{
|
{
|
||||||
if (!out)
|
if (!out)
|
||||||
throw runtime_error("TableLockServer::save(): could not write save file");
|
throw runtime_error("TableLockServer::save(): could not write save file");
|
||||||
|
|
||||||
it->second.serialize(buffer.get(), offset);
|
it->second.serialize(out.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user