1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-27 21:01:50 +03:00

Another incremental commit.

This commit is contained in:
Patrick LeBlanc
2019-07-12 10:33:51 -05:00
parent 16dc887f20
commit 9ced374c81
4 changed files with 56 additions and 254 deletions

View File

@ -65,7 +65,7 @@ namespace BRM
{
SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) :
slave(s), currentSaveFD(-1), currentSaveFile(NULL), journalh(NULL)
slave(s), currentSaveFile(NULL), journalh(NULL)
#ifdef _MSC_VER
, fPids(0), fMaxPids(64)
#endif
@ -139,17 +139,9 @@ SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) :
journalName = savefile + "_journal";
const char* filename = journalName.c_str();
if (true || IDBPolicy::useHdfs())
{
journalh = IDBDataFile::open(
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "w+b", 0);
}
else
{
journal.open(filename, ios_base::binary | ios_base::out | ios_base::app);
}
if ((journal.is_open() == false) && (journalh == NULL))
journalh = IDBDataFile::open(
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "w+b", 0);
if (journalh == NULL)
throw runtime_error("Could not open the BRM journal for writing!");
}
else
@ -177,7 +169,7 @@ SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) :
}
SlaveComm::SlaveComm()
: currentSaveFD(-1), currentSaveFile(NULL), journalh(NULL)
: currentSaveFile(NULL), journalh(NULL)
#ifdef _MSC_VER
, fPids(0), fMaxPids(64)
#endif
@ -219,7 +211,6 @@ SlaveComm::~SlaveComm()
if (firstSlave)
{
close(currentSaveFD);
delete currentSaveFile;
currentSaveFile = NULL;
}
@ -2004,19 +1995,13 @@ void SlaveComm::do_confirm()
if (firstSlave && (takeSnapshot ||
(journalCount >= snapshotInterval && snapshotInterval >= 0)))
{
const char* filename = tmp.c_str();
if ((false && !IDBPolicy::useHdfs()) && currentSaveFD < 0)
{
currentSaveFD = open(filename, O_WRONLY | O_CREAT, 0664);
}
else if ((true || IDBPolicy::useHdfs()) && !currentSaveFile)
if (!currentSaveFile)
{
currentSaveFile = IDBDataFile::open(
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "wb", 0);
IDBPolicy::getType(tmp.c_str(), IDBPolicy::WRITEENG), tmp.c_str(), "wb", 0);
}
if (currentSaveFD < 0 && currentSaveFile == NULL)
if (currentSaveFile == NULL)
{
ostringstream os;
os << "WorkerComm: failed to open the current savefile. errno: "
@ -2032,72 +2017,33 @@ void SlaveComm::do_confirm()
#endif
int err = 0;
if (currentSaveFile)
// MCOL-1558. Make the _current file relative to DBRMRoot.
string relative = tmp.substr(tmp.find_last_of('/') + 1);
err = currentSaveFile->write(relative.c_str(), relative.length());
if (err < (int) relative.length())
{
// MCOL-1558. Make the _current file relative to DBRMRoot.
string relative = tmp.substr(tmp.find_last_of('/') + 1);
err = currentSaveFile->write(relative.c_str(), relative.length());
ostringstream os;
os << "WorkerComm: currentfile write() returned " << err
<< " file pointer is " << currentSaveFile;
if (err < (int) relative.length())
{
ostringstream os;
os << "WorkerComm: currentfile write() returned " << err
<< " file pointer is " << currentSaveFile;
if (err < 0)
os << " errno: " << strerror(errno);
if (err < 0)
os << " errno: " << strerror(errno);
log(os.str());
}
currentSaveFile->flush();
delete currentSaveFile;
currentSaveFile = NULL;
saveFileToggle = !saveFileToggle;
const char* filename = journalName.c_str();
delete journalh;
journalh = IDBDataFile::open(
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "w+b", 0);
if (!journalh)
throw runtime_error("Could not open the BRM journal for writing!");
log(os.str());
}
else
{
lseek(currentSaveFD, 0, SEEK_SET);
// MCOL-1558. Make the _current file relative to DBRMRoot.
string relative = tmp.substr(tmp.find_last_of('/') + 1);
err = write(currentSaveFD, relative.c_str(), relative.length());
if (err < (int) relative.length())
{
ostringstream os;
os << "WorkerComm: currentfile write() returned " << err
<< " fd is " << currentSaveFD;
currentSaveFile->flush();
delete currentSaveFile;
currentSaveFile = NULL;
saveFileToggle = !saveFileToggle;
if (err < 0)
os << " errno: " << strerror(errno);
delete journalh;
journalh = IDBDataFile::open(
IDBPolicy::getType(journalName.c_str(), IDBPolicy::WRITEENG), journalName.c_str(), "w+b", 0);
log(os.str());
}
#ifdef _MSC_VER
//FIXME: Do we need to account for Windows EOL conversions?
_chsize_s(currentSaveFD, tmp.length());
_commit(currentSaveFD);
#else
err = ftruncate(currentSaveFD, relative.length());
fsync(currentSaveFD);
#endif
saveFileToggle = !saveFileToggle;
/* Is there a nicer way to truncate the file using an ofstream? */
journal.close();
uint32_t utmp = ::umask(0);
journal.open(journalName.c_str(), ios_base::binary | ios_base::out | ios_base::trunc);
::umask(utmp);
}
if (!journalh)
throw runtime_error("Could not open the BRM journal for writing!");
takeSnapshot = false;
doSaveDelta = false;
@ -2250,70 +2196,28 @@ int SlaveComm::replayJournal(string prefix)
const char* filename = journalName.c_str();
if (true || IDBPolicy::useHdfs())
IDBDataFile* journalf = IDBDataFile::open(
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0);
if (!journalf)
{
IDBDataFile* journalf = IDBDataFile::open(
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0);
if (!journalf)
{
cout << "Error opening journal file " << fName << endl;
return -1;
}
if (journalf->size() == 0) // empty file, nothing to replay
return 0;
ssize_t readIn = 0;
do
{
readIn = journalf->read((char*) &len, sizeof(len));
if (readIn > 0)
{
cmd.needAtLeast(len);
readIn = journalf->read((char*) cmd.getInputPtr(), len);
cmd.advanceInputPtr(len);
try
{
processCommand(cmd);
}
catch (exception& e)
{
cout << e.what() << " Journal replay was incomplete." << endl;
slave->undoChanges();
return -1;
}
slave->confirmChanges();
cmd.restart();
ret++;
}
}
while (readIn > 0);
cout << "Error opening journal file " << fName << endl;
return -1;
}
else
if (journalf->size() == 0) // empty file, nothing to replay
return 0;
ssize_t readIn = 0;
do
{
ifstream journalf;
journalf.open(filename, ios_base::in | ios_base::binary);
readIn = journalf->read((char*) &len, sizeof(len));
if (!journalf.is_open())
if (readIn > 0)
{
cout << "Error opening journal file " << fName << endl;
return -1;
}
while (journalf)
{
journalf.read((char*) &len, sizeof(len));
if (!journalf)
break;
cmd.needAtLeast(len);
journalf.read((char*) cmd.getInputPtr(), len);
readIn = journalf->read((char*) cmd.getInputPtr(), len);
cmd.advanceInputPtr(len);
try
@ -2331,7 +2235,7 @@ int SlaveComm::replayJournal(string prefix)
cmd.restart();
ret++;
}
}
} while (readIn > 0);
return ret;
}
@ -2359,21 +2263,10 @@ void SlaveComm::saveDelta()
try
{
uint32_t len = delta.length();
if (true || IDBPolicy::useHdfs())
{
journalh->write((const char*) &len, sizeof(len));
journalh->write((const char*) delta.buf(), delta.length());
journalh->flush();
}
else
{
journal.seekg(0, ios_base::end);
journal.write((const char*) &len, sizeof(len));
journal.write((const char*) delta.buf(), delta.length());
journal.sync();
}
journalh->write((const char*) &len, sizeof(len));
journalh->write((const char*) delta.buf(), delta.length());
journalh->flush();
journalCount++;
}
catch (exception& e)