/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016-2021 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /***************************************************************************** * $Id: tdriver-dbrm2.cpp 1823 2013-01-21 14:13:09Z rdempsey $ * ****************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "brm.h" #include "extentmap.h" #include "IDBPolicy.h" //#define BRM_VERBOSE 1 using namespace BRM; using namespace std; int threadStop; int oid = 1; uint64_t opCount = 0; LBID_t lbidCounter = 0; VER_t nextTxnID = 1; u_int64_t vbOffset = 0; pthread_mutex_t pthreadMutex; const std::vector colWidthsAvailable = {1, 2, 4, 8, 16}; const DBRootT dbroot = 1; struct Range { LBID_t start, end, nextBlock; VER_t txnID; Range() { start = end = nextBlock = 0; txnID = 0; } }; struct EMEntries { LBID_t LBIDstart; uint32_t size; OID_t OID; uint32_t FBO; uint32_t HWM; uint32_t secondHWM; int32_t txnID; DBRootT dbroot; PartitionNumberT partNum; SegmentT segNum; struct EMEntries* next; EMEntries() : HWM(0), secondHWM(0), txnID(0), next(nullptr) { } EMEntries(const uint32_t aSize, const OID_t aOid, const uint32_t aFbo, const LBID_t aLBIDStart, EMEntries* aNext) : LBIDstart(aLBIDStart), size(aSize), OID(aOid), FBO(aFbo), HWM(0), secondHWM(0), txnID(0), next(aNext) { } EMEntries(const uint32_t aSize, const OID_t aOid, const uint32_t aFbo, const LBID_t aLBIDStart, EMEntries* aNext, const DBRootT aDbroot, const PartitionNumberT aPartNum, const SegmentT aSegNum) : LBIDstart(aLBIDStart), size(aSize), OID(aOid), FBO(aFbo), HWM(0), secondHWM(0), txnID(0), dbroot(aDbroot), partNum(aPartNum), segNum(aSegNum), next(aNext) { } }; /* static void* BRMRunner_2(void* arg) { vector copyList, copiedList, committedList; vector::iterator rit; vector writtenList; vector::iterator lit; pthread_mutex_t listMutex; int op; uint32_t randstate; DBRM* brm; struct timeval tv; VER_t txnID; pthread_mutex_init(&listMutex, NULL); gettimeofday(&tv, NULL); randstate = static_cast(tv.tv_usec); brm = new DBRM(); while (!threadStop) { op = rand_r(&randstate) % 9; switch (op) { case 0: // beginVBCopy { int blockCount, size, err; Range newEntry; VBRange_v vbRanges; VBRange_v::iterator vit; LBIDRange_v ranges; LBIDRange range; size = rand_r(&randstate) % 10000; pthread_mutex_lock(&pthreadMutex); newEntry.start = lbidCounter; lbidCounter += size; txnID = nextTxnID++; pthread_mutex_unlock(&pthreadMutex); newEntry.nextBlock = newEntry.start; newEntry.end = newEntry.start + size; range.start = newEntry.start; range.size = size; err = brm->beginVBCopy(txnID, dbroot, ranges, vbRanges); CPPUNIT_ASSERT(err == 0); for (blockCount = 0, vit = vbRanges.begin(); vit != vbRanges.end(); vit++) blockCount += (*vit).size; CPPUNIT_ASSERT(blockCount == size); pthread_mutex_lock(&listMutex); copyList.push_back(newEntry); pthread_mutex_unlock(&listMutex); err = brm->beginVBCopy(txnID, dbroot, ranges, vbRanges); CPPUNIT_ASSERT(err == -1); break; } case 1: // writeVBEntry { int randIndex; Range* entry; pthread_mutex_lock(&listMutex); if (copyList.size() == 0) break; randIndex = rand_r(&randstate) % copyList.size(); entry = &(copyList[randIndex]); entry->nextBlock++; txnID = entry->txnID; break; } default: cerr << "not finished yet" << endl; } } return NULL; } */ /* static void* BRMRunner_1(void* arg) { // keep track of LBID ranges allocated here and // randomly allocate, lookup, delete, get/set HWM, and // destroy the EM object. #ifdef BRM_VERBOSE int threadNum = reinterpret_cast(arg); #endif int op, listSize = 0, i; uint32_t randstate; struct EMEntries* head = NULL, *tmp; struct timeval tv; DBRM* brm; ExtentMap em; vector lbids; LBID_t lbid; uint32_t colWidth; PartitionNumberT partNum; SegmentT segmentNum; execplan::CalpontSystemCatalog::ColDataType colDataType; #ifdef BRM_VERBOSE cerr << "thread number " << threadNum << " started." << endl; #endif gettimeofday(&tv, NULL); randstate = static_cast(tv.tv_usec); brm = new DBRM(); while (!threadStop) { auto randNumber = rand_r(&randstate); op = randNumber % 10; colWidth = colWidthsAvailable[randNumber % colWidthsAvailable.size()]; partNum = randNumber % std::numeric_limits::max(); segmentNum = randNumber % std::numeric_limits::max(); colDataType = (execplan::CalpontSystemCatalog::ColDataType) (randNumber % (int)execplan::CalpontSystemCatalog::ColDataType::TIMESTAMP); #ifdef BRM_VERBOSE cerr << "next op is " << op << endl; #endif switch (op) { case 0: //allocate space for a new file { struct EMEntries* newEm; size_t size = rand_r(&randstate) % 102399 + 1; int entries, OID, allocdSize, err; uint32_t startBlockOffset; pthread_mutex_lock(&pthreadMutex); OID = oid++; opCount++; pthread_mutex_unlock(&pthreadMutex); lbids.clear(); for (size_t i = 0; i < size; ++i) { err = brm->createColumnExtent_DBroot(OID, colWidth, dbroot, partNum, segmentNum, colDataType, lbid, allocdSize, startBlockOffset); CPPUNIT_ASSERT(err == 0); lbids.push_back(lbid); } entries = size / brm->getExtentSize(); if ((size % brm->getExtentSize()) != 0) entries++; if ((uint32_t)entries != lbids.size()) cerr << "entries = " << entries << " lbids.size = " << lbids.size() << endl; CPPUNIT_ASSERT((uint32_t)entries == lbids.size()); for (i = 0 ; i < entries; i++) { newEm = new EMEntries(brm->getExtentSize(), OID, brm->getExtentSize(), lbids[i], head, dbroot, partNum, segmentNum); head = newEm; listSize++; } #ifdef BRM_VERBOSE cerr << "created new space for OID " << newEm->OID << endl; #endif em.checkConsistency(); break; } case 1: //allocate space for an existing file { if (listSize == 0) break; struct EMEntries* newEm, *tmp; int size = rand_r(&randstate) % 102399 + 1; int fileRand = rand_r(&randstate) % listSize; int i, lastExtent, blockEnd, oid; int tmpHWM, entries, allocdSize, err; uint32_t startBlockOffset; vector lbids; LBID_t lbid; for (i = 0, tmp = head; i < fileRand; i++) tmp = tmp->next; oid = tmp->OID; for (lastExtent = 0, tmp = head; tmp != NULL; tmp = tmp->next) { if (tmp->OID != oid) continue; tmpHWM = tmp->HWM; blockEnd = tmp->FBO + tmp->size; if (lastExtent < blockEnd) lastExtent = blockEnd; } err = brm->createColumnExtentExactFile(oid, colWidth, dbroot, partNum, segmentNum, colDataType, lbid, allocdSize, startBlockOffset); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); entries = size / brm->getExtentSize(); if ((size % brm->getExtentSize()) != 0) entries++; CPPUNIT_ASSERT((uint32_t)entries == lbids.size()); for (i = 0; i < entries; i++) { newEm = new EMEntries((i != entries) ? brm->getExtentSize() : size % brm->getExtentSize(), oid, lastExtent + (i * brm->getExtentSize()), lbids[i], head, dbroot, partNum, segmentNum); newEm->HWM = tmpHWM; head = newEm; listSize++; } #ifdef BRM_VERBOSE cerr << "created another extent for OID " << newEm->OID << endl; #endif em.checkConsistency(); break; } case 2: //delete an OID { if (listSize == 0) break; struct EMEntries* tmp, *prev; int fileRand = rand_r(&randstate) % listSize; int i, oid, err; for (i = 0, tmp = head; i < fileRand; i++) tmp = tmp->next; oid = tmp->OID; err = brm->deleteOID(oid); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); for (tmp = head; tmp != NULL;) { if (tmp->OID == oid) { if (tmp == head) { head = head->next; delete tmp; tmp = head; } else { prev->next = tmp->next; delete tmp; tmp = prev->next; } listSize--; } else { prev = tmp; tmp = tmp->next; } } #ifdef BRM_VERBOSE cerr << "deleted OID " << oid << endl; #endif em.checkConsistency(); break; } case 3: //lookup by LBID { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err, offset, oid; struct EMEntries* tmp; LBID_t target; uint32_t fbo; DBRootT localDbroot; PartitionNumberT localPartNum; SegmentT localSegmentNum; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; target = tmp->LBIDstart + offset; err = brm->lookupLocal(target, 0, false, oid, localDbroot, localPartNum, localSegmentNum, fbo); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); #ifdef BRM_VERBOSE cerr << "looked up LBID " << target << " got oid " << oid << " fbo " << fbo << endl; cerr << " oid should be " << tmp->OID << " fbo should be " << offset + tmp->FBO << endl; #endif CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(oid == tmp->OID); CPPUNIT_ASSERT(fbo == offset + tmp->FBO); em.checkConsistency(); break; } case 4: //lookup by OID, FBO { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, oid, err, offset; struct EMEntries* tmp; LBID_t lbid; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; oid = tmp->OID; err = brm->lookupLocal(oid, partNum, segmentNum, offset + tmp->FBO, lbid); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); #ifdef BRM_VERBOSE cerr << "looked up OID " << oid << " fbo " << offset + tmp->FBO << " got lbid " << lbid << endl; cerr << " lbid should be " << tmp->LBIDstart + offset << endl; #endif CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(lbid == static_cast(tmp->LBIDstart + offset)); em.checkConsistency(); break; } case 5: //getHWM { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err, status; struct EMEntries* tmp; uint32_t hwm; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; err = brm->getLocalHWM(tmp->OID, partNum, segmentNum, hwm, status); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); #ifdef BRM_VERBOSE cerr << "stored HWM for OID " << tmp->OID << " is " << tmp->HWM << " BRM says it's " << hwm << endl; #endif CPPUNIT_ASSERT(hwm == tmp->HWM); em.checkConsistency(); break; } case 6: //setHWM { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, hwm, oid, err; struct EMEntries* tmp; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; oid = tmp->OID; hwm = rand_r(&randstate) % (tmp->FBO + brm->getExtentSize()); err = brm->setLocalHWM(oid, tmp->partNum, tmp->segNum, hwm); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); for (tmp = head; tmp != NULL; tmp = tmp->next) if (tmp->OID == oid) tmp->HWM = hwm; #ifdef BRM_VERBOSE cerr << "setHWM of OID " << oid << " to " << hwm << endl; #endif em.checkConsistency(); break; } case 7: // renew this EM object { delete brm; brm = new DBRM(); #ifdef BRM_VERBOSE cerr << "got a new BRM instance" << endl; #endif em.checkConsistency(); break; } #if 0 case 8: //getBulkInsertVars { if (listSize == 0) break; HWM_t hwm; VER_t txnID; int entryRand = rand_r(&randstate) % listSize; int i, err, offset; EMEntries* tmp; LBID_t lbid; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; lbid = tmp->LBIDstart + offset; err = brm->getBulkInsertVars(lbid, hwm, txnID); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(hwm == tmp->secondHWM); CPPUNIT_ASSERT(txnID == tmp->txnID); break; } case 9: //setBulkInsertVars { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err, offset; EMEntries* tmp; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; tmp->secondHWM = rand_r(&randstate) % MAXINT; tmp->txnID = rand_r(&randstate) % MAXINT; err = brm->setBulkInsertVars(tmp->LBIDstart + offset, tmp->secondHWM, tmp->txnID); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); break; } #endif default: break; } } delete brm; while (head != NULL) { tmp = head->next; delete head; head = tmp; } #ifdef BRM_VERBOSE cerr << "thread " << threadNum << " exiting" << endl; #endif return NULL; } */ DBRM brm_si; /* static void* BRMRunner_si(void* arg) { // keep track of LBID ranges allocated here and // randomly allocate, lookup, delete, get/set HWM, and // destroy the EM object. #ifdef BRM_VERBOSE int threadNum = reinterpret_cast(arg); #endif int op, listSize = 0, i; uint32_t randstate; struct EMEntries* head = NULL, *tmp; struct timeval tv; ExtentMap em; vector lbids; #ifdef BRM_VERBOSE cerr << "thread number " << threadNum << " started." << endl; #endif gettimeofday(&tv, NULL); randstate = static_cast(tv.tv_usec); while (!threadStop) { op = rand_r(&randstate) % 10; #ifdef BRM_VERBOSE cerr << "next op is " << op << endl; #endif switch (op) { case 0: //allocate space for a new file { struct EMEntries* newEm; int size = rand_r(&randstate) % 102399 + 1; int entries, OID, allocdSize, err; pthread_mutex_lock(&pthreadMutex); OID = oid++; opCount++; pthread_mutex_unlock(&pthreadMutex); err = brm_si.createExtent(size, OID, lbids, allocdSize); CPPUNIT_ASSERT(err == 0); entries = size / brm_si.getExtentSize(); if ((size % brm_si.getExtentSize()) != 0) entries++; CPPUNIT_ASSERT((uint32_t)entries == lbids.size()); for (i = 0 ; i < entries; i++) { newEm = new EMEntries(); newEm->size = brm_si.getExtentSize(); newEm->OID = OID; newEm->FBO = i * brm_si.getExtentSize(); newEm->LBIDstart = lbids[i]; newEm->next = head; head = newEm; listSize++; } #ifdef BRM_VERBOSE cerr << "created new space for OID " << newEm->OID << endl; #endif em.checkConsistency(); break; } case 1: //allocate space for an existing file { if (listSize == 0) break; struct EMEntries* newEm, *tmp; int size = rand_r(&randstate) % 102399 + 1; int fileRand = rand_r(&randstate) % listSize; int i, lastExtent, blockEnd, oid; int tmpHWM, entries, allocdSize, err; vector lbids; for (i = 0, tmp = head; i < fileRand; i++) tmp = tmp->next; oid = tmp->OID; for (lastExtent = 0, tmp = head; tmp != NULL; tmp = tmp->next) { if (tmp->OID != oid) continue; tmpHWM = tmp->HWM; blockEnd = tmp->FBO + tmp->size; if (lastExtent < blockEnd) lastExtent = blockEnd; } err = brm_si.createExtent(size, oid, lbids, allocdSize); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); entries = size / brm_si.getExtentSize(); if ((size % brm_si.getExtentSize()) != 0) entries++; CPPUNIT_ASSERT((uint32_t)entries == lbids.size()); for (i = 0; i < entries; i++) { newEm = new EMEntries(); if (i != entries) newEm->size = brm_si.getExtentSize(); else newEm->size = size % brm_si.getExtentSize(); newEm->OID = oid; newEm->FBO = lastExtent + (i * brm_si.getExtentSize()); newEm->LBIDstart = lbids[i]; newEm->HWM = tmpHWM; newEm->next = head; head = newEm; listSize++; } #ifdef BRM_VERBOSE cerr << "created another extent for OID " << newEm->OID << endl; #endif em.checkConsistency(); break; } case 2: //delete an OID { if (listSize == 0) break; struct EMEntries* tmp, *prev; int fileRand = rand_r(&randstate) % listSize; int i, oid, err; for (i = 0, tmp = head; i < fileRand; i++) tmp = tmp->next; oid = tmp->OID; err = brm_si.deleteOID(oid); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); for (tmp = head; tmp != NULL;) { if (tmp->OID == oid) { if (tmp == head) { head = head->next; delete tmp; tmp = head; } else { prev->next = tmp->next; delete tmp; tmp = prev->next; } listSize--; } else { prev = tmp; tmp = tmp->next; } } #ifdef BRM_VERBOSE cerr << "deleted OID " << oid << endl; #endif em.checkConsistency(); break; } case 3: //lookup by LBID { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err, offset, oid; struct EMEntries* tmp; LBID_t target; uint32_t fbo; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; target = tmp->LBIDstart + offset; err = brm_si.lookup(target, 0, false, oid, fbo); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); #ifdef BRM_VERBOSE cerr << "looked up LBID " << target << " got oid " << oid << " fbo " << fbo << endl; cerr << " oid should be " << tmp->OID << " fbo should be " << offset + tmp->FBO << endl; #endif CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(oid == tmp->OID); CPPUNIT_ASSERT(fbo == offset + tmp->FBO); em.checkConsistency(); break; } case 4: //lookup by OID, FBO { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, oid, err, offset; struct EMEntries* tmp; LBID_t lbid; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; oid = tmp->OID; err = brm_si.lookup(oid, offset + tmp->FBO, lbid); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); #ifdef BRM_VERBOSE cerr << "looked up OID " << oid << " fbo " << offset + tmp->FBO << " got lbid " << lbid << endl; cerr << " lbid should be " << tmp->LBIDstart + offset << endl; #endif CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(lbid == static_cast(tmp->LBIDstart + offset)); em.checkConsistency(); break; } case 5: //getHWM { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err; struct EMEntries* tmp; uint32_t hwm; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; err = brm_si.getHWM(tmp->OID, hwm); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); #ifdef BRM_VERBOSE cerr << "stored HWM for OID " << tmp->OID << " is " << tmp->HWM << " BRM says it's " << hwm << endl; #endif CPPUNIT_ASSERT(hwm == tmp->HWM); em.checkConsistency(); break; } case 6: //setHWM { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, hwm, oid, err; struct EMEntries* tmp; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; oid = tmp->OID; hwm = rand_r(&randstate) % (tmp->FBO + brm_si.getExtentSize()); err = brm_si.setHWM(oid, hwm); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); for (tmp = head; tmp != NULL; tmp = tmp->next) if (tmp->OID == oid) tmp->HWM = hwm; #ifdef BRM_VERBOSE cerr << "setHWM of OID " << oid << " to " << hwm << endl; #endif em.checkConsistency(); break; } case 7: //getBulkInsertVars { if (listSize == 0) break; HWM_t hwm; VER_t txnID; int entryRand = rand_r(&randstate) % listSize; int i, err, offset; EMEntries* tmp; LBID_t lbid; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; lbid = tmp->LBIDstart + offset; err = brm_si.getBulkInsertVars(lbid, hwm, txnID); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(hwm == tmp->secondHWM); CPPUNIT_ASSERT(txnID == tmp->txnID); break; } case 8: //setBulkInsertVars { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err, offset; EMEntries* tmp; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; tmp->secondHWM = rand_r(&randstate) % MAXINT; tmp->txnID = rand_r(&randstate) % MAXINT; err = brm_si.setBulkInsertVars(tmp->LBIDstart + offset, tmp->secondHWM, tmp->txnID); pthread_mutex_lock(&pthreadMutex); opCount++; pthread_mutex_unlock(&pthreadMutex); CPPUNIT_ASSERT(err == 0); break; } default: break; } } while (head != NULL) { tmp = head->next; delete head; head = tmp; } #ifdef BRM_VERBOSE cerr << "thread " << threadNum << " exiting" << endl; #endif return NULL; } */ static void* EMRunner(void* arg) { // keep track of LBID ranges allocated here and // randomly allocate, lookup, delete, get/set HWM, and // destroy the EM object. #ifdef BRM_VERBOSE uint64_t threadNum = (uint64_t)arg; #endif int op, listSize = 0; uint32_t randstate; struct EMEntries* head = NULL, *tmp; struct timeval tv; ExtentMap* em; LBID_t lbid; uint32_t colWidth; PartitionNumberT partNum; SegmentT segmentNum; execplan::CalpontSystemCatalog::ColDataType colDataType; #ifdef BRM_VERBOSE cerr << "thread number " << threadNum << " started." << endl; #endif gettimeofday(&tv, NULL); randstate = static_cast(tv.tv_usec); //pthread_mutex_lock(&pthreadMutex); em = new ExtentMap(); //pthread_mutex_unlock(&pthreadMutex); while (!threadStop) { auto randNumber = rand_r(&randstate); op = randNumber % 10; colWidth = colWidthsAvailable[randNumber % colWidthsAvailable.size()]; partNum = randNumber % std::numeric_limits::max(); segmentNum = randNumber % std::numeric_limits::max(); colDataType = (execplan::CalpontSystemCatalog::ColDataType) (randNumber % (int)execplan::CalpontSystemCatalog::ColDataType::TIMESTAMP); #ifdef BRM_VERBOSE cerr << "next op is " << op << endl; #endif switch (op) { case 0: //allocate space for a new file { vector emEntriesVec; struct EMEntries* newEm; size_t numberOfExtents = randNumber % 4 + 1; int OID; uint32_t startBlockOffset; pthread_mutex_lock(&pthreadMutex); OID = oid++; pthread_mutex_unlock(&pthreadMutex); em->getExtents(OID, emEntriesVec, false, false, true); size_t extentsNumberBefore = emEntriesVec.size(); int allocdsize; for (size_t i = 0; i < numberOfExtents; ++i) { em->createColumnExtent_DBroot(OID, colWidth, dbroot, colDataType, partNum, segmentNum, lbid, allocdsize, startBlockOffset); em->confirmChanges(); newEm = new EMEntries(allocdsize, OID, startBlockOffset, lbid, head, dbroot, partNum, segmentNum); head = newEm; listSize++; } emEntriesVec.clear(); em->getExtents(OID, emEntriesVec, false, false, true); size_t extentsNumberAfter = emEntriesVec.size(); CPPUNIT_ASSERT(extentsNumberBefore + numberOfExtents == extentsNumberAfter); #ifdef BRM_VERBOSE cerr << "created new space for OID " << newEm->OID << endl; #endif //em->checkConsistency(); break; } /* case 1: //allocate space for an existing file { if (listSize == 0) break; struct EMEntries* newEm, *tmp; size_t size = rand_r(&randstate) % 10; int fileRand = rand_r(&randstate) % listSize; int i, lastExtent, blockEnd, oid; int tmpHWM, entries, allocdSize; uint32_t startBlockOffset; lbids.clear(); for (i = 0, tmp = head; i < fileRand; i++) tmp = tmp->next; oid = tmp->OID; for (lastExtent = 0, tmp = head; tmp != NULL; tmp = tmp->next) { if (tmp->OID != oid) continue; tmpHWM = tmp->HWM; blockEnd = tmp->FBO + tmp->size; if (lastExtent < blockEnd) lastExtent = blockEnd; } for (size_t i = 0; i < size; ++i) { em->createColumnExtent_DBroot(oid, colWidth, dbroot, colDataType, partNum, segmentNum, lbid, allocdSize, startBlockOffset); em->confirmChanges(); lbids.push_back(lbid); } //em->createExtent(size, oid, lbids, allocdSize); //em->confirmChanges(); entries = size / em->getExtentSize(); if ((size % em->getExtentSize()) != 0) entries++; CPPUNIT_ASSERT((uint32_t)entries == lbids.size()); for (i = 0; i < entries; i++) { newEm = new EMEntries((i != entries) ? em->getExtentSize() : size % em->getExtentSize(), oid, lastExtent + (i * em->getExtentSize()), lbids[i], head, dbroot, partNum, segmentNum); newEm->HWM = tmpHWM; head = newEm; listSize++; } #ifdef BRM_VERBOSE cerr << "created another extent for OID " << newEm->OID << endl; #endif em->checkConsistency(); break; } */ case 2: //delete an OID { if (listSize == 0) break; struct EMEntries* tmp = nullptr, *prev = nullptr; int fileRand = rand_r(&randstate) % listSize; int i, oid; for (i = 0, tmp = head; i < fileRand; i++) tmp = tmp->next; oid = tmp->OID; em->deleteOID(oid); em->confirmChanges(); vector emEntriesVec; em->getExtents(oid, emEntriesVec, false, false, true); CPPUNIT_ASSERT(emEntriesVec.empty()); for (tmp = head; tmp != NULL;) { if (tmp->OID == oid) { if (tmp == head) { head = head->next; delete tmp; tmp = head; } else { prev->next = tmp->next; delete tmp; tmp = prev->next; } listSize--; } else { prev = tmp; tmp = tmp->next; } } #ifdef BRM_VERBOSE cerr << "deleted OID " << oid << endl; #endif //em->checkConsistency(); break; } case 3: //lookup by LBID { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err, offset, oid; struct EMEntries* tmp; LBID_t target; uint32_t fbo; DBRootT localDbroot; PartitionNumberT localPartNum; SegmentT localSegmentNum; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % (tmp->size - 1); target = tmp->LBIDstart + offset; err = em->lookupLocal(target, oid, localDbroot, localPartNum, localSegmentNum, fbo); #ifdef BRM_VERBOSE cerr << "looked up LBID " << target << " got oid " << oid << " fbo " << fbo << endl; cerr << " oid should be " << tmp->OID << " fbo should be " << offset + tmp->FBO << endl; cerr << "op 3 fbo " << fbo << " offset + tmp->FBO " << offset + tmp->FBO << endl; #endif CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(oid == tmp->OID); CPPUNIT_ASSERT(fbo == offset + tmp->FBO); //em->checkConsistency(); break; } case 4: //lookup by OID, FBO { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, oid, err, offset; struct EMEntries* tmp; LBID_t lbid; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % (tmp->size - 1); oid = tmp->OID; err = em->lookupLocal(oid, tmp->partNum, tmp->segNum, offset + tmp->FBO, lbid); #ifdef BRM_VERBOSE cerr << "looked up OID " << oid << " fbo " << offset + tmp->FBO << " got lbid " << lbid << endl; cerr << " lbid should be " << tmp->LBIDstart + offset << endl; #endif CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(lbid == tmp->LBIDstart + offset); //em->checkConsistency(); break; } case 5: //getHWM { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, status; struct EMEntries* tmp; uint32_t hwm; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; hwm = em->getLocalHWM(tmp->OID, tmp->partNum, tmp->segNum, status); #ifdef BRM_VERBOSE_I cerr << "stored HWM for OID " << tmp->OID << " is " << tmp->HWM << " BRM says it's " << hwm << endl; #endif CPPUNIT_ASSERT(hwm == tmp->HWM); //em->checkConsistency(); break; } case 6: //setHWM { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, hwm, oid; struct EMEntries* tmp; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; oid = tmp->OID; hwm = rand_r(&randstate) % (tmp->size - 1); bool firstNode = true; em->setLocalHWM(oid, tmp->partNum, tmp->segNum, hwm, firstNode); em->confirmChanges(); tmp->HWM = hwm; #ifdef BRM_VERBOSE cerr << "setHWM of OID " << oid << " to " << hwm << endl; #endif //em->checkConsistency(); break; } /* case 7: // renew this EM object { delete em; em = new ExtentMap(); #ifdef BRM_VERBOSE cerr << "got a new EM instance" << endl; #endif em->checkConsistency(); break; } case 8: //getBulkInsertVars { if (listSize == 0) break; HWM_t hwm; VER_t txnID; int entryRand = rand_r(&randstate) % listSize; int i, err, offset; EMEntries* tmp; LBID_t lbid; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; lbid = tmp->LBIDstart + offset; err = em->getBulkInsertVars(lbid, hwm, txnID); CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(hwm == tmp->secondHWM); CPPUNIT_ASSERT(txnID == tmp->txnID); break; } case 9: //setBulkInsertVars { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err, offset; EMEntries* tmp; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; tmp->secondHWM = rand_r(&randstate) % MAXINT; tmp->txnID = rand_r(&randstate) % MAXINT; err = em->setBulkInsertVars(tmp->LBIDstart + offset, tmp->secondHWM, tmp->txnID); em->confirmChanges(); CPPUNIT_ASSERT(err == 0); break; } */ default: break; } } delete em; while (head != NULL) { tmp = head->next; delete head; head = tmp; } #ifdef BRM_VERBOSE cerr << "thread " << threadNum << " exiting" << endl; #endif return NULL; } /* ExtentMap em_si; static void* EMRunner_si(void* arg) { // keep track of LBID ranges allocated here and // randomly allocate, lookup, delete, get/set HWM, and // destroy the EM object. #ifdef BRM_VERBOSE int threadNum = reinterpret_cast(arg); #endif int op, listSize = 0, i; uint32_t randstate; struct EMEntries* head = NULL, *tmp; struct timeval tv; vector lbids; #ifdef BRM_VERBOSE cerr << "thread number " << threadNum << " started." << endl; #endif gettimeofday(&tv, NULL); randstate = static_cast(tv.tv_usec); while (!threadStop) { op = rand_r(&randstate) % 9; #ifdef BRM_VERBOSE cerr << "next op is " << op << endl; #endif switch (op) { case 0: //allocate space for a new file { struct EMEntries* newEm; int size = rand_r(&randstate) % 102399 + 1; int entries, OID, allocdSize; pthread_mutex_lock(&pthreadMutex); OID = oid++; pthread_mutex_unlock(&pthreadMutex); em_si.createExtent(size, OID, lbids, allocdSize); em_si.confirmChanges(); entries = size / em_si.getExtentSize(); if ((size % em_si.getExtentSize()) != 0) entries++; CPPUNIT_ASSERT((uint32_t)entries == lbids.size()); for (i = 0 ; i < entries; i++) { newEm = new EMEntries(); newEm->size = em_si.getExtentSize(); newEm->OID = OID; newEm->FBO = i * em_si.getExtentSize(); newEm->LBIDstart = lbids[i]; newEm->next = head; head = newEm; listSize++; } #ifdef BRM_VERBOSE cerr << "created new space for OID " << newEm->OID << endl; #endif em_si.checkConsistency(); break; } case 1: //allocate space for an existing file { if (listSize == 0) break; struct EMEntries* newEm, *tmp; int size = rand_r(&randstate) % 102399 + 1; int fileRand = rand_r(&randstate) % listSize; int i, lastExtent, blockEnd, oid; int tmpHWM, entries, allocdSize; vector lbids; for (i = 0, tmp = head; i < fileRand; i++) tmp = tmp->next; oid = tmp->OID; for (lastExtent = 0, tmp = head; tmp != NULL; tmp = tmp->next) { if (tmp->OID != oid) continue; tmpHWM = tmp->HWM; blockEnd = tmp->FBO + tmp->size; if (lastExtent < blockEnd) lastExtent = blockEnd; } em_si.createExtent(size, oid, lbids, allocdSize); em_si.confirmChanges(); entries = size / em_si.getExtentSize(); if ((size % em_si.getExtentSize()) != 0) entries++; CPPUNIT_ASSERT((uint32_t)entries == lbids.size()); for (i = 0; i < entries; i++) { newEm = new EMEntries(); if (i != entries) newEm->size = em_si.getExtentSize(); else newEm->size = size % em_si.getExtentSize(); newEm->OID = oid; newEm->FBO = lastExtent + (i * em_si.getExtentSize()); newEm->LBIDstart = lbids[i]; newEm->HWM = tmpHWM; newEm->next = head; head = newEm; listSize++; } #ifdef BRM_VERBOSE cerr << "created another extent for OID " << newEm->OID << endl; #endif em_si.checkConsistency(); break; } case 2: //delete an OID { if (listSize == 0) break; struct EMEntries* tmp, *prev; int fileRand = rand_r(&randstate) % listSize; int i, oid; for (i = 0, tmp = head; i < fileRand; i++) tmp = tmp->next; oid = tmp->OID; em_si.deleteOID(oid); em_si.confirmChanges(); for (tmp = head; tmp != NULL;) { if (tmp->OID == oid) { if (tmp == head) { head = head->next; delete tmp; tmp = head; } else { prev->next = tmp->next; delete tmp; tmp = prev->next; } listSize--; } else { prev = tmp; tmp = tmp->next; } } #ifdef BRM_VERBOSE cerr << "deleted OID " << oid << endl; #endif em_si.checkConsistency(); break; } case 3: //lookup by LBID { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err, offset, oid; struct EMEntries* tmp; LBID_t target; uint32_t fbo; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; target = tmp->LBIDstart + offset; err = em_si.lookup(target, oid, fbo); #ifdef BRM_VERBOSE cerr << "looked up LBID " << target << " got oid " << oid << " fbo " << fbo << endl; cerr << " oid should be " << tmp->OID << " fbo should be " << offset + tmp->FBO << endl; #endif CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(oid == tmp->OID); CPPUNIT_ASSERT(fbo == offset + tmp->FBO); em_si.checkConsistency(); break; } case 4: //lookup by OID, FBO { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, oid, err, offset; struct EMEntries* tmp; LBID_t lbid; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; oid = tmp->OID; err = em_si.lookup(oid, offset + tmp->FBO, lbid); #ifdef BRM_VERBOSE cerr << "looked up OID " << oid << " fbo " << offset + tmp->FBO << " got lbid " << lbid << endl; cerr << " lbid should be " << tmp->LBIDstart + offset << endl; #endif CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(lbid == tmp->LBIDstart + offset); em_si.checkConsistency(); break; } case 5: //getHWM { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i; struct EMEntries* tmp; uint32_t hwm; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; hwm = em_si.getHWM(tmp->OID); #ifdef BRM_VERBOSE cerr << "stored HWM for OID " << tmp->OID << " is " << tmp->HWM << " BRM says it's " << hwm << endl; #endif CPPUNIT_ASSERT(hwm == tmp->HWM); em_si.checkConsistency(); break; } case 6: //setHWM { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, hwm, oid; struct EMEntries* tmp; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; oid = tmp->OID; hwm = rand_r(&randstate) % (tmp->FBO + em_si.getExtentSize()); em_si.setHWM(oid, hwm); em_si.confirmChanges(); for (tmp = head; tmp != NULL; tmp = tmp->next) if (tmp->OID == oid) tmp->HWM = hwm; #ifdef BRM_VERBOSE cerr << "setHWM of OID " << oid << " to " << hwm << endl; #endif em_si.checkConsistency(); break; } case 7: //getBulkInsertVars { if (listSize == 0) break; HWM_t hwm; VER_t txnID; int entryRand = rand_r(&randstate) % listSize; int i, err, offset; EMEntries* tmp; LBID_t lbid; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; lbid = tmp->LBIDstart + offset; err = em_si.getBulkInsertVars(lbid, hwm, txnID); CPPUNIT_ASSERT(err == 0); CPPUNIT_ASSERT(hwm == tmp->secondHWM); CPPUNIT_ASSERT(txnID == tmp->txnID); break; } case 8: //setBulkInsertVars { if (listSize == 0) break; int entryRand = rand_r(&randstate) % listSize; int i, err, offset; EMEntries* tmp; for (i = 0, tmp = head; i < entryRand; i++) tmp = tmp->next; offset = rand_r(&randstate) % tmp->size; tmp->secondHWM = rand_r(&randstate) % MAXINT; tmp->txnID = rand_r(&randstate) % MAXINT; err = em_si.setBulkInsertVars(tmp->LBIDstart + offset, tmp->secondHWM, tmp->txnID); em_si.confirmChanges(); CPPUNIT_ASSERT(err == 0); break; } default: break; } } while (head != NULL) { tmp = head->next; delete head; head = tmp; } #ifdef BRM_VERBOSE cerr << "thread " << threadNum << " exiting" << endl; #endif return NULL; } */ class LongBRMTests : public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(LongBRMTests); CPPUNIT_TEST(longEMTest_1); // CPPUNIT_TEST(longEMTest_2); // CPPUNIT_TEST(longBRMTest_1); // CPPUNIT_TEST(longBRMTest_2); CPPUNIT_TEST_SUITE_END(); private: public: void longEMTest_1() { const int threadCount = 10; int i; pthread_t threads[threadCount]; cerr << endl << "Multithreaded, multiple instance ExtentMap test. " "This runs for 5 minutes." << endl; threadStop = 0; pthread_mutex_init(&pthreadMutex, nullptr); for (i = 0; i < threadCount; i++) { if (pthread_create(&threads[i], NULL, EMRunner, reinterpret_cast(i + 1)) < 0) throw logic_error("Error creating threads for the ExtentMap test"); usleep(1000); } sleep(300); threadStop = 1; for (i = 0; i < threadCount; i++) { cerr << "Waiting for thread #" << i << endl; pthread_join(threads[i], nullptr); } } /* void longEMTest_2() { const int threadCount = 10; int i; pthread_t threads[threadCount]; cerr << endl << "Multithreaded, single instance ExtentMap test. " "This runs for 5 minutes." << endl; threadStop = 0; pthread_mutex_init(&pthreadMutex, NULL); for (i = 0; i < threadCount; i++) { if (pthread_create(&threads[i], NULL, EMRunner_si, reinterpret_cast(i + 1)) < 0) throw logic_error("Error creating threads for the ExtentMap test"); usleep(1000); } sleep(60); threadStop = 1; for (i = 0; i < threadCount; i++) { cerr << "Waiting for thread #" << i << endl; pthread_join(threads[i], NULL); } } void longBRMTest_1() { const int threadCount = 10; int i; pthread_t threads[threadCount]; cerr << endl << "Multithreaded, multiple instance DBRM test. " "This runs for 5 minutes." << endl; threadStop = 0; pthread_mutex_init(&pthreadMutex, NULL); opCount = 0; for (i = 0; i < threadCount; i++) { if (pthread_create(&threads[i], NULL, BRMRunner_1, reinterpret_cast(i + 1)) < 0) throw logic_error("Error creating threads for the DBRM test"); usleep(1000); } sleep(300); threadStop = 1; for (i = 0; i < threadCount; i++) { cerr << "Waiting for thread #" << i << endl; pthread_join(threads[i], NULL); } cerr << "opCount = " << opCount << endl; } void longBRMTest_2() { const int threadCount = 10; int i; pthread_t threads[threadCount]; cerr << endl << "Multithreaded, single instance DBRM test. " "This runs for 5 minutes." << endl; threadStop = 0; pthread_mutex_init(&pthreadMutex, NULL); opCount = 0; for (i = 0; i < threadCount; i++) { if (pthread_create(&threads[i], NULL, BRMRunner_si, reinterpret_cast(i + 1)) < 0) throw logic_error("Error creating threads for the DBRM test"); usleep(1000); } sleep(300); threadStop = 1; for (i = 0; i < threadCount; i++) { cerr << "Waiting for thread #" << i << endl; pthread_join(threads[i], NULL); } cerr << "opCount = " << opCount << endl; } */ }; CPPUNIT_TEST_SUITE_REGISTRATION( LongBRMTests ); #include #include int main( int argc, char** argv) { CppUnit::TextUi::TestRunner runner; CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry(); runner.addTest( registry.makeTest() ); idbdatafile::IDBPolicy::configIDBPolicy(); bool wasSuccessful = runner.run( "", false ); return (wasSuccessful ? 0 : 1); }