// Source: mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00
// File revision dated 2022-01-21 16:43:49 +00:00 (1893 lines, 45 KiB, C++)

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*****************************************************************************
* $Id: tdriver-dbrm2.cpp 1823 2013-01-21 14:13:09Z rdempsey $
*
****************************************************************************/
#include <iostream>
#include <pthread.h>
#include <errno.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <stdexcept>
#include <pthread.h>
#include <sys/time.h>
#include <values.h>
#include <cppunit/extensions/HelperMacros.h>
#include "brm.h"
#include "IDBPolicy.h"
using namespace BRM;
using namespace std;
// ---------------------------------------------------------------------------
// State shared by every worker thread in this driver.
// ---------------------------------------------------------------------------

// Set to 0 by a test before it starts its workers and to 1 when they should
// leave their main loop; the workers poll it once per iteration.
// NOTE(review): this is a plain int read and written from several threads
// without synchronisation -- consider an atomic type.
int threadStop = 0;

// Next OID to hand out for a new file; incremented under `mutex`.
int oid = 1;

// Number of BRM operations issued so far; incremented under `mutex`.
uint64_t opCount = 0;

// Next unused LBID and transaction id for BRMRunner_2; advanced under `mutex`.
LBID_t lbidCounter = 0;
VER_t nextTxnID = 1;

// Not referenced by any code visible in this file.
u_int64_t vbOffset = 0;

// Guards oid, opCount, lbidCounter and nextTxnID.  Each test initialises it
// with pthread_mutex_init() before starting its workers.
pthread_mutex_t mutex;
struct Range
{
LBID_t start, end, nextBlock;
VER_t txnID;
Range()
{
start = end = nextBlock = 0;
txnID = 0;
}
};
static void* BRMRunner_2(void* arg)
{
vector<Range> copyList, copiedList, committedList;
vector<Range>::iterator rit;
vector<LBID_t> writtenList;
vector<LBID_t>::iterator lit;
pthread_mutex_t listMutex;
int op;
uint32_t randstate;
DBRM* brm;
struct timeval tv;
pthread_mutex_init(&listMutex, NULL);
gettimeofday(&tv, NULL);
randstate = static_cast<uint32_t>(tv.tv_usec);
brm = new DBRM();
while (!threadStop)
{
op = rand_r(&randstate) % 9;
switch (op)
{
case 0: // beginVBCopy
{
int blockCount, size, err;
Range newEntry;
VBRange_v vbRanges;
VBRange_v::iterator vit;
LBIDRange_v ranges;
LBIDRange range;
VER_t txnID;
size = rand_r(&randstate) % 10000;
pthread_mutex_lock(&mutex);
newEntry.start = lbidCounter;
lbidCounter += size;
txnID = nextTxnID++;
pthread_mutex_unlock(&mutex);
newEntry.nextBlock = newEntry.start;
newEntry.end = newEntry.start + size;
range.start = newEntry.start;
range.size = size;
err = brm->beginVBCopy(txnID, ranges, vbRanges);
CPPUNIT_ASSERT(err == 0);
for (blockCount = 0, vit = vbRanges.begin(); vit != vbRanges.end(); vit++)
blockCount += (*vit).size;
CPPUNIT_ASSERT(blockCount == size);
pthread_mutex_lock(&listMutex);
copyList.push_back(newEntry);
pthread_mutex_unlock(&listMutex);
err = brm->beginVBCopy(txnID, ranges, vbRanges);
CPPUNIT_ASSERT(err == -1);
break;
}
case 1: // writeVBEntry
{
int randIndex;
VER_t txnID;
Range* entry;
pthread_mutex_lock(&listMutex);
if (copyList.size() == 0)
break;
randIndex = rand_r(&randstate) % copyList.size();
entry = &(copyList[randIndex]);
entry->nextBlock++;
txnID = entry->txnID;
/********** WORKING HERE **********/
}
default: cerr << "not finished yet" << endl;
}
}
return NULL;
}
static void* BRMRunner_1(void* arg)
{
// keep track of LBID ranges allocated here and
// randomly allocate, lookup, delete, get/set HWM, and
// destroy the EM object.
struct EMEntries
{
u_int64_t LBIDstart;
uint32_t size;
int OID;
uint32_t FBO;
uint32_t HWM;
uint32_t secondHWM;
uint32_t txnID;
struct EMEntries* next;
EMEntries()
{
next = NULL;
HWM = 0;
secondHWM = 0;
txnID = 0;
}
};
#ifdef BRM_VERBOSE
int threadNum = reinterpret_cast<int>(arg);
#endif
int op, listSize = 0, i;
uint32_t randstate;
struct EMEntries *head = NULL, *tmp;
struct timeval tv;
DBRM* brm;
ExtentMap em;
vector<LBID_t> lbids;
#ifdef BRM_VERBOSE
cerr << "thread number " << threadNum << " started." << endl;
#endif
gettimeofday(&tv, NULL);
randstate = static_cast<uint32_t>(tv.tv_usec);
brm = new DBRM();
while (!threadStop)
{
op = rand_r(&randstate) % 10;
#ifdef BRM_VERBOSE
cerr << "next op is " << op << endl;
#endif
switch (op)
{
case 0: // allocate space for a new file
{
struct EMEntries* newEm;
int size = rand_r(&randstate) % 102399 + 1;
int entries, OID, allocdSize, err;
pthread_mutex_lock(&mutex);
OID = oid++;
opCount++;
pthread_mutex_unlock(&mutex);
err = brm->createExtent(size, OID, lbids, allocdSize);
CPPUNIT_ASSERT(err == 0);
entries = size / brm->getExtentSize();
if ((size % brm->getExtentSize()) != 0)
entries++;
if ((uint32_t)entries != lbids.size())
cerr << "entries = " << entries << " lbids.size = " << lbids.size() << endl;
CPPUNIT_ASSERT((uint32_t)entries == lbids.size());
for (i = 0; i < entries; i++)
{
newEm = new EMEntries();
newEm->size = brm->getExtentSize();
newEm->OID = OID;
newEm->FBO = i * brm->getExtentSize();
newEm->LBIDstart = lbids[i];
newEm->next = head;
head = newEm;
listSize++;
}
#ifdef BRM_VERBOSE
cerr << "created new space for OID " << newEm->OID << endl;
#endif
em.checkConsistency();
break;
}
case 1: // allocate space for an existing file
{
if (listSize == 0)
break;
struct EMEntries *newEm, *tmp;
int size = rand_r(&randstate) % 102399 + 1;
int fileRand = rand_r(&randstate) % listSize;
int i, lastExtent, blockEnd, oid;
int tmpHWM, entries, allocdSize, err;
vector<LBID_t> lbids;
for (i = 0, tmp = head; i < fileRand; i++)
tmp = tmp->next;
oid = tmp->OID;
for (lastExtent = 0, tmp = head; tmp != NULL; tmp = tmp->next)
{
if (tmp->OID != oid)
continue;
tmpHWM = tmp->HWM;
blockEnd = tmp->FBO + tmp->size;
if (lastExtent < blockEnd)
lastExtent = blockEnd;
}
err = brm->createExtent(size, oid, lbids, allocdSize);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
entries = size / brm->getExtentSize();
if ((size % brm->getExtentSize()) != 0)
entries++;
CPPUNIT_ASSERT((uint32_t)entries == lbids.size());
for (i = 0; i < entries; i++)
{
newEm = new EMEntries();
if (i != entries)
newEm->size = brm->getExtentSize();
else
newEm->size = size % brm->getExtentSize();
newEm->OID = oid;
newEm->FBO = lastExtent + (i * brm->getExtentSize());
newEm->LBIDstart = lbids[i];
newEm->HWM = tmpHWM;
newEm->next = head;
head = newEm;
listSize++;
}
#ifdef BRM_VERBOSE
cerr << "created another extent for OID " << newEm->OID << endl;
#endif
em.checkConsistency();
break;
}
case 2: // delete an OID
{
if (listSize == 0)
break;
struct EMEntries *tmp, *prev;
int fileRand = rand_r(&randstate) % listSize;
int i, oid, err;
for (i = 0, tmp = head; i < fileRand; i++)
tmp = tmp->next;
oid = tmp->OID;
err = brm->deleteOID(oid);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
for (tmp = head; tmp != NULL;)
{
if (tmp->OID == oid)
{
if (tmp == head)
{
head = head->next;
delete tmp;
tmp = head;
}
else
{
prev->next = tmp->next;
delete tmp;
tmp = prev->next;
}
listSize--;
}
else
{
prev = tmp;
tmp = tmp->next;
}
}
#ifdef BRM_VERBOSE
cerr << "deleted OID " << oid << endl;
#endif
em.checkConsistency();
break;
}
case 3: // lookup by LBID
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset, oid;
struct EMEntries* tmp;
LBID_t target;
uint32_t fbo;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
target = tmp->LBIDstart + offset;
err = brm->lookup(target, 0, false, oid, fbo);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
#ifdef BRM_VERBOSE
cerr << "looked up LBID " << target << " got oid " << oid << " fbo " << fbo << endl;
cerr << " oid should be " << tmp->OID << " fbo should be " << offset + tmp->FBO << endl;
#endif
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(oid == tmp->OID);
CPPUNIT_ASSERT(fbo == offset + tmp->FBO);
em.checkConsistency();
break;
}
case 4: // lookup by OID, FBO
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, oid, err, offset;
struct EMEntries* tmp;
LBID_t lbid;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
oid = tmp->OID;
err = brm->lookup(oid, offset + tmp->FBO, lbid);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
#ifdef BRM_VERBOSE
cerr << "looked up OID " << oid << " fbo " << offset + tmp->FBO << " got lbid " << lbid << endl;
cerr << " lbid should be " << tmp->LBIDstart + offset << endl;
#endif
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(lbid == static_cast<LBID_t>(tmp->LBIDstart + offset));
em.checkConsistency();
break;
}
case 5: // getHWM
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err;
struct EMEntries* tmp;
uint32_t hwm;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
err = brm->getHWM(tmp->OID, hwm);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
#ifdef BRM_VERBOSE
cerr << "stored HWM for OID " << tmp->OID << " is " << tmp->HWM << " BRM says it's " << hwm << endl;
#endif
CPPUNIT_ASSERT(hwm == tmp->HWM);
em.checkConsistency();
break;
}
case 6: // setHWM
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, hwm, oid, err;
struct EMEntries* tmp;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
oid = tmp->OID;
hwm = rand_r(&randstate) % (tmp->FBO + brm->getExtentSize());
err = brm->setHWM(oid, hwm);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
for (tmp = head; tmp != NULL; tmp = tmp->next)
if (tmp->OID == oid)
tmp->HWM = hwm;
#ifdef BRM_VERBOSE
cerr << "setHWM of OID " << oid << " to " << hwm << endl;
#endif
em.checkConsistency();
break;
}
case 7: // renew this EM object
{
delete brm;
brm = new DBRM();
#ifdef BRM_VERBOSE
cerr << "got a new BRM instance" << endl;
#endif
em.checkConsistency();
break;
}
case 8: // getBulkInsertVars
{
if (listSize == 0)
break;
HWM_t hwm;
VER_t txnID;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset;
EMEntries* tmp;
LBID_t lbid;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
lbid = tmp->LBIDstart + offset;
err = brm->getBulkInsertVars(lbid, hwm, txnID);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(hwm == tmp->secondHWM);
CPPUNIT_ASSERT(txnID == tmp->txnID);
break;
}
case 9: // setBulkInsertVars
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset;
EMEntries* tmp;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
tmp->secondHWM = rand_r(&randstate) % MAXINT;
tmp->txnID = rand_r(&randstate) % MAXINT;
err = brm->setBulkInsertVars(tmp->LBIDstart + offset, tmp->secondHWM, tmp->txnID);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
break;
}
default: break;
}
}
delete brm;
while (head != NULL)
{
tmp = head->next;
delete head;
head = tmp;
}
#ifdef BRM_VERBOSE
cerr << "thread " << threadNum << " exiting" << endl;
#endif
return NULL;
}
DBRM brm_si;
static void* BRMRunner_si(void* arg)
{
// keep track of LBID ranges allocated here and
// randomly allocate, lookup, delete, get/set HWM, and
// destroy the EM object.
struct EMEntries
{
u_int64_t LBIDstart;
uint32_t size;
int OID;
uint32_t FBO;
uint32_t HWM;
uint32_t secondHWM;
uint32_t txnID;
struct EMEntries* next;
EMEntries()
{
next = NULL;
HWM = 0;
secondHWM = 0;
txnID = 0;
}
};
#ifdef BRM_VERBOSE
int threadNum = reinterpret_cast<int>(arg);
#endif
int op, listSize = 0, i;
uint32_t randstate;
struct EMEntries *head = NULL, *tmp;
struct timeval tv;
ExtentMap em;
vector<LBID_t> lbids;
#ifdef BRM_VERBOSE
cerr << "thread number " << threadNum << " started." << endl;
#endif
gettimeofday(&tv, NULL);
randstate = static_cast<uint32_t>(tv.tv_usec);
while (!threadStop)
{
op = rand_r(&randstate) % 10;
#ifdef BRM_VERBOSE
cerr << "next op is " << op << endl;
#endif
switch (op)
{
case 0: // allocate space for a new file
{
struct EMEntries* newEm;
int size = rand_r(&randstate) % 102399 + 1;
int entries, OID, allocdSize, err;
pthread_mutex_lock(&mutex);
OID = oid++;
opCount++;
pthread_mutex_unlock(&mutex);
err = brm_si.createExtent(size, OID, lbids, allocdSize);
CPPUNIT_ASSERT(err == 0);
entries = size / brm_si.getExtentSize();
if ((size % brm_si.getExtentSize()) != 0)
entries++;
CPPUNIT_ASSERT((uint32_t)entries == lbids.size());
for (i = 0; i < entries; i++)
{
newEm = new EMEntries();
newEm->size = brm_si.getExtentSize();
newEm->OID = OID;
newEm->FBO = i * brm_si.getExtentSize();
newEm->LBIDstart = lbids[i];
newEm->next = head;
head = newEm;
listSize++;
}
#ifdef BRM_VERBOSE
cerr << "created new space for OID " << newEm->OID << endl;
#endif
em.checkConsistency();
break;
}
case 1: // allocate space for an existing file
{
if (listSize == 0)
break;
struct EMEntries *newEm, *tmp;
int size = rand_r(&randstate) % 102399 + 1;
int fileRand = rand_r(&randstate) % listSize;
int i, lastExtent, blockEnd, oid;
int tmpHWM, entries, allocdSize, err;
vector<LBID_t> lbids;
for (i = 0, tmp = head; i < fileRand; i++)
tmp = tmp->next;
oid = tmp->OID;
for (lastExtent = 0, tmp = head; tmp != NULL; tmp = tmp->next)
{
if (tmp->OID != oid)
continue;
tmpHWM = tmp->HWM;
blockEnd = tmp->FBO + tmp->size;
if (lastExtent < blockEnd)
lastExtent = blockEnd;
}
err = brm_si.createExtent(size, oid, lbids, allocdSize);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
entries = size / brm_si.getExtentSize();
if ((size % brm_si.getExtentSize()) != 0)
entries++;
CPPUNIT_ASSERT((uint32_t)entries == lbids.size());
for (i = 0; i < entries; i++)
{
newEm = new EMEntries();
if (i != entries)
newEm->size = brm_si.getExtentSize();
else
newEm->size = size % brm_si.getExtentSize();
newEm->OID = oid;
newEm->FBO = lastExtent + (i * brm_si.getExtentSize());
newEm->LBIDstart = lbids[i];
newEm->HWM = tmpHWM;
newEm->next = head;
head = newEm;
listSize++;
}
#ifdef BRM_VERBOSE
cerr << "created another extent for OID " << newEm->OID << endl;
#endif
em.checkConsistency();
break;
}
case 2: // delete an OID
{
if (listSize == 0)
break;
struct EMEntries *tmp, *prev;
int fileRand = rand_r(&randstate) % listSize;
int i, oid, err;
for (i = 0, tmp = head; i < fileRand; i++)
tmp = tmp->next;
oid = tmp->OID;
err = brm_si.deleteOID(oid);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
for (tmp = head; tmp != NULL;)
{
if (tmp->OID == oid)
{
if (tmp == head)
{
head = head->next;
delete tmp;
tmp = head;
}
else
{
prev->next = tmp->next;
delete tmp;
tmp = prev->next;
}
listSize--;
}
else
{
prev = tmp;
tmp = tmp->next;
}
}
#ifdef BRM_VERBOSE
cerr << "deleted OID " << oid << endl;
#endif
em.checkConsistency();
break;
}
case 3: // lookup by LBID
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset, oid;
struct EMEntries* tmp;
LBID_t target;
uint32_t fbo;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
target = tmp->LBIDstart + offset;
err = brm_si.lookup(target, 0, false, oid, fbo);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
#ifdef BRM_VERBOSE
cerr << "looked up LBID " << target << " got oid " << oid << " fbo " << fbo << endl;
cerr << " oid should be " << tmp->OID << " fbo should be " << offset + tmp->FBO << endl;
#endif
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(oid == tmp->OID);
CPPUNIT_ASSERT(fbo == offset + tmp->FBO);
em.checkConsistency();
break;
}
case 4: // lookup by OID, FBO
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, oid, err, offset;
struct EMEntries* tmp;
LBID_t lbid;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
oid = tmp->OID;
err = brm_si.lookup(oid, offset + tmp->FBO, lbid);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
#ifdef BRM_VERBOSE
cerr << "looked up OID " << oid << " fbo " << offset + tmp->FBO << " got lbid " << lbid << endl;
cerr << " lbid should be " << tmp->LBIDstart + offset << endl;
#endif
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(lbid == static_cast<LBID_t>(tmp->LBIDstart + offset));
em.checkConsistency();
break;
}
case 5: // getHWM
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err;
struct EMEntries* tmp;
uint32_t hwm;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
err = brm_si.getHWM(tmp->OID, hwm);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
#ifdef BRM_VERBOSE
cerr << "stored HWM for OID " << tmp->OID << " is " << tmp->HWM << " BRM says it's " << hwm << endl;
#endif
CPPUNIT_ASSERT(hwm == tmp->HWM);
em.checkConsistency();
break;
}
case 6: // setHWM
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, hwm, oid, err;
struct EMEntries* tmp;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
oid = tmp->OID;
hwm = rand_r(&randstate) % (tmp->FBO + brm_si.getExtentSize());
err = brm_si.setHWM(oid, hwm);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
for (tmp = head; tmp != NULL; tmp = tmp->next)
if (tmp->OID == oid)
tmp->HWM = hwm;
#ifdef BRM_VERBOSE
cerr << "setHWM of OID " << oid << " to " << hwm << endl;
#endif
em.checkConsistency();
break;
}
case 7: // getBulkInsertVars
{
if (listSize == 0)
break;
HWM_t hwm;
VER_t txnID;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset;
EMEntries* tmp;
LBID_t lbid;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
lbid = tmp->LBIDstart + offset;
err = brm_si.getBulkInsertVars(lbid, hwm, txnID);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(hwm == tmp->secondHWM);
CPPUNIT_ASSERT(txnID == tmp->txnID);
break;
}
case 8: // setBulkInsertVars
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset;
EMEntries* tmp;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
tmp->secondHWM = rand_r(&randstate) % MAXINT;
tmp->txnID = rand_r(&randstate) % MAXINT;
err = brm_si.setBulkInsertVars(tmp->LBIDstart + offset, tmp->secondHWM, tmp->txnID);
pthread_mutex_lock(&mutex);
opCount++;
pthread_mutex_unlock(&mutex);
CPPUNIT_ASSERT(err == 0);
break;
}
default: break;
}
}
while (head != NULL)
{
tmp = head->next;
delete head;
head = tmp;
}
#ifdef BRM_VERBOSE
cerr << "thread " << threadNum << " exiting" << endl;
#endif
return NULL;
}
static void* EMRunner(void* arg)
{
// keep track of LBID ranges allocated here and
// randomly allocate, lookup, delete, get/set HWM, and
// destroy the EM object.
struct EMEntries
{
u_int64_t LBIDstart;
uint32_t size;
int OID;
uint32_t FBO;
uint32_t HWM;
uint32_t secondHWM;
uint32_t txnID;
struct EMEntries* next;
EMEntries()
{
next = NULL;
HWM = 0;
secondHWM = 0;
txnID = 0;
}
};
#ifdef BRM_VERBOSE
int threadNum = reinterpret_cast<int>(arg);
#endif
int op, listSize = 0, i;
uint32_t randstate;
struct EMEntries *head = NULL, *tmp;
struct timeval tv;
ExtentMap* em;
vector<LBID_t> lbids;
#ifdef BRM_VERBOSE
cerr << "thread number " << threadNum << " started." << endl;
#endif
gettimeofday(&tv, NULL);
randstate = static_cast<uint32_t>(tv.tv_usec);
em = new ExtentMap();
while (!threadStop)
{
op = rand_r(&randstate) % 10;
#ifdef BRM_VERBOSE
cerr << "next op is " << op << endl;
#endif
switch (op)
{
case 0: // allocate space for a new file
{
struct EMEntries* newEm;
int size = rand_r(&randstate) % 102399 + 1;
int entries, OID, allocdSize;
pthread_mutex_lock(&mutex);
OID = oid++;
pthread_mutex_unlock(&mutex);
em->createExtent(size, OID, lbids, allocdSize);
em->confirmChanges();
entries = size / em->getExtentSize();
if ((size % em->getExtentSize()) != 0)
entries++;
CPPUNIT_ASSERT((uint32_t)entries == lbids.size());
for (i = 0; i < entries; i++)
{
newEm = new EMEntries();
newEm->size = em->getExtentSize();
newEm->OID = OID;
newEm->FBO = i * em->getExtentSize();
newEm->LBIDstart = lbids[i];
newEm->next = head;
head = newEm;
listSize++;
}
#ifdef BRM_VERBOSE
cerr << "created new space for OID " << newEm->OID << endl;
#endif
em->checkConsistency();
break;
}
case 1: // allocate space for an existing file
{
if (listSize == 0)
break;
struct EMEntries *newEm, *tmp;
int size = rand_r(&randstate) % 102399 + 1;
int fileRand = rand_r(&randstate) % listSize;
int i, lastExtent, blockEnd, oid;
int tmpHWM, entries, allocdSize;
vector<LBID_t> lbids;
for (i = 0, tmp = head; i < fileRand; i++)
tmp = tmp->next;
oid = tmp->OID;
for (lastExtent = 0, tmp = head; tmp != NULL; tmp = tmp->next)
{
if (tmp->OID != oid)
continue;
tmpHWM = tmp->HWM;
blockEnd = tmp->FBO + tmp->size;
if (lastExtent < blockEnd)
lastExtent = blockEnd;
}
em->createExtent(size, oid, lbids, allocdSize);
em->confirmChanges();
entries = size / em->getExtentSize();
if ((size % em->getExtentSize()) != 0)
entries++;
CPPUNIT_ASSERT((uint32_t)entries == lbids.size());
for (i = 0; i < entries; i++)
{
newEm = new EMEntries();
if (i != entries)
newEm->size = em->getExtentSize();
else
newEm->size = size % em->getExtentSize();
newEm->OID = oid;
newEm->FBO = lastExtent + (i * em->getExtentSize());
newEm->LBIDstart = lbids[i];
newEm->HWM = tmpHWM;
newEm->next = head;
head = newEm;
listSize++;
}
#ifdef BRM_VERBOSE
cerr << "created another extent for OID " << newEm->OID << endl;
#endif
em->checkConsistency();
break;
}
case 2: // delete an OID
{
if (listSize == 0)
break;
struct EMEntries *tmp, *prev;
int fileRand = rand_r(&randstate) % listSize;
int i, oid;
for (i = 0, tmp = head; i < fileRand; i++)
tmp = tmp->next;
oid = tmp->OID;
em->deleteOID(oid);
em->confirmChanges();
for (tmp = head; tmp != NULL;)
{
if (tmp->OID == oid)
{
if (tmp == head)
{
head = head->next;
delete tmp;
tmp = head;
}
else
{
prev->next = tmp->next;
delete tmp;
tmp = prev->next;
}
listSize--;
}
else
{
prev = tmp;
tmp = tmp->next;
}
}
#ifdef BRM_VERBOSE
cerr << "deleted OID " << oid << endl;
#endif
em->checkConsistency();
break;
}
case 3: // lookup by LBID
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset, oid;
struct EMEntries* tmp;
LBID_t target;
uint32_t fbo;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
target = tmp->LBIDstart + offset;
err = em->lookup(target, oid, fbo);
#ifdef BRM_VERBOSE
cerr << "looked up LBID " << target << " got oid " << oid << " fbo " << fbo << endl;
cerr << " oid should be " << tmp->OID << " fbo should be " << offset + tmp->FBO << endl;
#endif
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(oid == tmp->OID);
CPPUNIT_ASSERT(fbo == offset + tmp->FBO);
em->checkConsistency();
break;
}
case 4: // lookup by OID, FBO
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, oid, err, offset;
struct EMEntries* tmp;
LBID_t lbid;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
oid = tmp->OID;
err = em->lookup(oid, offset + tmp->FBO, lbid);
#ifdef BRM_VERBOSE
cerr << "looked up OID " << oid << " fbo " << offset + tmp->FBO << " got lbid " << lbid << endl;
cerr << " lbid should be " << tmp->LBIDstart + offset << endl;
#endif
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(lbid == tmp->LBIDstart + offset);
em->checkConsistency();
break;
}
case 5: // getHWM
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i;
struct EMEntries* tmp;
uint32_t hwm;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
hwm = em->getHWM(tmp->OID);
#ifdef BRM_VERBOSE
cerr << "stored HWM for OID " << tmp->OID << " is " << tmp->HWM << " BRM says it's " << hwm << endl;
#endif
CPPUNIT_ASSERT(hwm == tmp->HWM);
em->checkConsistency();
break;
}
case 6: // setHWM
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, hwm, oid;
struct EMEntries* tmp;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
oid = tmp->OID;
hwm = rand_r(&randstate) % (tmp->FBO + em->getExtentSize());
em->setHWM(oid, hwm);
em->confirmChanges();
for (tmp = head; tmp != NULL; tmp = tmp->next)
if (tmp->OID == oid)
tmp->HWM = hwm;
#ifdef BRM_VERBOSE
cerr << "setHWM of OID " << oid << " to " << hwm << endl;
#endif
em->checkConsistency();
break;
}
case 7: // renew this EM object
{
delete em;
em = new ExtentMap();
#ifdef BRM_VERBOSE
cerr << "got a new EM instance" << endl;
#endif
em->checkConsistency();
break;
}
case 8: // getBulkInsertVars
{
if (listSize == 0)
break;
HWM_t hwm;
VER_t txnID;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset;
EMEntries* tmp;
LBID_t lbid;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
lbid = tmp->LBIDstart + offset;
err = em->getBulkInsertVars(lbid, hwm, txnID);
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(hwm == tmp->secondHWM);
CPPUNIT_ASSERT(txnID == tmp->txnID);
break;
}
case 9: // setBulkInsertVars
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset;
EMEntries* tmp;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
tmp->secondHWM = rand_r(&randstate) % MAXINT;
tmp->txnID = rand_r(&randstate) % MAXINT;
err = em->setBulkInsertVars(tmp->LBIDstart + offset, tmp->secondHWM, tmp->txnID);
em->confirmChanges();
CPPUNIT_ASSERT(err == 0);
break;
}
default: break;
}
}
delete em;
while (head != NULL)
{
tmp = head->next;
delete head;
head = tmp;
}
#ifdef BRM_VERBOSE
cerr << "thread " << threadNum << " exiting" << endl;
#endif
return NULL;
}
ExtentMap em_si;
static void* EMRunner_si(void* arg)
{
// keep track of LBID ranges allocated here and
// randomly allocate, lookup, delete, get/set HWM, and
// destroy the EM object.
struct EMEntries
{
u_int64_t LBIDstart;
uint32_t size;
int OID;
uint32_t FBO;
uint32_t HWM;
uint32_t secondHWM;
uint32_t txnID;
struct EMEntries* next;
EMEntries()
{
next = NULL;
HWM = 0;
secondHWM = 0;
txnID = 0;
}
};
#ifdef BRM_VERBOSE
int threadNum = reinterpret_cast<int>(arg);
#endif
int op, listSize = 0, i;
uint32_t randstate;
struct EMEntries *head = NULL, *tmp;
struct timeval tv;
vector<LBID_t> lbids;
#ifdef BRM_VERBOSE
cerr << "thread number " << threadNum << " started." << endl;
#endif
gettimeofday(&tv, NULL);
randstate = static_cast<uint32_t>(tv.tv_usec);
while (!threadStop)
{
op = rand_r(&randstate) % 9;
#ifdef BRM_VERBOSE
cerr << "next op is " << op << endl;
#endif
switch (op)
{
case 0: // allocate space for a new file
{
struct EMEntries* newEm;
int size = rand_r(&randstate) % 102399 + 1;
int entries, OID, allocdSize;
pthread_mutex_lock(&mutex);
OID = oid++;
pthread_mutex_unlock(&mutex);
em_si.createExtent(size, OID, lbids, allocdSize);
em_si.confirmChanges();
entries = size / em_si.getExtentSize();
if ((size % em_si.getExtentSize()) != 0)
entries++;
CPPUNIT_ASSERT((uint32_t)entries == lbids.size());
for (i = 0; i < entries; i++)
{
newEm = new EMEntries();
newEm->size = em_si.getExtentSize();
newEm->OID = OID;
newEm->FBO = i * em_si.getExtentSize();
newEm->LBIDstart = lbids[i];
newEm->next = head;
head = newEm;
listSize++;
}
#ifdef BRM_VERBOSE
cerr << "created new space for OID " << newEm->OID << endl;
#endif
em_si.checkConsistency();
break;
}
case 1: // allocate space for an existing file
{
if (listSize == 0)
break;
struct EMEntries *newEm, *tmp;
int size = rand_r(&randstate) % 102399 + 1;
int fileRand = rand_r(&randstate) % listSize;
int i, lastExtent, blockEnd, oid;
int tmpHWM, entries, allocdSize;
vector<LBID_t> lbids;
for (i = 0, tmp = head; i < fileRand; i++)
tmp = tmp->next;
oid = tmp->OID;
for (lastExtent = 0, tmp = head; tmp != NULL; tmp = tmp->next)
{
if (tmp->OID != oid)
continue;
tmpHWM = tmp->HWM;
blockEnd = tmp->FBO + tmp->size;
if (lastExtent < blockEnd)
lastExtent = blockEnd;
}
em_si.createExtent(size, oid, lbids, allocdSize);
em_si.confirmChanges();
entries = size / em_si.getExtentSize();
if ((size % em_si.getExtentSize()) != 0)
entries++;
CPPUNIT_ASSERT((uint32_t)entries == lbids.size());
for (i = 0; i < entries; i++)
{
newEm = new EMEntries();
if (i != entries)
newEm->size = em_si.getExtentSize();
else
newEm->size = size % em_si.getExtentSize();
newEm->OID = oid;
newEm->FBO = lastExtent + (i * em_si.getExtentSize());
newEm->LBIDstart = lbids[i];
newEm->HWM = tmpHWM;
newEm->next = head;
head = newEm;
listSize++;
}
#ifdef BRM_VERBOSE
cerr << "created another extent for OID " << newEm->OID << endl;
#endif
em_si.checkConsistency();
break;
}
case 2: // delete an OID
{
if (listSize == 0)
break;
struct EMEntries *tmp, *prev;
int fileRand = rand_r(&randstate) % listSize;
int i, oid;
for (i = 0, tmp = head; i < fileRand; i++)
tmp = tmp->next;
oid = tmp->OID;
em_si.deleteOID(oid);
em_si.confirmChanges();
for (tmp = head; tmp != NULL;)
{
if (tmp->OID == oid)
{
if (tmp == head)
{
head = head->next;
delete tmp;
tmp = head;
}
else
{
prev->next = tmp->next;
delete tmp;
tmp = prev->next;
}
listSize--;
}
else
{
prev = tmp;
tmp = tmp->next;
}
}
#ifdef BRM_VERBOSE
cerr << "deleted OID " << oid << endl;
#endif
em_si.checkConsistency();
break;
}
case 3: // lookup by LBID
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset, oid;
struct EMEntries* tmp;
LBID_t target;
uint32_t fbo;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
target = tmp->LBIDstart + offset;
err = em_si.lookup(target, oid, fbo);
#ifdef BRM_VERBOSE
cerr << "looked up LBID " << target << " got oid " << oid << " fbo " << fbo << endl;
cerr << " oid should be " << tmp->OID << " fbo should be " << offset + tmp->FBO << endl;
#endif
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(oid == tmp->OID);
CPPUNIT_ASSERT(fbo == offset + tmp->FBO);
em_si.checkConsistency();
break;
}
case 4: // lookup by OID, FBO
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, oid, err, offset;
struct EMEntries* tmp;
LBID_t lbid;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
oid = tmp->OID;
err = em_si.lookup(oid, offset + tmp->FBO, lbid);
#ifdef BRM_VERBOSE
cerr << "looked up OID " << oid << " fbo " << offset + tmp->FBO << " got lbid " << lbid << endl;
cerr << " lbid should be " << tmp->LBIDstart + offset << endl;
#endif
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(lbid == tmp->LBIDstart + offset);
em_si.checkConsistency();
break;
}
case 5: // getHWM
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i;
struct EMEntries* tmp;
uint32_t hwm;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
hwm = em_si.getHWM(tmp->OID);
#ifdef BRM_VERBOSE
cerr << "stored HWM for OID " << tmp->OID << " is " << tmp->HWM << " BRM says it's " << hwm << endl;
#endif
CPPUNIT_ASSERT(hwm == tmp->HWM);
em_si.checkConsistency();
break;
}
case 6: // setHWM
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, hwm, oid;
struct EMEntries* tmp;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
oid = tmp->OID;
hwm = rand_r(&randstate) % (tmp->FBO + em_si.getExtentSize());
em_si.setHWM(oid, hwm);
em_si.confirmChanges();
for (tmp = head; tmp != NULL; tmp = tmp->next)
if (tmp->OID == oid)
tmp->HWM = hwm;
#ifdef BRM_VERBOSE
cerr << "setHWM of OID " << oid << " to " << hwm << endl;
#endif
em_si.checkConsistency();
break;
}
case 7: // getBulkInsertVars
{
if (listSize == 0)
break;
HWM_t hwm;
VER_t txnID;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset;
EMEntries* tmp;
LBID_t lbid;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
lbid = tmp->LBIDstart + offset;
err = em_si.getBulkInsertVars(lbid, hwm, txnID);
CPPUNIT_ASSERT(err == 0);
CPPUNIT_ASSERT(hwm == tmp->secondHWM);
CPPUNIT_ASSERT(txnID == tmp->txnID);
break;
}
case 8: // setBulkInsertVars
{
if (listSize == 0)
break;
int entryRand = rand_r(&randstate) % listSize;
int i, err, offset;
EMEntries* tmp;
for (i = 0, tmp = head; i < entryRand; i++)
tmp = tmp->next;
offset = rand_r(&randstate) % tmp->size;
tmp->secondHWM = rand_r(&randstate) % MAXINT;
tmp->txnID = rand_r(&randstate) % MAXINT;
err = em_si.setBulkInsertVars(tmp->LBIDstart + offset, tmp->secondHWM, tmp->txnID);
em_si.confirmChanges();
CPPUNIT_ASSERT(err == 0);
break;
}
default: break;
}
}
while (head != NULL)
{
tmp = head->next;
delete head;
head = tmp;
}
#ifdef BRM_VERBOSE
cerr << "thread " << threadNum << " exiting" << endl;
#endif
return NULL;
}
class LongBRMTests : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE(LongBRMTests);
// CPPUNIT_TEST(longEMTest_1);
// CPPUNIT_TEST(longEMTest_2);
CPPUNIT_TEST(longBRMTest_1);
CPPUNIT_TEST(longBRMTest_2);
CPPUNIT_TEST_SUITE_END();
private:
public:
void longEMTest_1()
{
const int threadCount = 10;
int i;
pthread_t threads[threadCount];
cerr << endl
<< "Multithreaded, multiple instance ExtentMap test. "
"This runs for 5 minutes."
<< endl;
threadStop = 0;
pthread_mutex_init(&mutex, NULL);
for (i = 0; i < threadCount; i++)
{
if (pthread_create(&threads[i], NULL, EMRunner, reinterpret_cast<void*>(i + 1)) < 0)
throw logic_error("Error creating threads for the ExtentMap test");
usleep(1000);
}
sleep(60);
threadStop = 1;
for (i = 0; i < threadCount; i++)
{
cerr << "Waiting for thread #" << i << endl;
pthread_join(threads[i], NULL);
}
}
void longEMTest_2()
{
const int threadCount = 10;
int i;
pthread_t threads[threadCount];
cerr << endl
<< "Multithreaded, single instance ExtentMap test. "
"This runs for 5 minutes."
<< endl;
threadStop = 0;
pthread_mutex_init(&mutex, NULL);
for (i = 0; i < threadCount; i++)
{
if (pthread_create(&threads[i], NULL, EMRunner_si, reinterpret_cast<void*>(i + 1)) < 0)
throw logic_error("Error creating threads for the ExtentMap test");
usleep(1000);
}
sleep(60);
threadStop = 1;
for (i = 0; i < threadCount; i++)
{
cerr << "Waiting for thread #" << i << endl;
pthread_join(threads[i], NULL);
}
}
void longBRMTest_1()
{
const int threadCount = 10;
int i;
pthread_t threads[threadCount];
cerr << endl
<< "Multithreaded, multiple instance DBRM test. "
"This runs for 5 minutes."
<< endl;
threadStop = 0;
pthread_mutex_init(&mutex, NULL);
opCount = 0;
for (i = 0; i < threadCount; i++)
{
if (pthread_create(&threads[i], NULL, BRMRunner_1, reinterpret_cast<void*>(i + 1)) < 0)
throw logic_error("Error creating threads for the DBRM test");
usleep(1000);
}
sleep(300);
threadStop = 1;
for (i = 0; i < threadCount; i++)
{
cerr << "Waiting for thread #" << i << endl;
pthread_join(threads[i], NULL);
}
cerr << "opCount = " << opCount << endl;
}
void longBRMTest_2()
{
const int threadCount = 10;
int i;
pthread_t threads[threadCount];
cerr << endl
<< "Multithreaded, single instance DBRM test. "
"This runs for 5 minutes."
<< endl;
threadStop = 0;
pthread_mutex_init(&mutex, NULL);
opCount = 0;
for (i = 0; i < threadCount; i++)
{
if (pthread_create(&threads[i], NULL, BRMRunner_si, reinterpret_cast<void*>(i + 1)) < 0)
throw logic_error("Error creating threads for the DBRM test");
usleep(1000);
}
sleep(300);
threadStop = 1;
for (i = 0; i < threadCount; i++)
{
cerr << "Waiting for thread #" << i << endl;
pthread_join(threads[i], NULL);
}
cerr << "opCount = " << opCount << endl;
}
};
CPPUNIT_TEST_SUITE_REGISTRATION(LongBRMTests);
#include <cppunit/extensions/TestFactoryRegistry.h>
#include <cppunit/ui/text/TestRunner.h>
int main(int argc, char** argv)
{
CppUnit::TextUi::TestRunner runner;
CppUnit::TestFactoryRegistry& registry = CppUnit::TestFactoryRegistry::getRegistry();
runner.addTest(registry.makeTest());
idbdatafile::IDBPolicy::configIDBPolicy();
bool wasSuccessful = runner.run("", false);
return (wasSuccessful ? 0 : 1);
}