1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-27 21:01:50 +03:00

Incremental commit. Stripping out the OBE fstream IO code.

This commit is contained in:
Patrick LeBlanc
2019-07-12 10:06:56 -05:00
parent 3e77912dcd
commit 16dc887f20
8 changed files with 256 additions and 922 deletions

View File

@ -1133,85 +1133,6 @@ void ExtentMap::reserveLBIDRange(LBID_t start, uint8_t size)
... (* numFL)
*/
void ExtentMap::loadVersion4(ifstream& in)
{
int emNumElements, flNumElements;
in.read((char*) &emNumElements, sizeof(int));
in.read((char*) &flNumElements, sizeof(int));
idbassert(emNumElements > 0);
void *fExtentMapPtr = static_cast<void*>(fExtentMap);
memset(fExtentMapPtr, 0, fEMShminfo->allocdSize);
fEMShminfo->currentSize = 0;
// init the free list
memset(fFreeList, 0, fFLShminfo->allocdSize);
fFreeList[0].size = (1 << 26); // 2^36 LBIDs
fFLShminfo->currentSize = sizeof(InlineLBIDRange);
// @Bug 3498
// Calculate how big an extent map we're going to need and allocate it in one call
if ((fEMShminfo->allocdSize / sizeof(EMEntry)) < (unsigned)emNumElements)
{
size_t nrows = emNumElements;
//Round up to the nearest EM_INCREMENT_ROWS
if ((nrows % EM_INCREMENT_ROWS) != 0)
{
nrows /= EM_INCREMENT_ROWS;
nrows++;
nrows *= EM_INCREMENT_ROWS;
}
growEMShmseg(nrows);
}
for (int i = 0; i < emNumElements; i++)
{
in.read((char*) &fExtentMap[i], sizeof(EMEntry));
reserveLBIDRange(fExtentMap[i].range.start, fExtentMap[i].range.size);
//@bug 1911 - verify status value is valid
if (fExtentMap[i].status < EXTENTSTATUSMIN ||
fExtentMap[i].status > EXTENTSTATUSMAX)
fExtentMap[i].status = EXTENTAVAILABLE;
}
fEMShminfo->currentSize = emNumElements * sizeof(EMEntry);
#ifdef DUMP_EXTENT_MAP
EMEntry* emSrc = fExtentMap;
cout << "lbid\tsz\toid\tfbo\thwm\tpart#\tseg#\tDBRoot\twid\tst\thi\tlo\tsq\tv" << endl;
for (int i = 0; i < emNumElements; i++)
{
cout <<
emSrc[i].start
<< '\t' << emSrc[i].size
<< '\t' << emSrc[i].fileID
<< '\t' << emSrc[i].blockOffset
<< '\t' << emSrc[i].HWM
<< '\t' << emSrc[i].partitionNum
<< '\t' << emSrc[i].segmentNum
<< '\t' << emSrc[i].dbRoot
<< '\t' << emSrc[i].status
<< '\t' << emSrc[i].partition.cprange.hi_val
<< '\t' << emSrc[i].partition.cprange.lo_val
<< '\t' << emSrc[i].partition.cprange.sequenceNum
<< '\t' << (int)(emSrc[i].partition.cprange.isValid)
<< endl;
}
cout << "Free list entries:" << endl;
cout << "start\tsize" << endl;
for (int i = 0; i < flNumElements; i++)
cout << fFreeList[i].start << '\t' << fFreeList[i].size << endl;
#endif
}
void ExtentMap::loadVersion4(IDBDataFile* in)
{
int emNumElements = 0, flNumElements = 0;
@ -1337,79 +1258,37 @@ void ExtentMap::load(const string& filename, bool fixFL)
throw;
}
// XXXPAT: Forcing the IDB path. Remove the fstream path once we see this works.
if (true || IDBPolicy::useHdfs())
const char* filename_p = filename.c_str();
scoped_ptr<IDBDataFile> in(IDBDataFile::open(
IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG),
filename_p, "r", 0));
if (!in)
{
const char* filename_p = filename.c_str();
scoped_ptr<IDBDataFile> in(IDBDataFile::open(
IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG),
filename_p, "r", 0));
log_errno("ExtentMap::load(): open");
releaseFreeList(WRITE);
releaseEMEntryTable(WRITE);
throw ios_base::failure("ExtentMap::load(): open failed. Check the error log.");
}
if (!in)
{
log_errno("ExtentMap::load(): open");
releaseFreeList(WRITE);
releaseEMEntryTable(WRITE);
throw ios_base::failure("ExtentMap::load(): open failed. Check the error log.");
}
try
{
int emVersion = 0;
int bytes = in->read((char*) &emVersion, sizeof(int));
try
if (bytes == (int) sizeof(int) && emVersion == EM_MAGIC_V4)
loadVersion4(in.get());
else
{
int emVersion = 0;
int bytes = in->read((char*) &emVersion, sizeof(int));
if (bytes == (int) sizeof(int) && emVersion == EM_MAGIC_V4)
loadVersion4(in.get());
else
{
log("ExtentMap::load(): That file is not a valid ExtentMap image");
throw runtime_error("ExtentMap::load(): That file is not a valid ExtentMap image");
}
}
catch (...)
{
releaseFreeList(WRITE);
releaseEMEntryTable(WRITE);
throw;
log("ExtentMap::load(): That file is not a valid ExtentMap image");
throw runtime_error("ExtentMap::load(): That file is not a valid ExtentMap image");
}
}
else // fstream path to be remove
catch (...)
{
ifstream in;
in.open(filename.c_str(), ios_base::in | ios_base::binary);
if (!in)
{
log_errno("ExtentMap::load(): open");
releaseFreeList(WRITE);
releaseEMEntryTable(WRITE);
throw ios_base::failure("ExtentMap::load(): open failed. Check the error log.");
}
in.exceptions(ios_base::badbit | ios_base::failbit);
try
{
int emVersion;
in.read((char*) &emVersion, sizeof(int));
if (emVersion == EM_MAGIC_V4)
loadVersion4(in);
else
{
log("ExtentMap::load(): That file is not a valid ExtentMap image");
throw runtime_error("ExtentMap::load(): That file is not a valid ExtentMap image");
}
}
catch (...)
{
in.close();
releaseFreeList(WRITE);
releaseEMEntryTable(WRITE);
throw;
}
in.close();
releaseFreeList(WRITE);
releaseEMEntryTable(WRITE);
throw;
}
releaseFreeList(WRITE);
@ -1453,77 +1332,53 @@ void ExtentMap::save(const string& filename)
throw runtime_error("ExtentMap::save(): got request to save an empty BRM");
}
// XXXPAT: I don't know why there are two options here. It can just use the IDBDataFile stuff.
// Forcing the IDB option to execute for now. Leaving the old fstream version there in case we find there's
// a case the IDB option doesn't work.
if (true || IDBPolicy::useHdfs())
const char* filename_p = filename.c_str();
scoped_ptr<IDBDataFile> out(IDBDataFile::open(
IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG),
filename_p, "wb", IDBDataFile::USE_VBUF));
if (!out)
{
const char* filename_p = filename.c_str();
scoped_ptr<IDBDataFile> out(IDBDataFile::open(
IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG),
filename_p, "wb", IDBDataFile::USE_VBUF));
log_errno("ExtentMap::save(): open");
releaseFreeList(READ);
releaseEMEntryTable(READ);
throw ios_base::failure("ExtentMap::save(): open failed. Check the error log.");
}
if (!out)
loadSize[0] = EM_MAGIC_V4;
loadSize[1] = fEMShminfo->currentSize / sizeof(EMEntry);
loadSize[2] = fFLShminfo->allocdSize / sizeof(InlineLBIDRange); // needs to send all entries
int bytes = 0;
try
{
const int wsize = 3 * sizeof(int);
bytes = out->write((char*)loadSize, wsize);
if (bytes != wsize)
throw ios_base::failure("ExtentMap::save(): write failed. Check the error log.");
}
catch (...)
{
releaseFreeList(READ);
releaseEMEntryTable(READ);
throw;
}
allocdSize = fEMShminfo->allocdSize / sizeof(EMEntry);
//const int emEntrySize = sizeof(EMEntry);
int first = -1, last = -1, err;
size_t progress, writeSize;
for (i = 0; i < allocdSize; i++)
{
if (fExtentMap[i].range.size > 0 && first == -1)
first = i;
else if (fExtentMap[i].range.size <= 0 && first != -1)
{
log_errno("ExtentMap::save(): open");
releaseFreeList(READ);
releaseEMEntryTable(READ);
throw ios_base::failure("ExtentMap::save(): open failed. Check the error log.");
}
loadSize[0] = EM_MAGIC_V4;
loadSize[1] = fEMShminfo->currentSize / sizeof(EMEntry);
loadSize[2] = fFLShminfo->allocdSize / sizeof(InlineLBIDRange); // needs to send all entries
int bytes = 0;
try
{
const int wsize = 3 * sizeof(int);
bytes = out->write((char*)loadSize, wsize);
if (bytes != wsize)
throw ios_base::failure("ExtentMap::save(): write failed. Check the error log.");
}
catch (...)
{
releaseFreeList(READ);
releaseEMEntryTable(READ);
throw;
}
allocdSize = fEMShminfo->allocdSize / sizeof(EMEntry);
//const int emEntrySize = sizeof(EMEntry);
int first = -1, last = -1, err;
size_t progress, writeSize;
for (i = 0; i < allocdSize; i++)
{
if (fExtentMap[i].range.size > 0 && first == -1)
first = i;
else if (fExtentMap[i].range.size <= 0 && first != -1)
{
last = i;
writeSize = (last - first) * sizeof(EMEntry);
progress = 0;
char *writePos = (char *) &fExtentMap[first];
while (progress < writeSize)
{
err = out->write(writePos + progress, writeSize - progress);
if (err < 0)
{
releaseFreeList(READ);
releaseEMEntryTable(READ);
throw ios_base::failure("ExtentMap::save(): write failed. Check the error log.");
}
progress += err;
}
first = -1;
}
}
if (first != -1)
{
writeSize = (allocdSize - first) * sizeof(EMEntry);
last = i;
writeSize = (last - first) * sizeof(EMEntry);
progress = 0;
char *writePos = (char *) &fExtentMap[first];
while (progress < writeSize)
@ -1537,14 +1392,14 @@ void ExtentMap::save(const string& filename)
}
progress += err;
}
first = -1;
}
//allocdSize = fFLShminfo->allocdSize / sizeof(InlineLBIDRange);
//const int inlineLbidRangeSize = sizeof(InlineLBIDRange);
}
if (first != -1)
{
writeSize = (allocdSize - first) * sizeof(EMEntry);
progress = 0;
writeSize = fFLShminfo->allocdSize;
char *writePos = (char *) fFreeList;
char *writePos = (char *) &fExtentMap[first];
while (progress < writeSize)
{
err = out->write(writePos + progress, writeSize - progress);
@ -1557,86 +1412,23 @@ void ExtentMap::save(const string& filename)
progress += err;
}
}
else // this is the fstream version to be expired
//allocdSize = fFLShminfo->allocdSize / sizeof(InlineLBIDRange);
//const int inlineLbidRangeSize = sizeof(InlineLBIDRange);
progress = 0;
writeSize = fFLShminfo->allocdSize;
char *writePos = (char *) fFreeList;
while (progress < writeSize)
{
ofstream out;
// Make em writes to disk use a buffer size of StrmBufSize bytes (instead of the default 8K)
const unsigned StrmBufSize = 1 * 1024 * 1024;
scoped_array<char> buf(new char[StrmBufSize]);
out.rdbuf()->pubsetbuf(buf.get(), StrmBufSize);
utmp = ::umask(0);
out.open(filename.c_str(), ios_base::out | ios_base::binary);
::umask(utmp);
if (!out)
err = out->write(writePos + progress, writeSize - progress);
if (err < 0)
{
log_errno("ExtentMap::save(): open");
releaseFreeList(READ);
releaseEMEntryTable(READ);
throw ios_base::failure("ExtentMap::save(): open failed. Check the error log.");
throw ios_base::failure("ExtentMap::save(): write failed. Check the error log.");
}
out.exceptions(ios_base::badbit);
loadSize[0] = EM_MAGIC_V4;
loadSize[1] = fEMShminfo->currentSize / sizeof(EMEntry);
loadSize[2] = fFLShminfo->allocdSize / sizeof(InlineLBIDRange); // needs to send all entries
try
{
out.write((char*)loadSize, 3 * sizeof(int));
}
catch (...)
{
out.close();
releaseFreeList(READ);
releaseEMEntryTable(READ);
throw;
}
allocdSize = fEMShminfo->allocdSize / sizeof(EMEntry);
for (i = 0; i < allocdSize; i++)
{
if (fExtentMap[i].range.size > 0)
{
try
{
out.write((char*) &fExtentMap[i], sizeof(EMEntry));
}
catch (...)
{
out.close();
releaseFreeList(READ);
releaseEMEntryTable(READ);
throw;
}
}
}
allocdSize = fFLShminfo->allocdSize / sizeof(InlineLBIDRange);
for (i = 0; i < allocdSize; i++)
{
// if (fFreeList[i].size > 0) {
try
{
out.write((char*) &fFreeList[i], sizeof(InlineLBIDRange));
}
catch (...)
{
out.close();
releaseFreeList(READ);
releaseEMEntryTable(READ);
throw;
}
// }
}
out.close();
progress += err;
}
releaseFreeList(READ);