1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
Otto Kekäläinen 70124ecc01 Fix trivial spelling errors
- occured -> occurred
- reponse -> response
- seperated -> separated

All new code of the whole pull request, including one or several files
that are either new files or modified ones, are contributed under the
BSD-new license. I am contributing on behalf of my employer Amazon Web
Services, Inc.
2023-03-11 11:59:47 -08:00

1199 lines
32 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*****************************************************************************
* $Id: vbbm.cpp 1926 2013-06-30 21:18:14Z wweeks $
*
****************************************************************************/
#include <iostream>
#include <vector>
#include <sstream>
#include <sys/types.h>
#include <sys/stat.h>
#include <values.h>
#include <errno.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <ext/stdio_filebuf.h>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/scoped_array.hpp>
#include <boost/scoped_ptr.hpp>
#include "blocksize.h"
#include "rwlock.h"
#include "brmtypes.h"
#include "mastersegmenttable.h"
#include "vss.h"
#include "configcpp.h"
#include "exceptclasses.h"
#include "hasher.h"
#include "cacheutils.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"
#define VBBM_DLLEXPORT
#include "vbbm.h"
#undef VBBM_DLLEXPORT
#define VBBM_MAGIC_V1 0x7b27ec13
#define VBBM_MAGIC_V2 0x1fb58c7a
#define VBBM_CHUNK_SIZE 100
using namespace std;
using namespace boost;
using namespace idbdatafile;
namespace BRM
{
VBBMEntry::VBBMEntry()
{
lbid = -1;
verID = 0;
vbOID = 0;
vbFBO = 0;
next = -1;
}
/*static*/
boost::mutex VBBMImpl::fInstanceMutex;
boost::mutex VBBM::mutex;
/*static*/
VBBMImpl* VBBMImpl::fInstance = 0;
/*static*/
VBBMImpl* VBBMImpl::makeVBBMImpl(unsigned key, off_t size, bool readOnly)
{
boost::mutex::scoped_lock lk(fInstanceMutex);
if (fInstance)
{
if (key != fInstance->fVBBM.key())
{
BRMShmImpl newShm(key, size);
fInstance->swapout(newShm);
}
idbassert(key == fInstance->fVBBM.key());
return fInstance;
}
fInstance = new VBBMImpl(key, size, readOnly);
return fInstance;
}
VBBMImpl::VBBMImpl(unsigned key, off_t size, bool readOnly) : fVBBM(key, size, readOnly)
{
}
VBBM::VBBM()
{
vbbm = NULL;
currentVBBMShmkey = -1;
vbbmShmid = 0;
vbbmShminfo = NULL;
r_only = false;
fPVBBMImpl = 0;
currentFileSize = 0;
}
VBBM::~VBBM()
{
}
void VBBM::initShmseg(int nFiles)
{
// VBFileMetadata *newfiles;
int* newBuckets;
VBBMEntry* newStorage;
int i;
char* shmseg;
vbbm->vbCapacity = VBSTORAGE_INITIAL_SIZE / sizeof(VBBMEntry);
vbbm->vbCurrentSize = 0;
vbbm->vbLWM = 0;
vbbm->numHashBuckets = VBTABLE_INITIAL_SIZE / sizeof(int);
shmseg = reinterpret_cast<char*>(vbbm);
// newfiles = reinterpret_cast<VBFileMetadata*>
// (&shmseg[sizeof(VBShmsegHeader)]);
newBuckets = reinterpret_cast<int*>(&shmseg[sizeof(VBShmsegHeader) + nFiles * sizeof(VBFileMetadata)]);
newStorage = reinterpret_cast<VBBMEntry*>(
&shmseg[sizeof(VBShmsegHeader) + nFiles * sizeof(VBFileMetadata) + vbbm->numHashBuckets * sizeof(int)]);
setCurrentFileSize();
vbbm->nFiles = nFiles;
for (i = 0; i < vbbm->numHashBuckets; i++)
newBuckets[i] = -1;
for (i = 0; i < vbbm->vbCapacity; i++)
newStorage[i].lbid = -1;
}
// ported from ExtentMap
void VBBM::lock(OPS op)
{
char* shmseg;
if (op == READ)
{
vbbmShminfo = mst.getTable_read(MasterSegmentTable::VBBMSegment);
mutex.lock();
}
else
vbbmShminfo = mst.getTable_write(MasterSegmentTable::VBBMSegment);
// this means that either the VBBM isn't attached or that it was resized
if (currentVBBMShmkey != vbbmShminfo->tableShmkey)
{
if (vbbm != NULL)
{
vbbm = NULL;
}
if (vbbmShminfo->allocdSize == 0)
{
if (op == READ)
{
mutex.unlock();
mst.getTable_upgrade(MasterSegmentTable::VBBMSegment);
if (vbbmShminfo->allocdSize == 0)
{
try
{
growVBBM();
}
catch (...)
{
release(WRITE);
throw;
}
}
mst.getTable_downgrade(MasterSegmentTable::VBBMSegment);
}
else
{
try
{
growVBBM();
}
catch (...)
{
release(WRITE);
throw;
}
}
}
else
{
currentVBBMShmkey = vbbmShminfo->tableShmkey;
fPVBBMImpl = VBBMImpl::makeVBBMImpl(currentVBBMShmkey, 0);
idbassert(fPVBBMImpl);
if (r_only)
fPVBBMImpl->makeReadOnly();
vbbm = fPVBBMImpl->get();
shmseg = reinterpret_cast<char*>(vbbm);
files = reinterpret_cast<VBFileMetadata*>(&shmseg[sizeof(VBShmsegHeader)]);
hashBuckets =
reinterpret_cast<int*>(&shmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata)]);
storage = reinterpret_cast<VBBMEntry*>(
&shmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata) +
vbbm->numHashBuckets * sizeof(int)]);
if (op == READ)
mutex.unlock();
}
}
else if (op == READ)
mutex.unlock();
}
// ported from ExtentMap
void VBBM::release(OPS op)
{
if (op == READ)
mst.releaseTable_read(MasterSegmentTable::VBBMSegment);
else
mst.releaseTable_write(MasterSegmentTable::VBBMSegment);
}
// assumes write lock is held
// Right now, adding a file and growing are mutually exclusive ops.
void VBBM::growVBBM(bool addAFile)
{
int allocSize;
int nFiles = -1;
key_t newshmkey;
char* newshmseg;
if (vbbmShminfo->allocdSize == 0)
{
if (addAFile)
nFiles = 1;
else
nFiles = 0;
allocSize = (sizeof(VBShmsegHeader) + (nFiles * sizeof(VBFileMetadata)) + VBSTORAGE_INITIAL_SIZE +
VBTABLE_INITIAL_SIZE);
}
else
{
if (!addAFile)
allocSize = vbbmShminfo->allocdSize + VBBM_INCREMENT;
else
{
vbbm->nFiles++;
allocSize = vbbmShminfo->allocdSize + sizeof(VBFileMetadata);
}
}
newshmkey = chooseShmkey();
if (fPVBBMImpl)
{
BRMShmImpl newShm(newshmkey, allocSize);
newshmseg = static_cast<char*>(newShm.fMapreg.get_address());
memset(newshmseg, 0, allocSize);
if (vbbm != NULL)
{
VBShmsegHeader* tmp = reinterpret_cast<VBShmsegHeader*>(newshmseg);
tmp->vbCapacity = vbbm->vbCapacity;
tmp->numHashBuckets = vbbm->numHashBuckets;
if (!addAFile)
{
tmp->vbCapacity += VBSTORAGE_INCREMENT / sizeof(VBBMEntry);
tmp->numHashBuckets += VBTABLE_INCREMENT / sizeof(int);
}
tmp->vbLWM = 0;
copyVBBM(tmp);
}
undoRecords.clear();
fPVBBMImpl->swapout(newShm);
}
else
{
fPVBBMImpl = VBBMImpl::makeVBBMImpl(newshmkey, allocSize);
newshmseg = reinterpret_cast<char*>(fPVBBMImpl->get());
memset(newshmseg, 0, allocSize);
}
vbbm = fPVBBMImpl->get();
if (vbbmShminfo->allocdSize == 0) // this means the shmseg was created by this call
initShmseg(nFiles);
vbbmShminfo->tableShmkey = currentVBBMShmkey = newshmkey;
vbbmShminfo->allocdSize = allocSize;
if (r_only)
fPVBBMImpl->makeReadOnly();
files = reinterpret_cast<VBFileMetadata*>(&newshmseg[sizeof(VBShmsegHeader)]);
hashBuckets =
reinterpret_cast<int*>(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata)]);
storage =
reinterpret_cast<VBBMEntry*>(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata) +
vbbm->numHashBuckets * sizeof(int)]);
}
void VBBM::growForLoad(int count)
{
int allocSize;
int nFiles;
key_t newshmkey;
char* newshmseg;
int i;
if (vbbm)
nFiles = vbbm->nFiles;
else
nFiles = 0;
if (count < VBSTORAGE_INITIAL_COUNT)
count = VBSTORAGE_INITIAL_COUNT;
// round up to next normal increment point out of paranoia.
if (count % VBSTORAGE_INCREMENT_COUNT)
count = ((count / VBSTORAGE_INCREMENT_COUNT) + 1) * VBSTORAGE_INCREMENT_COUNT;
allocSize = VBBM_SIZE(nFiles, count);
newshmkey = chooseShmkey();
if (fPVBBMImpl)
{
BRMShmImpl newShm(newshmkey, allocSize);
newshmseg = static_cast<char*>(newShm.fMapreg.get_address());
// copy the file meta to the new segment
memcpy((char*)&newshmseg[sizeof(VBShmsegHeader)], files, sizeof(VBFileMetadata) * nFiles);
fPVBBMImpl->swapout(newShm);
}
else
{
fPVBBMImpl = VBBMImpl::makeVBBMImpl(newshmkey, allocSize);
}
vbbm = fPVBBMImpl->get();
vbbm->nFiles = nFiles;
vbbm->vbCapacity = count;
vbbm->vbLWM = 0;
vbbm->numHashBuckets = count / 4;
vbbmShminfo->tableShmkey = currentVBBMShmkey = newshmkey;
vbbmShminfo->allocdSize = allocSize;
newshmseg = (char*)vbbm;
files = reinterpret_cast<VBFileMetadata*>(&newshmseg[sizeof(VBShmsegHeader)]);
hashBuckets =
reinterpret_cast<int*>(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata)]);
storage =
reinterpret_cast<VBBMEntry*>(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata) +
vbbm->numHashBuckets * sizeof(int)]);
for (i = 0; i < vbbm->numHashBuckets; i++)
hashBuckets[i] = -1;
for (i = 0; i < vbbm->vbCapacity; i++)
storage[i].lbid = -1;
undoRecords.clear();
}
// assumes write lock is held and the src is vbbm
// and that dest->{numHashBuckets, vbCapacity, vbLWM} have been set.
void VBBM::copyVBBM(VBShmsegHeader* dest)
{
int i;
int* newHashtable;
VBBMEntry* newStorage;
VBFileMetadata* newFiles;
char* cDest = reinterpret_cast<char*>(dest);
// copy metadata
dest->nFiles = vbbm->nFiles;
dest->vbCurrentSize = vbbm->vbCurrentSize;
newFiles = reinterpret_cast<VBFileMetadata*>(&cDest[sizeof(VBShmsegHeader)]);
newHashtable =
reinterpret_cast<int*>(&cDest[sizeof(VBShmsegHeader) + dest->nFiles * sizeof(VBFileMetadata)]);
newStorage =
reinterpret_cast<VBBMEntry*>(&cDest[sizeof(VBShmsegHeader) + dest->nFiles * sizeof(VBFileMetadata) +
dest->numHashBuckets * sizeof(int)]);
memcpy(newFiles, files, sizeof(VBFileMetadata) * vbbm->nFiles);
// initialize new storage & hash
for (i = 0; i < dest->numHashBuckets; i++)
newHashtable[i] = -1;
for (i = 0; i < dest->vbCapacity; i++)
newStorage[i].lbid = -1;
// walk the storage & re-hash all entries;
for (i = 0; i < vbbm->vbCurrentSize; i++)
if (storage[i].lbid != -1)
{
_insert(storage[i], dest, newHashtable, newStorage, true);
// confirmChanges();
}
}
key_t VBBM::chooseShmkey() const
{
int fixedKeys = 1;
key_t ret;
if (vbbmShminfo->tableShmkey + 1 == (key_t)(fShmKeys.KEYRANGE_VBBM_BASE + fShmKeys.KEYRANGE_SIZE - 1) ||
(unsigned)vbbmShminfo->tableShmkey < fShmKeys.KEYRANGE_VBBM_BASE)
ret = fShmKeys.KEYRANGE_VBBM_BASE + fixedKeys;
else
ret = vbbmShminfo->tableShmkey + 1;
return ret;
}
// write lock
void VBBM::insert(LBID_t lbid, VER_t verID, OID_t vbOID, uint32_t vbFBO, bool loading)
{
VBBMEntry entry;
#ifdef BRM_DEBUG
int i;
if (lbid < 0)
{
log("VBBM::insert(): lbid must be >= 0", logging::LOG_TYPE_DEBUG);
throw invalid_argument("VBBM::insert(): lbid must be >= 0");
}
if (verID < 0)
{
log("VBBM::insert(): verID must be >= 0", logging::LOG_TYPE_DEBUG);
throw invalid_argument("VBBM::insert(): verID must be >= 0");
}
for (i = 0; i < vbbm->nFiles; i++)
if (vbOID == files[i].OID)
break;
if (i == vbbm->nFiles)
{
log("VBBM::insert(): vbOID must be the OID of a Version Buffer file", logging::LOG_TYPE_DEBUG);
throw invalid_argument("VBBM::insert(): vbOID must be the OID of a Version Buffer file");
}
if (vbFBO > (files[i].fileSize / BLOCK_SIZE) || vbFBO < 0)
{
log("VBBM::insert(): vbFBO is out of bounds for that vbOID", logging::LOG_TYPE_DEBUG);
throw invalid_argument("VBBM::insert(): vbFBO is out of bounds for that vbOID");
}
#endif
entry.lbid = lbid;
entry.verID = verID;
entry.vbOID = vbOID;
entry.vbFBO = vbFBO;
// check for resize
if (vbbm->vbCurrentSize == vbbm->vbCapacity)
growVBBM();
_insert(entry, vbbm, hashBuckets, storage, loading);
if (!loading)
makeUndoRecord(&vbbm->vbCurrentSize, sizeof(vbbm->vbCurrentSize));
vbbm->vbCurrentSize++;
}
// assumes write lock is held and that it is properly sized already
void VBBM::_insert(VBBMEntry& e, VBShmsegHeader* dest, int* destHash, VBBMEntry* destStorage, bool loading)
{
int hashIndex, cHashlen = sizeof(LBID_t) + sizeof(VER_t), insertIndex;
char* cHash = (char*)alloca(cHashlen);
utils::Hasher hasher;
memcpy(cHash, &e.lbid, sizeof(LBID_t));
memcpy(&cHash[sizeof(LBID_t)], &e.verID, sizeof(VER_t));
hashIndex = hasher(cHash, cHashlen) % dest->numHashBuckets;
insertIndex = dest->vbLWM;
while (destStorage[insertIndex].lbid != -1)
{
insertIndex++;
#ifdef BRM_DEBUG
if (insertIndex == dest->vbCapacity)
{
log("VBBM:_insert(): There are no empty entries. Possibly bad resize condition.",
logging::LOG_TYPE_DEBUG);
throw logic_error("VBBM:_insert(): There are no empty entries. Possibly bad resize condition.");
}
#endif
}
if (!loading)
{
makeUndoRecord(dest, sizeof(VBShmsegHeader));
makeUndoRecord(&destStorage[insertIndex], sizeof(VBBMEntry));
makeUndoRecord(&destHash[hashIndex], sizeof(int));
}
dest->vbLWM = insertIndex;
e.next = destHash[hashIndex];
destStorage[insertIndex] = e;
destHash[hashIndex] = insertIndex;
}
// assumes read lock is held
int VBBM::lookup(LBID_t lbid, VER_t verID, OID_t& oid, uint32_t& fbo) const
{
int index, prev, bucket;
//#ifdef BRM_DEBUG
if (lbid < 0)
{
log("VBBM::lookup(): lbid must be >= 0", logging::LOG_TYPE_DEBUG);
throw invalid_argument("VBBM::lookup(): lbid must be >= 0");
}
if (verID < 0)
{
log("VBBM::lookup(): verID must be > 1)", logging::LOG_TYPE_DEBUG);
throw invalid_argument("VBBM::lookup(): verID must be > 1)");
}
//#endif
index = getIndex(lbid, verID, prev, bucket);
if (index == -1)
return -1;
oid = storage[index].vbOID;
fbo = storage[index].vbFBO;
return 0;
}
// assumes write lock
void VBBM::getBlocks(int num, OID_t vbOID, vector<VBRange>& freeRanges, VSS& vss, bool flushPMCache)
{
int blocksLeftInFile, blocksGathered = 0, i;
uint32_t fileIndex;
uint32_t firstFBO, lastFBO;
VBRange range;
vector<VBRange>::iterator it;
vector<LBID_t> flushList;
freeRanges.clear();
fileIndex = addVBFileIfNotExists(vbOID);
/*
for (i = 0; i < vbbm->nFiles; i++) {
cout << "file " << i << " vbOID=" << files[i].OID << " size=" << files[i].fileSize
<< endl;
}
*/
if ((uint32_t)num > files[fileIndex].fileSize / BLOCK_SIZE)
{
cout << "num = " << num << " filesize = " << files[fileIndex].fileSize << endl;
log("VBBM::getBlocks(): num is larger than the size of the version buffer", logging::LOG_TYPE_DEBUG);
throw logging::VBBMBufferOverFlowExcept(
"VBBM::getBlocks(): num is larger than the size of the version buffer");
}
while ((vbbm->vbCurrentSize + num) > vbbm->vbCapacity)
{
growVBBM();
// cout << " requested num = " << num << " and Growing vbbm ... " << endl;
}
while (blocksGathered < num)
{
blocksLeftInFile = (files[fileIndex].fileSize - files[fileIndex].nextOffset) / BLOCK_SIZE;
int blocksLeft = num - blocksGathered;
range.vbOID = files[fileIndex].OID;
range.vbFBO = files[fileIndex].nextOffset / BLOCK_SIZE;
range.size = (blocksLeftInFile >= blocksLeft ? blocksLeft : blocksLeftInFile);
makeUndoRecord(&files[fileIndex], sizeof(VBFileMetadata));
if (range.size == (uint32_t)blocksLeftInFile)
files[fileIndex].nextOffset = 0;
else
files[fileIndex].nextOffset += range.size * BLOCK_SIZE;
blocksGathered += range.size;
freeRanges.push_back(range);
}
// age the returned blocks out of the VB
for (it = freeRanges.begin(); it != freeRanges.end(); it++)
{
uint32_t firstChunk, lastChunk;
vbOID = it->vbOID;
firstFBO = it->vbFBO;
lastFBO = it->vbFBO + it->size - 1;
/* Age out at least 100 blocks at a time to reduce the # of times we have to do it.
* How to detect when it needs to be done and when it doesn't?
*
* Split VB space into 100-block chunks. When a chunk boundary is crossed,
* clear the whole chunk.
*/
firstChunk = firstFBO / VBBM_CHUNK_SIZE;
lastChunk = lastFBO / VBBM_CHUNK_SIZE;
// if the current range falls in the middle of a chunk and doesn't span chunks,
// there's nothing to do b/c the chunk is assumed to have been cleared already
if (((firstFBO % VBBM_CHUNK_SIZE) != 0) && (firstChunk == lastChunk))
continue;
// round up to the next chunk boundaries
if ((firstFBO % VBBM_CHUNK_SIZE) != 0) // this implies the range spans chunks
firstFBO = (firstChunk + 1) * VBBM_CHUNK_SIZE; // the first FBO of the next chunk
lastFBO = ((lastChunk + 1) * VBBM_CHUNK_SIZE - 1); // the last FBO of the last chunk
// don't go past the end of the file
if (lastFBO > files[fileIndex].fileSize / BLOCK_SIZE)
lastFBO = files[fileIndex].fileSize / BLOCK_SIZE;
// at this point [firstFBO, lastFBO] is the range to age out.
// ugh, walk the whole vbbm looking for matches.
for (i = 0; i < vbbm->vbCapacity; i++)
if (storage[i].lbid != -1 && storage[i].vbOID == vbOID && storage[i].vbFBO >= firstFBO &&
storage[i].vbFBO <= lastFBO)
{
if (vss.isEntryLocked(storage[i].lbid, storage[i].verID))
{
ostringstream msg;
msg << "VBBM::getBlocks(): version buffer overflow. Increase VersionBufferFileSize. Overflow "
"occurred in aged blocks. Requested NumBlocks:VbOid:vbFBO:lastFBO = "
<< num << ":" << vbOID << ":" << firstFBO << ":" << lastFBO << " lbid locked is "
<< storage[i].lbid << endl;
log(msg.str(), logging::LOG_TYPE_CRITICAL);
freeRanges.clear();
throw logging::VBBMBufferOverFlowExcept(msg.str());
}
vss.removeEntry(storage[i].lbid, storage[i].verID, &flushList);
removeEntry(storage[i].lbid, storage[i].verID);
}
}
if (flushPMCache && !flushList.empty())
cacheutils::flushPrimProcAllverBlocks(flushList);
}
// read lock
int VBBM::getIndex(LBID_t lbid, VER_t verID, int& prev, int& bucket) const
{
int cHashlen = sizeof(LBID_t) + sizeof(VER_t), currentIndex;
char* cHash = (char*)alloca(cHashlen);
VBBMEntry* listEntry;
utils::Hasher hasher;
memcpy(cHash, &lbid, sizeof(LBID_t));
memcpy(&cHash[sizeof(LBID_t)], &verID, sizeof(VER_t));
bucket = hasher(cHash, cHashlen) % vbbm->numHashBuckets;
prev = -1;
if (hashBuckets[bucket] == -1)
return -1;
currentIndex = hashBuckets[bucket];
while (currentIndex != -1)
{
listEntry = &storage[currentIndex];
if (listEntry->lbid == lbid && listEntry->verID == verID)
return currentIndex;
prev = currentIndex;
currentIndex = listEntry->next;
}
return -1;
}
void VBBM::removeEntry(LBID_t lbid, VER_t verID)
{
int index, prev, bucket;
#ifdef BRM_DEBUG
if (lbid < 0)
{
log("VBBM::removeEntry(): lbid must be >= 0", logging::LOG_TYPE_DEBUG);
throw invalid_argument("VBBM::removeEntry(): lbid must be >= 0");
}
if (verID < 0)
{
log("VBBM::removeEntry(): verID must be >= 0", logging::LOG_TYPE_DEBUG);
throw invalid_argument("VBBM::removeEntry(): verID must be >= 0");
}
#endif
index = getIndex(lbid, verID, prev, bucket);
if (index == -1)
{
#ifdef BRM_DEBUG
ostringstream ostr;
ostr << "VBBM::removeEntry(): that entry doesn't exist lbid = " << lbid << " verID = " << verID << endl;
log(ostr.str(), logging::LOG_TYPE_DEBUG);
throw logic_error(ostr.str());
#else
return;
#endif
}
makeUndoRecord(&storage[index], sizeof(VBBMEntry));
storage[index].lbid = -1;
if (prev != -1)
{
makeUndoRecord(&storage[prev], sizeof(VBBMEntry));
storage[prev].next = storage[index].next;
}
else
{
makeUndoRecord(&hashBuckets[bucket], sizeof(int));
hashBuckets[bucket] = storage[index].next;
}
makeUndoRecord(vbbm, sizeof(VBShmsegHeader));
vbbm->vbCurrentSize--;
if (vbbm->vbLWM > index)
vbbm->vbLWM = index;
}
// read lock
int VBBM::size() const
{
#ifdef BRM_DEBUG
int i, ret = 0;
for (i = 0; i < vbbm->vbCapacity; i++)
if (storage[i].lbid != -1)
ret++;
if (ret != vbbm->vbCurrentSize)
{
ostringstream ostr;
ostr << "VBBM::checkConsistency(): actual size is " << ret << ", recorded size is "
<< vbbm->vbCurrentSize;
log(ostr.str(), logging::LOG_TYPE_DEBUG);
throw logic_error(ostr.str());
}
return ret;
#else
return vbbm->vbCurrentSize;
#endif
}
// read lock
bool VBBM::hashEmpty() const
{
int i;
for (i = 0; i < vbbm->numHashBuckets; i++)
if (hashBuckets[i] != -1)
return false;
return true;
}
// write lock
void VBBM::clear()
{
int allocSize;
int newshmkey;
int nFiles = -1;
char* newshmseg;
// save vars we need after the clear()
boost::scoped_array<VBFileMetadata> newFiles(new VBFileMetadata[vbbm->nFiles]);
memcpy(&newFiles[0], files, vbbm->nFiles * sizeof(VBFileMetadata));
setCurrentFileSize();
for (int i = 0; i < vbbm->nFiles; i++)
{
newFiles[i].fileSize = currentFileSize;
newFiles[i].nextOffset = 0;
}
nFiles = vbbm->nFiles;
allocSize = (sizeof(VBShmsegHeader) + (nFiles * sizeof(VBFileMetadata)) + VBSTORAGE_INITIAL_SIZE +
VBTABLE_INITIAL_SIZE);
// cout << "clear:: allocSize = " << allocSize << endl;
newshmkey = chooseShmkey();
fPVBBMImpl->clear(newshmkey, allocSize);
vbbm = fPVBBMImpl->get();
newshmseg = reinterpret_cast<char*>(vbbm);
initShmseg(nFiles);
vbbmShminfo->tableShmkey = currentVBBMShmkey = newshmkey;
vbbmShminfo->allocdSize = allocSize;
files = reinterpret_cast<VBFileMetadata*>(&newshmseg[sizeof(VBShmsegHeader)]);
hashBuckets =
reinterpret_cast<int*>(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata)]);
storage =
reinterpret_cast<VBBMEntry*>(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata) +
vbbm->numHashBuckets * sizeof(int)]);
memcpy(files, &newFiles[0], vbbm->nFiles * sizeof(VBFileMetadata));
}
// read lock
int VBBM::checkConsistency() const
{
/*
Struct integrity tests
1: Verify that the recorded size matches the actual size
2: Verify there are no empty entries reachable from the hash table
3: Verify there are no empty entries below the LWM
4a: Make sure every VBBM entry points to a unique position in the VB
4b: Make sure every VBBM entry has unique LBID & VERID.
*/
int i, j, k;
/* Test 1 is already implemented */
size();
/* Test 2 - no empty elements reachable from the hash table */
int nextElement;
for (i = 0; i < vbbm->numHashBuckets; i++)
{
if (hashBuckets[i] != -1)
for (nextElement = hashBuckets[i]; nextElement != -1; nextElement = storage[nextElement].next)
if (storage[nextElement].lbid == -1)
throw logic_error(
"VBBM::checkConsistency(): an empty storage entry is reachable from the hash table");
}
/* Test 3 - verify that there are no empty entries below the LWM */
for (i = 0; i < vbbm->vbLWM; i++)
{
if (storage[i].lbid == -1)
{
cerr << "VBBM: LWM=" << vbbm->vbLWM << " first empty entry=" << i << endl;
throw logic_error("VBBM::checkConsistency(): LWM accounting error");
}
}
/* Test 4b - verify the uniqueness of the entries */
for (i = 0; i < vbbm->numHashBuckets; i++)
if (hashBuckets[i] != -1)
for (j = hashBuckets[i]; j != -1; j = storage[j].next)
for (k = storage[j].next; k != -1; k = storage[k].next)
if (storage[j].lbid == storage[k].lbid && storage[j].verID == storage[k].verID)
{
cerr << "VBBM: lbid=" << storage[j].lbid << " verID=" << storage[j].verID << endl;
throw logic_error("VBBM::checkConsistency(): Duplicate entry found");
}
/* Test 4a - verify the uniqueness of vbOID, vbFBO fields */
for (i = 0; i < vbbm->vbCapacity; i++)
if (storage[i].lbid != -1)
for (j = i + 1; j < vbbm->vbCapacity; j++)
if (storage[j].lbid != -1)
if (storage[j].vbOID == storage[i].vbOID && storage[j].vbFBO == storage[i].vbFBO)
{
cerr << "VBBM: lbid1=" << storage[i].lbid << " lbid2=" << storage[j].lbid
<< " verID1=" << storage[i].verID << " verID2=" << storage[j].verID
<< " share vbOID=" << storage[j].vbOID << " vbFBO=" << storage[j].vbFBO << endl;
throw logic_error("VBBM::checkConsistency(): 2 VBBM entries share space in the VB");
}
return 0;
}
void VBBM::setReadOnly()
{
r_only = true;
}
/* File Format (V1)
VBBM V1 magic (32-bits)
# of VBBM entries in capacity (32-bits)
struct VBBMEntry * #
These entries are considered optional to support going backward
and forward between versions.
nFiles (32-bits)
currentFileIndex (32-bits)
VBFileMetadata * nFiles
*/
/* File Format (V2):
*
* Version 2 magic (int)
* number of used VBBM entries (numEntries) (int)
* current number of VB files (nFiles) (int)
* VBFileMetadata * nFiles
* struct VBBMEntry * numEntries
*/
void VBBM::loadVersion2(IDBDataFile* in)
{
int vbbmEntries;
int nFiles;
int i;
VBBMEntry entry;
if (in->read((char*)&vbbmEntries, 4) != 4)
{
log_errno("VBBM::load()");
throw runtime_error("VBBM::load(): Failed to read entry number");
}
if (in->read((char*)&nFiles, 4) != 4)
{
log_errno("VBBM::load()");
throw runtime_error("VBBM::load(): Failed to read file number");
}
// Need to make clear() truncate the files section
if (vbbm->nFiles > nFiles)
vbbm->nFiles = nFiles;
clear();
while (vbbm->nFiles < nFiles)
growVBBM(true); // this allocates one file, doesn't grow the main storage
growForLoad(vbbmEntries);
const int nfileSize = sizeof(VBFileMetadata) * nFiles;
if (in->read((char*)files, nfileSize) != nfileSize)
{
log_errno("VBBM::load()");
throw runtime_error("VBBM::load(): Failed to load vb file meta data");
}
size_t readSize = vbbmEntries * sizeof(entry);
char* readBuf = new char[readSize];
size_t progress = 0;
int err;
while (progress < readSize)
{
err = in->read(readBuf + progress, readSize - progress);
if (err < 0)
{
log_errno("VBBM::load()");
throw runtime_error("VBBM::load(): Failed to load, check the critical log file");
}
else if (err == 0)
{
log("VBBM::load(): Got early EOF");
throw runtime_error("VBBM::load(): Got early EOF");
}
progress += err;
}
VBBMEntry* loadedEntries = (VBBMEntry*)readBuf;
for (i = 0; i < vbbmEntries; i++)
insert(loadedEntries[i].lbid, loadedEntries[i].verID, loadedEntries[i].vbOID, loadedEntries[i].vbFBO,
true);
}
//#include "boost/date_time/posix_time/posix_time.hpp"
// using namespace boost::posix_time;
void VBBM::load(string filename)
{
int magic;
const char* filename_p = filename.c_str();
scoped_ptr<IDBDataFile> in(
IDBDataFile::open(IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG), filename_p, "rb", 0));
// ptime time1, time2;
// time1 = microsec_clock::local_time();
// cout << "loading the VBBM " << time1 << endl;
if (!in)
{
log_errno("VBBM::load()");
throw runtime_error("VBBM::load(): Failed to open the file");
}
int bytes = in->read((char*)&magic, 4);
if (bytes != 4)
{
log("VBBM::load(): failed to read magic.");
throw runtime_error("VBBM::load(): failed to read magic.");
}
switch (magic)
{
case VBBM_MAGIC_V2: loadVersion2(in.get()); break;
default:
log("VBBM::load(): Bad magic. Not a VBBM file?");
throw runtime_error("VBBM::load(): Bad magic. Not a VBBM file?");
}
// time2 = microsec_clock::local_time();
// cout << "done loading " << time2 << " duration: " << time2-time1 << endl;
}
// read lock
void VBBM::save(string filename)
{
int i;
int var;
const char* filename_p = filename.c_str();
scoped_ptr<IDBDataFile> out(IDBDataFile::open(IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG),
filename_p, "wb", IDBDataFile::USE_VBUF));
if (!out)
{
log_errno("VBBM::save()");
throw runtime_error("VBBM::save(): Failed to open the file");
}
var = VBBM_MAGIC_V2;
[[maybe_unused]] int bytesWritten = 0;
[[maybe_unused]] int bytesToWrite = 12;
bytesWritten += out->write((char*)&var, 4);
bytesWritten += out->write((char*)&vbbm->vbCurrentSize, 4);
bytesWritten += out->write((char*)&vbbm->nFiles, 4);
bytesWritten += out->write((char*)files, sizeof(VBFileMetadata) * vbbm->nFiles);
bytesToWrite += sizeof(VBFileMetadata) * vbbm->nFiles;
int first = -1, last = -1, err;
size_t progress, writeSize;
for (i = 0; i < vbbm->vbCapacity; i++)
{
if (storage[i].lbid != -1 && first == -1)
first = i;
else if (storage[i].lbid == -1 && first != -1)
{
last = i;
writeSize = (last - first) * sizeof(VBBMEntry);
progress = 0;
char* writePos = (char*)&storage[first];
while (progress < writeSize)
{
err = out->write(writePos + progress, writeSize - progress);
if (err < 0)
{
log_errno("VBBM::save()");
throw runtime_error("VBBM::save(): Failed to write the file");
}
progress += err;
}
first = -1;
}
}
if (first != -1)
{
writeSize = (vbbm->vbCapacity - first) * sizeof(VBBMEntry);
progress = 0;
char* writePos = (char*)&storage[first];
while (progress < writeSize)
{
err = out->write(writePos + progress, writeSize - progress);
if (err < 0)
{
log_errno("VBBM::save()");
throw runtime_error("VBBM::save(): Failed to write the file");
}
progress += err;
}
}
#if 0
cout << "saving... nfiles=" << vbbm->nFiles << "\n";
for (i = 0; i < vbbm->nFiles; i++)
{
cout << "file " << i << " vboid=" << files[i].OID << " size=" << files[i].fileSize << endl;
}
#endif
}
uint32_t VBBM::addVBFileIfNotExists(OID_t vbOID)
{
int i;
/* Check if vbOID exists,
* add it if not, init to pos=0
*/
for (i = 0; i < vbbm->nFiles; i++)
if (files[i].OID == vbOID)
break;
if (i == vbbm->nFiles)
{
setCurrentFileSize();
growVBBM(true);
files[i].OID = vbOID;
files[i].fileSize = currentFileSize;
files[i].nextOffset = 0;
}
return i;
}
void VBBM::setCurrentFileSize()
{
config::Config* conf = config::Config::makeConfig();
string stmp;
int64_t ltmp;
currentFileSize = 2147483648ULL; // 2 GB default
try
{
stmp = conf->getConfig("VersionBuffer", "VersionBufferFileSize");
}
catch (std::exception& e)
{
log("VBBM: Missing a VersionBuffer/VersionBufferFileSize key in the config file");
throw invalid_argument("VBBM: Missing a VersionBuffer/VersionBufferFileSize key in the config file");
}
ltmp = conf->fromText(stmp.c_str());
if (ltmp < 1)
{
log("VBBM: Config error: VersionBuffer/VersionBufferFileSize must be positive");
throw invalid_argument("VBBM: Config error: VersionBuffer/VersionBufferFileSize must be positive");
}
else
{
currentFileSize = ltmp;
}
}
#ifdef BRM_DEBUG
// read lock
int VBBM::getShmid() const
{
return vbbmShmid;
}
#endif
} // namespace BRM