You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
Merge branch 'develop' into S3-project
This commit is contained in:
@ -65,7 +65,7 @@ namespace BRM
|
||||
{
|
||||
|
||||
SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) :
|
||||
slave(s), currentSaveFD(-1), currentSaveFile(NULL), journalh(NULL)
|
||||
slave(s), currentSaveFile(NULL), journalh(NULL)
|
||||
#ifdef _MSC_VER
|
||||
, fPids(0), fMaxPids(64)
|
||||
#endif
|
||||
@ -138,21 +138,10 @@ SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) :
|
||||
firstSlave = true;
|
||||
journalName = savefile + "_journal";
|
||||
const char* filename = journalName.c_str();
|
||||
uint32_t utmp = ::umask(0);
|
||||
|
||||
if (true || IDBPolicy::useHdfs())
|
||||
{
|
||||
journalh = IDBDataFile::open(
|
||||
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "a", 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
journal.open(filename, ios_base::binary | ios_base::out | ios_base::app);
|
||||
}
|
||||
|
||||
::umask(utmp);
|
||||
|
||||
if ((journal.is_open() == false) && (journalh == NULL))
|
||||
journalh = IDBDataFile::open(
|
||||
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "a", 0);
|
||||
if (journalh == NULL)
|
||||
throw runtime_error("Could not open the BRM journal for writing!");
|
||||
}
|
||||
else
|
||||
@ -180,7 +169,7 @@ SlaveComm::SlaveComm(string hostname, SlaveDBRMNode* s) :
|
||||
}
|
||||
|
||||
SlaveComm::SlaveComm()
|
||||
: currentSaveFD(-1), currentSaveFile(NULL), journalh(NULL)
|
||||
: currentSaveFile(NULL), journalh(NULL)
|
||||
#ifdef _MSC_VER
|
||||
, fPids(0), fMaxPids(64)
|
||||
#endif
|
||||
@ -222,7 +211,6 @@ SlaveComm::~SlaveComm()
|
||||
|
||||
if (firstSlave)
|
||||
{
|
||||
close(currentSaveFD);
|
||||
delete currentSaveFile;
|
||||
currentSaveFile = NULL;
|
||||
}
|
||||
@ -2007,19 +1995,13 @@ void SlaveComm::do_confirm()
|
||||
if (firstSlave && (takeSnapshot ||
|
||||
(journalCount >= snapshotInterval && snapshotInterval >= 0)))
|
||||
{
|
||||
const char* filename = tmp.c_str();
|
||||
|
||||
if (false && !IDBPolicy::useHdfs() && currentSaveFD < 0)
|
||||
{
|
||||
currentSaveFD = open(filename, O_WRONLY | O_CREAT, 0664);
|
||||
}
|
||||
else if ((true || IDBPolicy::useHdfs()) && !currentSaveFile)
|
||||
if (!currentSaveFile)
|
||||
{
|
||||
currentSaveFile = IDBDataFile::open(
|
||||
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "wb", 0);
|
||||
IDBPolicy::getType(tmp.c_str(), IDBPolicy::WRITEENG), tmp.c_str(), "wb", 0);
|
||||
}
|
||||
|
||||
if (currentSaveFD < 0 && currentSaveFile == NULL)
|
||||
if (currentSaveFile == NULL)
|
||||
{
|
||||
ostringstream os;
|
||||
os << "WorkerComm: failed to open the current savefile. errno: "
|
||||
@ -2028,9 +2010,7 @@ void SlaveComm::do_confirm()
|
||||
throw runtime_error(os.str());
|
||||
}
|
||||
|
||||
#ifndef _MSC_VER
|
||||
chmod(filename, 0664);
|
||||
#endif
|
||||
|
||||
tmp = savefile + (saveFileToggle ? 'A' : 'B');
|
||||
slave->saveState(tmp);
|
||||
#ifndef _MSC_VER
|
||||
@ -2038,74 +2018,33 @@ void SlaveComm::do_confirm()
|
||||
#endif
|
||||
int err = 0;
|
||||
|
||||
if (currentSaveFile)
|
||||
// MCOL-1558. Make the _current file relative to DBRMRoot.
|
||||
string relative = tmp.substr(tmp.find_last_of('/') + 1);
|
||||
err = currentSaveFile->write(relative.c_str(), relative.length());
|
||||
|
||||
if (err < (int) relative.length())
|
||||
{
|
||||
// MCOL-1558. Make the _current file relative to DBRMRoot.
|
||||
string relative = tmp.substr(tmp.find_last_of('/') + 1);
|
||||
err = currentSaveFile->write(relative.c_str(), relative.length());
|
||||
ostringstream os;
|
||||
os << "WorkerComm: currentfile write() returned " << err
|
||||
<< " file pointer is " << currentSaveFile;
|
||||
|
||||
if (err < (int) relative.length())
|
||||
{
|
||||
ostringstream os;
|
||||
os << "WorkerComm: currentfile write() returned " << err
|
||||
<< " file pointer is " << currentSaveFile;
|
||||
if (err < 0)
|
||||
os << " errno: " << strerror(errno);
|
||||
|
||||
if (err < 0)
|
||||
os << " errno: " << strerror(errno);
|
||||
|
||||
log(os.str());
|
||||
}
|
||||
|
||||
currentSaveFile->flush();
|
||||
delete currentSaveFile;
|
||||
currentSaveFile = NULL;
|
||||
saveFileToggle = !saveFileToggle;
|
||||
|
||||
const char* filename = journalName.c_str();
|
||||
//uint32_t utmp = ::umask(0);
|
||||
delete journalh;
|
||||
journalh = IDBDataFile::open(
|
||||
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "w+b", 0);
|
||||
//::umask(utmp);
|
||||
|
||||
if (!journalh)
|
||||
throw runtime_error("Could not open the BRM journal for writing!");
|
||||
log(os.str());
|
||||
}
|
||||
else
|
||||
{
|
||||
lseek(currentSaveFD, 0, SEEK_SET);
|
||||
// MCOL-1558. Make the _current file relative to DBRMRoot.
|
||||
string relative = tmp.substr(tmp.find_last_of('/') + 1);
|
||||
err = write(currentSaveFD, relative.c_str(), relative.length());
|
||||
|
||||
if (err < (int) relative.length())
|
||||
{
|
||||
ostringstream os;
|
||||
os << "WorkerComm: currentfile write() returned " << err
|
||||
<< " fd is " << currentSaveFD;
|
||||
currentSaveFile->flush();
|
||||
delete currentSaveFile;
|
||||
currentSaveFile = NULL;
|
||||
saveFileToggle = !saveFileToggle;
|
||||
|
||||
if (err < 0)
|
||||
os << " errno: " << strerror(errno);
|
||||
delete journalh;
|
||||
journalh = IDBDataFile::open(
|
||||
IDBPolicy::getType(journalName.c_str(), IDBPolicy::WRITEENG), journalName.c_str(), "w+b", 0);
|
||||
|
||||
log(os.str());
|
||||
}
|
||||
|
||||
#ifdef _MSC_VER
|
||||
//FIXME: Do we need to account for Windows EOL conversions?
|
||||
_chsize_s(currentSaveFD, tmp.length());
|
||||
_commit(currentSaveFD);
|
||||
#else
|
||||
err = ftruncate(currentSaveFD, relative.length());
|
||||
fsync(currentSaveFD);
|
||||
#endif
|
||||
saveFileToggle = !saveFileToggle;
|
||||
|
||||
/* Is there a nicer way to truncate the file using an ofstream? */
|
||||
journal.close();
|
||||
uint32_t utmp = ::umask(0);
|
||||
journal.open(journalName.c_str(), ios_base::binary | ios_base::out | ios_base::trunc);
|
||||
::umask(utmp);
|
||||
}
|
||||
if (!journalh)
|
||||
throw runtime_error("Could not open the BRM journal for writing!");
|
||||
|
||||
takeSnapshot = false;
|
||||
doSaveDelta = false;
|
||||
@ -2258,70 +2197,29 @@ int SlaveComm::replayJournal(string prefix)
|
||||
|
||||
const char* filename = journalName.c_str();
|
||||
|
||||
if (true || IDBPolicy::useHdfs())
|
||||
IDBDataFile* journalf = IDBDataFile::open(
|
||||
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0);
|
||||
|
||||
if (!journalf)
|
||||
{
|
||||
IDBDataFile* journalf = IDBDataFile::open(
|
||||
IDBPolicy::getType(filename, IDBPolicy::WRITEENG), filename, "rb", 0);
|
||||
|
||||
if (!journalf)
|
||||
{
|
||||
cout << "Error opening journal file " << fName << endl;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (journalf->size() == 0) // empty file, nothing to replay
|
||||
return 0;
|
||||
|
||||
ssize_t readIn = 0;
|
||||
|
||||
do
|
||||
{
|
||||
readIn = journalf->read((char*) &len, sizeof(len));
|
||||
|
||||
if (readIn > 0)
|
||||
{
|
||||
cmd.needAtLeast(len);
|
||||
readIn = journalf->read((char*) cmd.getInputPtr(), len);
|
||||
cmd.advanceInputPtr(len);
|
||||
|
||||
try
|
||||
{
|
||||
processCommand(cmd);
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
cout << e.what() << " Journal replay was incomplete." << endl;
|
||||
slave->undoChanges();
|
||||
return -1;
|
||||
}
|
||||
|
||||
slave->confirmChanges();
|
||||
cmd.restart();
|
||||
ret++;
|
||||
}
|
||||
}
|
||||
while (readIn > 0);
|
||||
cout << "Error opening journal file " << fName << endl;
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
|
||||
if (journalf->size() == 0) // empty file, nothing to replay
|
||||
return 0;
|
||||
|
||||
ssize_t readIn = 0;
|
||||
|
||||
do
|
||||
{
|
||||
ifstream journalf;
|
||||
journalf.open(filename, ios_base::in | ios_base::binary);
|
||||
readIn = journalf->read((char*) &len, sizeof(len));
|
||||
|
||||
if (!journalf.is_open())
|
||||
if (readIn > 0)
|
||||
{
|
||||
cout << "Error opening journal file " << fName << endl;
|
||||
return -1;
|
||||
}
|
||||
|
||||
while (journalf)
|
||||
{
|
||||
journalf.read((char*) &len, sizeof(len));
|
||||
|
||||
if (!journalf)
|
||||
break;
|
||||
|
||||
cmd.needAtLeast(len);
|
||||
journalf.read((char*) cmd.getInputPtr(), len);
|
||||
readIn = journalf->read((char*) cmd.getInputPtr(), len);
|
||||
cmd.advanceInputPtr(len);
|
||||
|
||||
try
|
||||
@ -2339,7 +2237,7 @@ int SlaveComm::replayJournal(string prefix)
|
||||
cmd.restart();
|
||||
ret++;
|
||||
}
|
||||
}
|
||||
} while (readIn > 0);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -2368,20 +2266,10 @@ void SlaveComm::saveDelta()
|
||||
{
|
||||
uint32_t len = delta.length();
|
||||
|
||||
if (true || IDBPolicy::useHdfs())
|
||||
{
|
||||
journalh->write((const char*) &len, sizeof(len));
|
||||
journalh->write((const char*) delta.buf(), delta.length());
|
||||
journalh->flush();
|
||||
}
|
||||
else
|
||||
{
|
||||
journal.seekg(0, ios_base::end);
|
||||
journal.write((const char*) &len, sizeof(len));
|
||||
journal.write((const char*) delta.buf(), delta.length());
|
||||
journal.sync();
|
||||
}
|
||||
|
||||
|
||||
journalh->write((const char*) &len, sizeof(len));
|
||||
journalh->write((const char*) delta.buf(), delta.length());
|
||||
journalh->flush();
|
||||
journalCount++;
|
||||
}
|
||||
catch (exception& e)
|
||||
|
Reference in New Issue
Block a user