/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /***************************************************************************** * $Id: vbbm.cpp 1926 2013-06-30 21:18:14Z wweeks $ * ****************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "blocksize.h" #include "rwlock.h" #include "brmtypes.h" #include "mastersegmenttable.h" #include "vss.h" #include "configcpp.h" #include "exceptclasses.h" #include "hasher.h" #include "cacheutils.h" #include "IDBDataFile.h" #include "IDBPolicy.h" #define VBBM_DLLEXPORT #include "vbbm.h" #undef VBBM_DLLEXPORT #define VBBM_MAGIC_V1 0x7b27ec13 #define VBBM_MAGIC_V2 0x1fb58c7a #define VBBM_CHUNK_SIZE 100 using namespace std; using namespace boost; using namespace idbdatafile; namespace BRM { VBBMEntry::VBBMEntry() { lbid = -1; verID = 0; vbOID = 0; vbFBO = 0; next = -1; } /*static*/ boost::mutex VBBMImpl::fInstanceMutex; boost::mutex VBBM::mutex; /*static*/ VBBMImpl* VBBMImpl::fInstance = 0; /*static*/ VBBMImpl* VBBMImpl::makeVBBMImpl(unsigned key, off_t size, bool readOnly) { boost::mutex::scoped_lock lk(fInstanceMutex); if (fInstance) { if (key != fInstance->fVBBM.key()) { BRMShmImpl newShm(key, size); fInstance->swapout(newShm); } idbassert(key == fInstance->fVBBM.key()); return fInstance; } fInstance = new VBBMImpl(key, size, readOnly); return fInstance; } VBBMImpl::VBBMImpl(unsigned key, off_t size, bool readOnly) : fVBBM(key, size, readOnly) { } VBBM::VBBM() { vbbm = NULL; currentVBBMShmkey = -1; vbbmShmid = 0; vbbmShminfo = NULL; r_only = false; fPVBBMImpl = 0; currentFileSize = 0; } VBBM::~VBBM() { } void VBBM::initShmseg(int nFiles) { // VBFileMetadata *newfiles; int* newBuckets; VBBMEntry* newStorage; int i; char* shmseg; vbbm->vbCapacity = VBSTORAGE_INITIAL_SIZE / sizeof(VBBMEntry); vbbm->vbCurrentSize = 0; vbbm->vbLWM = 0; vbbm->numHashBuckets = VBTABLE_INITIAL_SIZE / sizeof(int); shmseg = reinterpret_cast(vbbm); // newfiles = reinterpret_cast // (&shmseg[sizeof(VBShmsegHeader)]); newBuckets = reinterpret_cast(&shmseg[sizeof(VBShmsegHeader) + nFiles * sizeof(VBFileMetadata)]); newStorage = reinterpret_cast( &shmseg[sizeof(VBShmsegHeader) + nFiles * sizeof(VBFileMetadata) + vbbm->numHashBuckets * sizeof(int)]); setCurrentFileSize(); vbbm->nFiles = nFiles; for (i = 0; i < vbbm->numHashBuckets; i++) newBuckets[i] = -1; for (i = 0; i < vbbm->vbCapacity; i++) newStorage[i].lbid = -1; } // ported from ExtentMap void VBBM::lock(OPS op) { char* shmseg; if (op == READ) { vbbmShminfo = mst.getTable_read(MasterSegmentTable::VBBMSegment); mutex.lock(); } else vbbmShminfo = mst.getTable_write(MasterSegmentTable::VBBMSegment); // this means that either the VBBM isn't attached or that it was resized if (currentVBBMShmkey != vbbmShminfo->tableShmkey) { if (vbbm != NULL) { vbbm = NULL; } if (vbbmShminfo->allocdSize == 0) { if (op == READ) { mutex.unlock(); mst.getTable_upgrade(MasterSegmentTable::VBBMSegment); if (vbbmShminfo->allocdSize == 0) { try { growVBBM(); } catch (...) { release(WRITE); throw; } } mst.getTable_downgrade(MasterSegmentTable::VBBMSegment); } else { try { growVBBM(); } catch (...) { release(WRITE); throw; } } } else { currentVBBMShmkey = vbbmShminfo->tableShmkey; fPVBBMImpl = VBBMImpl::makeVBBMImpl(currentVBBMShmkey, 0); idbassert(fPVBBMImpl); if (r_only) fPVBBMImpl->makeReadOnly(); vbbm = fPVBBMImpl->get(); shmseg = reinterpret_cast(vbbm); files = reinterpret_cast(&shmseg[sizeof(VBShmsegHeader)]); hashBuckets = reinterpret_cast(&shmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata)]); storage = reinterpret_cast( &shmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata) + vbbm->numHashBuckets * sizeof(int)]); if (op == READ) mutex.unlock(); } } else if (op == READ) mutex.unlock(); } // ported from ExtentMap void VBBM::release(OPS op) { if (op == READ) mst.releaseTable_read(MasterSegmentTable::VBBMSegment); else mst.releaseTable_write(MasterSegmentTable::VBBMSegment); } // assumes write lock is held // Right now, adding a file and growing are mutually exclusive ops. void VBBM::growVBBM(bool addAFile) { int allocSize; int nFiles = -1; key_t newshmkey; char* newshmseg; if (vbbmShminfo->allocdSize == 0) { if (addAFile) nFiles = 1; else nFiles = 0; allocSize = (sizeof(VBShmsegHeader) + (nFiles * sizeof(VBFileMetadata)) + VBSTORAGE_INITIAL_SIZE + VBTABLE_INITIAL_SIZE); } else { if (!addAFile) allocSize = vbbmShminfo->allocdSize + VBBM_INCREMENT; else { vbbm->nFiles++; allocSize = vbbmShminfo->allocdSize + sizeof(VBFileMetadata); } } newshmkey = chooseShmkey(); if (fPVBBMImpl) { BRMShmImpl newShm(newshmkey, allocSize); newshmseg = static_cast(newShm.fMapreg.get_address()); memset(newshmseg, 0, allocSize); if (vbbm != NULL) { VBShmsegHeader* tmp = reinterpret_cast(newshmseg); tmp->vbCapacity = vbbm->vbCapacity; tmp->numHashBuckets = vbbm->numHashBuckets; if (!addAFile) { tmp->vbCapacity += VBSTORAGE_INCREMENT / sizeof(VBBMEntry); tmp->numHashBuckets += VBTABLE_INCREMENT / sizeof(int); } tmp->vbLWM = 0; copyVBBM(tmp); } undoRecords.clear(); fPVBBMImpl->swapout(newShm); } else { fPVBBMImpl = VBBMImpl::makeVBBMImpl(newshmkey, allocSize); newshmseg = reinterpret_cast(fPVBBMImpl->get()); memset(newshmseg, 0, allocSize); } vbbm = fPVBBMImpl->get(); if (vbbmShminfo->allocdSize == 0) // this means the shmseg was created by this call initShmseg(nFiles); vbbmShminfo->tableShmkey = currentVBBMShmkey = newshmkey; vbbmShminfo->allocdSize = allocSize; if (r_only) fPVBBMImpl->makeReadOnly(); files = reinterpret_cast(&newshmseg[sizeof(VBShmsegHeader)]); hashBuckets = reinterpret_cast(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata)]); storage = reinterpret_cast(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata) + vbbm->numHashBuckets * sizeof(int)]); } void VBBM::growForLoad(int count) { int allocSize; int nFiles; key_t newshmkey; char* newshmseg; int i; if (vbbm) nFiles = vbbm->nFiles; else nFiles = 0; if (count < VBSTORAGE_INITIAL_COUNT) count = VBSTORAGE_INITIAL_COUNT; // round up to next normal increment point out of paranoia. if (count % VBSTORAGE_INCREMENT_COUNT) count = ((count / VBSTORAGE_INCREMENT_COUNT) + 1) * VBSTORAGE_INCREMENT_COUNT; allocSize = VBBM_SIZE(nFiles, count); newshmkey = chooseShmkey(); if (fPVBBMImpl) { BRMShmImpl newShm(newshmkey, allocSize); newshmseg = static_cast(newShm.fMapreg.get_address()); // copy the file meta to the new segment memcpy((char*)&newshmseg[sizeof(VBShmsegHeader)], files, sizeof(VBFileMetadata) * nFiles); fPVBBMImpl->swapout(newShm); } else { fPVBBMImpl = VBBMImpl::makeVBBMImpl(newshmkey, allocSize); } vbbm = fPVBBMImpl->get(); vbbm->nFiles = nFiles; vbbm->vbCapacity = count; vbbm->vbLWM = 0; vbbm->numHashBuckets = count / 4; vbbmShminfo->tableShmkey = currentVBBMShmkey = newshmkey; vbbmShminfo->allocdSize = allocSize; newshmseg = (char*)vbbm; files = reinterpret_cast(&newshmseg[sizeof(VBShmsegHeader)]); hashBuckets = reinterpret_cast(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata)]); storage = reinterpret_cast(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata) + vbbm->numHashBuckets * sizeof(int)]); for (i = 0; i < vbbm->numHashBuckets; i++) hashBuckets[i] = -1; for (i = 0; i < vbbm->vbCapacity; i++) storage[i].lbid = -1; undoRecords.clear(); } // assumes write lock is held and the src is vbbm // and that dest->{numHashBuckets, vbCapacity, vbLWM} have been set. void VBBM::copyVBBM(VBShmsegHeader* dest) { int i; int* newHashtable; VBBMEntry* newStorage; VBFileMetadata* newFiles; char* cDest = reinterpret_cast(dest); // copy metadata dest->nFiles = vbbm->nFiles; dest->vbCurrentSize = vbbm->vbCurrentSize; newFiles = reinterpret_cast(&cDest[sizeof(VBShmsegHeader)]); newHashtable = reinterpret_cast(&cDest[sizeof(VBShmsegHeader) + dest->nFiles * sizeof(VBFileMetadata)]); newStorage = reinterpret_cast(&cDest[sizeof(VBShmsegHeader) + dest->nFiles * sizeof(VBFileMetadata) + dest->numHashBuckets * sizeof(int)]); memcpy(newFiles, files, sizeof(VBFileMetadata) * vbbm->nFiles); // initialize new storage & hash for (i = 0; i < dest->numHashBuckets; i++) newHashtable[i] = -1; for (i = 0; i < dest->vbCapacity; i++) newStorage[i].lbid = -1; // walk the storage & re-hash all entries; for (i = 0; i < vbbm->vbCurrentSize; i++) if (storage[i].lbid != -1) { _insert(storage[i], dest, newHashtable, newStorage, true); // confirmChanges(); } } key_t VBBM::chooseShmkey() const { int fixedKeys = 1; key_t ret; if (vbbmShminfo->tableShmkey + 1 == (key_t)(fShmKeys.KEYRANGE_VBBM_BASE + fShmKeys.KEYRANGE_SIZE - 1) || (unsigned)vbbmShminfo->tableShmkey < fShmKeys.KEYRANGE_VBBM_BASE) ret = fShmKeys.KEYRANGE_VBBM_BASE + fixedKeys; else ret = vbbmShminfo->tableShmkey + 1; return ret; } // write lock void VBBM::insert(LBID_t lbid, VER_t verID, OID_t vbOID, uint32_t vbFBO, bool loading) { VBBMEntry entry; #ifdef BRM_DEBUG int i; if (lbid < 0) { log("VBBM::insert(): lbid must be >= 0", logging::LOG_TYPE_DEBUG); throw invalid_argument("VBBM::insert(): lbid must be >= 0"); } if (verID < 0) { log("VBBM::insert(): verID must be >= 0", logging::LOG_TYPE_DEBUG); throw invalid_argument("VBBM::insert(): verID must be >= 0"); } for (i = 0; i < vbbm->nFiles; i++) if (vbOID == files[i].OID) break; if (i == vbbm->nFiles) { log("VBBM::insert(): vbOID must be the OID of a Version Buffer file", logging::LOG_TYPE_DEBUG); throw invalid_argument("VBBM::insert(): vbOID must be the OID of a Version Buffer file"); } if (vbFBO > (files[i].fileSize / BLOCK_SIZE) || vbFBO < 0) { log("VBBM::insert(): vbFBO is out of bounds for that vbOID", logging::LOG_TYPE_DEBUG); throw invalid_argument("VBBM::insert(): vbFBO is out of bounds for that vbOID"); } #endif entry.lbid = lbid; entry.verID = verID; entry.vbOID = vbOID; entry.vbFBO = vbFBO; // check for resize if (vbbm->vbCurrentSize == vbbm->vbCapacity) growVBBM(); _insert(entry, vbbm, hashBuckets, storage, loading); if (!loading) makeUndoRecord(&vbbm->vbCurrentSize, sizeof(vbbm->vbCurrentSize)); vbbm->vbCurrentSize++; } // assumes write lock is held and that it is properly sized already void VBBM::_insert(VBBMEntry& e, VBShmsegHeader* dest, int* destHash, VBBMEntry* destStorage, bool loading) { int hashIndex, cHashlen = sizeof(LBID_t) + sizeof(VER_t), insertIndex; char* cHash = (char*)alloca(cHashlen); utils::Hasher hasher; memcpy(cHash, &e.lbid, sizeof(LBID_t)); memcpy(&cHash[sizeof(LBID_t)], &e.verID, sizeof(VER_t)); hashIndex = hasher(cHash, cHashlen) % dest->numHashBuckets; insertIndex = dest->vbLWM; while (destStorage[insertIndex].lbid != -1) { insertIndex++; #ifdef BRM_DEBUG if (insertIndex == dest->vbCapacity) { log("VBBM:_insert(): There are no empty entries. Possibly bad resize condition.", logging::LOG_TYPE_DEBUG); throw logic_error("VBBM:_insert(): There are no empty entries. Possibly bad resize condition."); } #endif } if (!loading) { makeUndoRecord(dest, sizeof(VBShmsegHeader)); makeUndoRecord(&destStorage[insertIndex], sizeof(VBBMEntry)); makeUndoRecord(&destHash[hashIndex], sizeof(int)); } dest->vbLWM = insertIndex; e.next = destHash[hashIndex]; destStorage[insertIndex] = e; destHash[hashIndex] = insertIndex; } // assumes read lock is held int VBBM::lookup(LBID_t lbid, VER_t verID, OID_t& oid, uint32_t& fbo) const { int index, prev, bucket; //#ifdef BRM_DEBUG if (lbid < 0) { log("VBBM::lookup(): lbid must be >= 0", logging::LOG_TYPE_DEBUG); throw invalid_argument("VBBM::lookup(): lbid must be >= 0"); } if (verID < 0) { log("VBBM::lookup(): verID must be > 1)", logging::LOG_TYPE_DEBUG); throw invalid_argument("VBBM::lookup(): verID must be > 1)"); } //#endif index = getIndex(lbid, verID, prev, bucket); if (index == -1) return -1; oid = storage[index].vbOID; fbo = storage[index].vbFBO; return 0; } // assumes write lock void VBBM::getBlocks(int num, OID_t vbOID, vector& freeRanges, VSS& vss, bool flushPMCache) { int blocksLeftInFile, blocksGathered = 0, i; uint32_t fileIndex; uint32_t firstFBO, lastFBO; VBRange range; vector::iterator it; vector flushList; freeRanges.clear(); fileIndex = addVBFileIfNotExists(vbOID); /* for (i = 0; i < vbbm->nFiles; i++) { cout << "file " << i << " vbOID=" << files[i].OID << " size=" << files[i].fileSize << endl; } */ if ((uint32_t)num > files[fileIndex].fileSize / BLOCK_SIZE) { cout << "num = " << num << " filesize = " << files[fileIndex].fileSize << endl; log("VBBM::getBlocks(): num is larger than the size of the version buffer", logging::LOG_TYPE_DEBUG); throw logging::VBBMBufferOverFlowExcept( "VBBM::getBlocks(): num is larger than the size of the version buffer"); } while ((vbbm->vbCurrentSize + num) > vbbm->vbCapacity) { growVBBM(); // cout << " requested num = " << num << " and Growing vbbm ... " << endl; } while (blocksGathered < num) { blocksLeftInFile = (files[fileIndex].fileSize - files[fileIndex].nextOffset) / BLOCK_SIZE; int blocksLeft = num - blocksGathered; range.vbOID = files[fileIndex].OID; range.vbFBO = files[fileIndex].nextOffset / BLOCK_SIZE; range.size = (blocksLeftInFile >= blocksLeft ? blocksLeft : blocksLeftInFile); makeUndoRecord(&files[fileIndex], sizeof(VBFileMetadata)); if (range.size == (uint32_t)blocksLeftInFile) files[fileIndex].nextOffset = 0; else files[fileIndex].nextOffset += range.size * BLOCK_SIZE; blocksGathered += range.size; freeRanges.push_back(range); } // age the returned blocks out of the VB for (it = freeRanges.begin(); it != freeRanges.end(); it++) { uint32_t firstChunk, lastChunk; vbOID = it->vbOID; firstFBO = it->vbFBO; lastFBO = it->vbFBO + it->size - 1; /* Age out at least 100 blocks at a time to reduce the # of times we have to do it. * How to detect when it needs to be done and when it doesn't? * * Split VB space into 100-block chunks. When a chunk boundary is crossed, * clear the whole chunk. */ firstChunk = firstFBO / VBBM_CHUNK_SIZE; lastChunk = lastFBO / VBBM_CHUNK_SIZE; // if the current range falls in the middle of a chunk and doesn't span chunks, // there's nothing to do b/c the chunk is assumed to have been cleared already if (((firstFBO % VBBM_CHUNK_SIZE) != 0) && (firstChunk == lastChunk)) continue; // round up to the next chunk boundaries if ((firstFBO % VBBM_CHUNK_SIZE) != 0) // this implies the range spans chunks firstFBO = (firstChunk + 1) * VBBM_CHUNK_SIZE; // the first FBO of the next chunk lastFBO = ((lastChunk + 1) * VBBM_CHUNK_SIZE - 1); // the last FBO of the last chunk // don't go past the end of the file if (lastFBO > files[fileIndex].fileSize / BLOCK_SIZE) lastFBO = files[fileIndex].fileSize / BLOCK_SIZE; // at this point [firstFBO, lastFBO] is the range to age out. // ugh, walk the whole vbbm looking for matches. for (i = 0; i < vbbm->vbCapacity; i++) if (storage[i].lbid != -1 && storage[i].vbOID == vbOID && storage[i].vbFBO >= firstFBO && storage[i].vbFBO <= lastFBO) { if (vss.isEntryLocked(storage[i].lbid, storage[i].verID)) { ostringstream msg; msg << "VBBM::getBlocks(): version buffer overflow. Increase VersionBufferFileSize. Overflow " "occurred in aged blocks. Requested NumBlocks:VbOid:vbFBO:lastFBO = " << num << ":" << vbOID << ":" << firstFBO << ":" << lastFBO << " lbid locked is " << storage[i].lbid << endl; log(msg.str(), logging::LOG_TYPE_CRITICAL); freeRanges.clear(); throw logging::VBBMBufferOverFlowExcept(msg.str()); } vss.removeEntry(storage[i].lbid, storage[i].verID, &flushList); removeEntry(storage[i].lbid, storage[i].verID); } } if (flushPMCache && !flushList.empty()) cacheutils::flushPrimProcAllverBlocks(flushList); } // read lock int VBBM::getIndex(LBID_t lbid, VER_t verID, int& prev, int& bucket) const { int cHashlen = sizeof(LBID_t) + sizeof(VER_t), currentIndex; char* cHash = (char*)alloca(cHashlen); VBBMEntry* listEntry; utils::Hasher hasher; memcpy(cHash, &lbid, sizeof(LBID_t)); memcpy(&cHash[sizeof(LBID_t)], &verID, sizeof(VER_t)); bucket = hasher(cHash, cHashlen) % vbbm->numHashBuckets; prev = -1; if (hashBuckets[bucket] == -1) return -1; currentIndex = hashBuckets[bucket]; while (currentIndex != -1) { listEntry = &storage[currentIndex]; if (listEntry->lbid == lbid && listEntry->verID == verID) return currentIndex; prev = currentIndex; currentIndex = listEntry->next; } return -1; } void VBBM::removeEntry(LBID_t lbid, VER_t verID) { int index, prev, bucket; #ifdef BRM_DEBUG if (lbid < 0) { log("VBBM::removeEntry(): lbid must be >= 0", logging::LOG_TYPE_DEBUG); throw invalid_argument("VBBM::removeEntry(): lbid must be >= 0"); } if (verID < 0) { log("VBBM::removeEntry(): verID must be >= 0", logging::LOG_TYPE_DEBUG); throw invalid_argument("VBBM::removeEntry(): verID must be >= 0"); } #endif index = getIndex(lbid, verID, prev, bucket); if (index == -1) { #ifdef BRM_DEBUG ostringstream ostr; ostr << "VBBM::removeEntry(): that entry doesn't exist lbid = " << lbid << " verID = " << verID << endl; log(ostr.str(), logging::LOG_TYPE_DEBUG); throw logic_error(ostr.str()); #else return; #endif } makeUndoRecord(&storage[index], sizeof(VBBMEntry)); storage[index].lbid = -1; if (prev != -1) { makeUndoRecord(&storage[prev], sizeof(VBBMEntry)); storage[prev].next = storage[index].next; } else { makeUndoRecord(&hashBuckets[bucket], sizeof(int)); hashBuckets[bucket] = storage[index].next; } makeUndoRecord(vbbm, sizeof(VBShmsegHeader)); vbbm->vbCurrentSize--; if (vbbm->vbLWM > index) vbbm->vbLWM = index; } // read lock int VBBM::size() const { #ifdef BRM_DEBUG int i, ret = 0; for (i = 0; i < vbbm->vbCapacity; i++) if (storage[i].lbid != -1) ret++; if (ret != vbbm->vbCurrentSize) { ostringstream ostr; ostr << "VBBM::checkConsistency(): actual size is " << ret << ", recorded size is " << vbbm->vbCurrentSize; log(ostr.str(), logging::LOG_TYPE_DEBUG); throw logic_error(ostr.str()); } return ret; #else return vbbm->vbCurrentSize; #endif } // read lock bool VBBM::hashEmpty() const { int i; for (i = 0; i < vbbm->numHashBuckets; i++) if (hashBuckets[i] != -1) return false; return true; } // write lock void VBBM::clear() { int allocSize; int newshmkey; int nFiles = -1; char* newshmseg; // save vars we need after the clear() boost::scoped_array newFiles(new VBFileMetadata[vbbm->nFiles]); memcpy(&newFiles[0], files, vbbm->nFiles * sizeof(VBFileMetadata)); setCurrentFileSize(); for (int i = 0; i < vbbm->nFiles; i++) { newFiles[i].fileSize = currentFileSize; newFiles[i].nextOffset = 0; } nFiles = vbbm->nFiles; allocSize = (sizeof(VBShmsegHeader) + (nFiles * sizeof(VBFileMetadata)) + VBSTORAGE_INITIAL_SIZE + VBTABLE_INITIAL_SIZE); // cout << "clear:: allocSize = " << allocSize << endl; newshmkey = chooseShmkey(); fPVBBMImpl->clear(newshmkey, allocSize); vbbm = fPVBBMImpl->get(); newshmseg = reinterpret_cast(vbbm); initShmseg(nFiles); vbbmShminfo->tableShmkey = currentVBBMShmkey = newshmkey; vbbmShminfo->allocdSize = allocSize; files = reinterpret_cast(&newshmseg[sizeof(VBShmsegHeader)]); hashBuckets = reinterpret_cast(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata)]); storage = reinterpret_cast(&newshmseg[sizeof(VBShmsegHeader) + vbbm->nFiles * sizeof(VBFileMetadata) + vbbm->numHashBuckets * sizeof(int)]); memcpy(files, &newFiles[0], vbbm->nFiles * sizeof(VBFileMetadata)); } // read lock int VBBM::checkConsistency() const { /* Struct integrity tests 1: Verify that the recorded size matches the actual size 2: Verify there are no empty entries reachable from the hash table 3: Verify there are no empty entries below the LWM 4a: Make sure every VBBM entry points to a unique position in the VB 4b: Make sure every VBBM entry has unique LBID & VERID. */ int i, j, k; /* Test 1 is already implemented */ size(); /* Test 2 - no empty elements reachable from the hash table */ int nextElement; for (i = 0; i < vbbm->numHashBuckets; i++) { if (hashBuckets[i] != -1) for (nextElement = hashBuckets[i]; nextElement != -1; nextElement = storage[nextElement].next) if (storage[nextElement].lbid == -1) throw logic_error( "VBBM::checkConsistency(): an empty storage entry is reachable from the hash table"); } /* Test 3 - verify that there are no empty entries below the LWM */ for (i = 0; i < vbbm->vbLWM; i++) { if (storage[i].lbid == -1) { cerr << "VBBM: LWM=" << vbbm->vbLWM << " first empty entry=" << i << endl; throw logic_error("VBBM::checkConsistency(): LWM accounting error"); } } /* Test 4b - verify the uniqueness of the entries */ for (i = 0; i < vbbm->numHashBuckets; i++) if (hashBuckets[i] != -1) for (j = hashBuckets[i]; j != -1; j = storage[j].next) for (k = storage[j].next; k != -1; k = storage[k].next) if (storage[j].lbid == storage[k].lbid && storage[j].verID == storage[k].verID) { cerr << "VBBM: lbid=" << storage[j].lbid << " verID=" << storage[j].verID << endl; throw logic_error("VBBM::checkConsistency(): Duplicate entry found"); } /* Test 4a - verify the uniqueness of vbOID, vbFBO fields */ for (i = 0; i < vbbm->vbCapacity; i++) if (storage[i].lbid != -1) for (j = i + 1; j < vbbm->vbCapacity; j++) if (storage[j].lbid != -1) if (storage[j].vbOID == storage[i].vbOID && storage[j].vbFBO == storage[i].vbFBO) { cerr << "VBBM: lbid1=" << storage[i].lbid << " lbid2=" << storage[j].lbid << " verID1=" << storage[i].verID << " verID2=" << storage[j].verID << " share vbOID=" << storage[j].vbOID << " vbFBO=" << storage[j].vbFBO << endl; throw logic_error("VBBM::checkConsistency(): 2 VBBM entries share space in the VB"); } return 0; } void VBBM::setReadOnly() { r_only = true; } /* File Format (V1) VBBM V1 magic (32-bits) # of VBBM entries in capacity (32-bits) struct VBBMEntry * # These entries are considered optional to support going backward and forward between versions. nFiles (32-bits) currentFileIndex (32-bits) VBFileMetadata * nFiles */ /* File Format (V2): * * Version 2 magic (int) * number of used VBBM entries (numEntries) (int) * current number of VB files (nFiles) (int) * VBFileMetadata * nFiles * struct VBBMEntry * numEntries */ void VBBM::loadVersion2(IDBDataFile* in) { int vbbmEntries; int nFiles; int i; VBBMEntry entry; if (in->read((char*)&vbbmEntries, 4) != 4) { log_errno("VBBM::load()"); throw runtime_error("VBBM::load(): Failed to read entry number"); } if (in->read((char*)&nFiles, 4) != 4) { log_errno("VBBM::load()"); throw runtime_error("VBBM::load(): Failed to read file number"); } // Need to make clear() truncate the files section if (vbbm->nFiles > nFiles) vbbm->nFiles = nFiles; clear(); while (vbbm->nFiles < nFiles) growVBBM(true); // this allocates one file, doesn't grow the main storage growForLoad(vbbmEntries); const int nfileSize = sizeof(VBFileMetadata) * nFiles; if (in->read((char*)files, nfileSize) != nfileSize) { log_errno("VBBM::load()"); throw runtime_error("VBBM::load(): Failed to load vb file meta data"); } size_t readSize = vbbmEntries * sizeof(entry); char* readBuf = new char[readSize]; size_t progress = 0; int err; while (progress < readSize) { err = in->read(readBuf + progress, readSize - progress); if (err < 0) { log_errno("VBBM::load()"); throw runtime_error("VBBM::load(): Failed to load, check the critical log file"); } else if (err == 0) { log("VBBM::load(): Got early EOF"); throw runtime_error("VBBM::load(): Got early EOF"); } progress += err; } VBBMEntry* loadedEntries = (VBBMEntry*)readBuf; for (i = 0; i < vbbmEntries; i++) insert(loadedEntries[i].lbid, loadedEntries[i].verID, loadedEntries[i].vbOID, loadedEntries[i].vbFBO, true); } //#include "boost/date_time/posix_time/posix_time.hpp" // using namespace boost::posix_time; void VBBM::load(string filename) { int magic; const char* filename_p = filename.c_str(); scoped_ptr in( IDBDataFile::open(IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG), filename_p, "rb", 0)); // ptime time1, time2; // time1 = microsec_clock::local_time(); // cout << "loading the VBBM " << time1 << endl; if (!in) { log_errno("VBBM::load()"); throw runtime_error("VBBM::load(): Failed to open the file"); } int bytes = in->read((char*)&magic, 4); if (bytes != 4) { log("VBBM::load(): failed to read magic."); throw runtime_error("VBBM::load(): failed to read magic."); } switch (magic) { case VBBM_MAGIC_V2: loadVersion2(in.get()); break; default: log("VBBM::load(): Bad magic. Not a VBBM file?"); throw runtime_error("VBBM::load(): Bad magic. Not a VBBM file?"); } // time2 = microsec_clock::local_time(); // cout << "done loading " << time2 << " duration: " << time2-time1 << endl; } // read lock void VBBM::save(string filename) { int i; int var; const char* filename_p = filename.c_str(); scoped_ptr out(IDBDataFile::open(IDBPolicy::getType(filename_p, IDBPolicy::WRITEENG), filename_p, "wb", IDBDataFile::USE_VBUF)); if (!out) { log_errno("VBBM::save()"); throw runtime_error("VBBM::save(): Failed to open the file"); } var = VBBM_MAGIC_V2; [[maybe_unused]] int bytesWritten = 0; [[maybe_unused]] int bytesToWrite = 12; bytesWritten += out->write((char*)&var, 4); bytesWritten += out->write((char*)&vbbm->vbCurrentSize, 4); bytesWritten += out->write((char*)&vbbm->nFiles, 4); bytesWritten += out->write((char*)files, sizeof(VBFileMetadata) * vbbm->nFiles); bytesToWrite += sizeof(VBFileMetadata) * vbbm->nFiles; int first = -1, last = -1, err; size_t progress, writeSize; for (i = 0; i < vbbm->vbCapacity; i++) { if (storage[i].lbid != -1 && first == -1) first = i; else if (storage[i].lbid == -1 && first != -1) { last = i; writeSize = (last - first) * sizeof(VBBMEntry); progress = 0; char* writePos = (char*)&storage[first]; while (progress < writeSize) { err = out->write(writePos + progress, writeSize - progress); if (err < 0) { log_errno("VBBM::save()"); throw runtime_error("VBBM::save(): Failed to write the file"); } progress += err; } first = -1; } } if (first != -1) { writeSize = (vbbm->vbCapacity - first) * sizeof(VBBMEntry); progress = 0; char* writePos = (char*)&storage[first]; while (progress < writeSize) { err = out->write(writePos + progress, writeSize - progress); if (err < 0) { log_errno("VBBM::save()"); throw runtime_error("VBBM::save(): Failed to write the file"); } progress += err; } } #if 0 cout << "saving... nfiles=" << vbbm->nFiles << "\n"; for (i = 0; i < vbbm->nFiles; i++) { cout << "file " << i << " vboid=" << files[i].OID << " size=" << files[i].fileSize << endl; } #endif } uint32_t VBBM::addVBFileIfNotExists(OID_t vbOID) { int i; /* Check if vbOID exists, * add it if not, init to pos=0 */ for (i = 0; i < vbbm->nFiles; i++) if (files[i].OID == vbOID) break; if (i == vbbm->nFiles) { setCurrentFileSize(); growVBBM(true); files[i].OID = vbOID; files[i].fileSize = currentFileSize; files[i].nextOffset = 0; } return i; } void VBBM::setCurrentFileSize() { config::Config* conf = config::Config::makeConfig(); string stmp; int64_t ltmp; currentFileSize = 2147483648ULL; // 2 GB default try { stmp = conf->getConfig("VersionBuffer", "VersionBufferFileSize"); } catch (std::exception& e) { log("VBBM: Missing a VersionBuffer/VersionBufferFileSize key in the config file"); throw invalid_argument("VBBM: Missing a VersionBuffer/VersionBufferFileSize key in the config file"); } ltmp = conf->fromText(stmp.c_str()); if (ltmp < 1) { log("VBBM: Config error: VersionBuffer/VersionBufferFileSize must be positive"); throw invalid_argument("VBBM: Config error: VersionBuffer/VersionBufferFileSize must be positive"); } else { currentFileSize = ltmp; } } #ifdef BRM_DEBUG // read lock int VBBM::getShmid() const { return vbbmShmid; } #endif } // namespace BRM