/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/***********************************************************************
 * $Id: primitiveserver.cpp 2147 2013-08-14 20:44:44Z bwilkinson $
 *
 *
 ***********************************************************************/

#define _FILE_OFFSET_BITS 64
#define _LARGEFILE64_SOURCE
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <mutex>
#include <stdexcept>

// #define NDEBUG
#include <cassert>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/foreach.hpp>
#include <tr1/unordered_map>
#include <tr1/unordered_set>
#include <pthread.h>
#include <cerrno>

using namespace std;

#include <boost/scoped_ptr.hpp>
#include <boost/scoped_array.hpp>
#include <utility>
#include <boost/thread.hpp>
using namespace boost;

#include "distributedenginecomm.h"
#include "serviceexemgr.h"
#include "primproc.h"
#include "primitiveserver.h"
#include "primitivemsg.h"
#include "umsocketselector.h"
#include "brm.h"
using namespace BRM;

#include "writeengine.h"

#include "messagequeue.h"
#include "samenodepseudosocket.h"
using namespace messageqcpp;

#include "blockrequestprocessor.h"
#include "blockcacheclient.h"
#include "stats.h"
using namespace dbbc;

#include "liboamcpp.h"
using namespace oam;

#include "configcpp.h"
using namespace config;

#include "bppseeder.h"
#include "primitiveprocessor.h"
#include "pp_logger.h"
using namespace primitives;

#include "errorcodes.h"
#include "exceptclasses.h"

#include "idbcompress.h"
using namespace compress;

#include "IDBDataFile.h"
#include "IDBPolicy.h"
using namespace idbdatafile;

using namespace threadpool;

#include "threadnaming.h"

#include "atomicops.h"

#ifndef O_BINARY
#define O_BINARY 0
#endif
#ifndef O_DIRECT
#define O_DIRECT 0
#endif
#ifndef O_LARGEFILE
#define O_LARGEFILE 0
#endif
#ifndef O_NOATIME
#define O_NOATIME 0
#endif

// make global for blockcache
//
static const char* statsName = {"pm"};
dbbc::Stats* gPMStatsPtr = nullptr;
bool gPMProfOn = false;
uint32_t gSession = 0;
dbbc::Stats pmstats(statsName);

// FIXME: there is an anon ns buried later in between 2 named namespaces...
namespace primitiveprocessor
{
BlockRequestProcessor** BRPp;
dbbc::Stats stats;
extern DebugLevel gDebugLevel;
BRM::DBRM* brm;
int fCacheCount;
bool fPMProfOn;
bool fLBIDTraceOn;

/* params from the config file */
uint32_t BPPCount;
uint32_t blocksReadAhead;
uint32_t defaultBufferSize;
uint32_t connectionsPerUM;
uint32_t highPriorityThreads;
uint32_t medPriorityThreads;
uint32_t lowPriorityThreads;
int directIOFlag = O_DIRECT;
int noVB = 0;

BPPMap bppMap;
boost::mutex bppLock;

boost::mutex djMutex;                      // lock for djLock
std::map<uint64_t, shared_mutex*> djLock;  // djLock synchronizes destroy and joiner msgs, see bug 2619

volatile int32_t asyncCounter;
const int asyncMax = 20;  // max number of concurrent asynchronous loads

struct preFetchCond
{
  // uint64_t lbid;
  boost::condition cond;
  unsigned waiters;

  preFetchCond(const uint64_t /*lbid*/)
  {
    waiters = 0;
  }

  ~preFetchCond() = default;
};

typedef preFetchCond preFetchBlock_t;
typedef std::tr1::unordered_map<uint64_t, preFetchBlock_t*> pfBlockMap_t;
typedef std::tr1::unordered_map<uint64_t, preFetchBlock_t*>::iterator pfBlockMapIter_t;

pfBlockMap_t pfBlockMap;
boost::mutex pfbMutex;  // = PTHREAD_MUTEX_INITIALIZER;

pfBlockMap_t pfExtentMap;
boost::mutex pfMutex;  // = PTHREAD_MUTEX_INITIALIZER;

map<uint32_t, boost::shared_ptr<DictEqualityFilter> > dictEqualityFilters;
boost::mutex eqFilterMutex;

uint32_t cacheNum(uint64_t lbid)
{
  return (lbid / brm->getExtentSize()) % fCacheCount;
}
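
// Illustrative only: with an (assumed) extent size of 8192 blocks and
// fCacheCount == 4 block caches, lbid 20000 falls in extent 20000 / 8192 == 2,
// so it is served by cache 2 % 4 == 2. Whole extents map to a single cache,
// spreading load round-robin while keeping a range's blocks together.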

void buildOidFileName(const BRM::OID_t oid, const uint16_t dbRoot, const uint16_t partNum,
                      const uint32_t segNum, char* file_name)
{
  WriteEngine::FileOp fileOp(false);

  if (fileOp.getFileName(oid, file_name, dbRoot, partNum, segNum) != WriteEngine::NO_ERROR)
  {
    file_name[0] = 0;
    throw std::runtime_error("fileOp.getFileName failed");
  }

  // cout << "Oid2Filename o: " << oid << " n: " << file_name << endl;
}

void waitForRetry(long count)
{
  timespec ts;
  ts.tv_sec = 5L * count / 10L;
  ts.tv_nsec = (5L * count % 10L) * 100000000L;
  nanosleep(&ts, nullptr);
}
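
// Back-off sketch: the delay is 0.5 s * count (tv_sec takes the whole seconds,
// tv_nsec the remaining half second). So retry 1 sleeps 0.5 s, retry 2 sleeps
// 1.0 s, retry 3 sleeps 1.5 s, and the up-to-29 retries the callers below allow
// add up to roughly 0.5 * (1 + 2 + ... + 29) = 217.5 s of total wait.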

void prefetchBlocks(const uint64_t lbid, const int compType, uint32_t* rCount)
{
  uint16_t dbRoot;
  uint32_t partNum;
  uint16_t segNum;
  uint32_t hwm;
  uint32_t fbo;
  uint32_t lowfbo;
  uint32_t highfbo;
  BRM::OID_t oid;
  pfBlockMap_t::const_iterator iter;
  uint64_t lowlbid = (lbid / blocksReadAhead) * blocksReadAhead;
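  // Integer division rounds lbid down to the start of its read-ahead window:
  // e.g., if blocksReadAhead were 1024, lbid 5000 would give lowlbid 4096.
  // (1024 is just an illustrative value; the real setting comes from the config.)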
  blockCacheClient bc(*BRPp[cacheNum(lbid)]);
  BRM::InlineLBIDRange range;
  int err;

  pfbMutex.lock();

  iter = pfBlockMap.find(lowlbid);

  if (iter != pfBlockMap.end())
  {
    iter->second->waiters++;
    iter->second->cond.wait(pfbMutex);
    iter->second->waiters--;
    pfbMutex.unlock();
    return;
  }

  preFetchBlock_t* pfb = nullptr;
  pfb = new preFetchBlock_t(lowlbid);

  pfBlockMap[lowlbid] = pfb;
  pfbMutex.unlock();

  // loadBlock will catch a versioned block so vbflag can be set to false here
  err = brm->lookupLocal(lbid, 0, false, oid, dbRoot, partNum, segNum, fbo);  // need the oid

  if (err < 0)
  {
    cerr << "prefetchBlocks(): BRM lookupLocal failed! Expect more errors.\n";
    goto cleanup;
  }

  // We ignore extState that tells us whether the extent is
  // an outOfService extent to be ignored. The filtering for
  // outOfService extents is done elsewhere.
  int extState;
  err = brm->getLocalHWM(oid, partNum, segNum, hwm, extState);

  if (err < 0)
  {
    cerr << "prefetchBlocks(): BRM getLocalHWM failed! Expect more errors.\n";
    goto cleanup;
  }

  lowfbo = fbo - (lbid - lowlbid);
  highfbo = lowfbo + blocksReadAhead - 1;
  range.start = lowlbid;

  if (hwm < highfbo)
    range.size = hwm - lowfbo + 1;
  else
    range.size = blocksReadAhead;

  try
  {
    if (range.size > blocksReadAhead)
    {
      ostringstream os;
      os << "Invalid Range from HWM for lbid " << lbid << ", range size should be <= blocksReadAhead: HWM "
         << hwm << ", dbroot " << dbRoot << ", highfbo " << highfbo << ", lowfbo " << lowfbo
         << ", blocksReadAhead " << blocksReadAhead << ", range size " << (int)range.size << endl;
      throw logging::InvalidRangeHWMExcept(os.str());
    }

    idbassert(range.size <= blocksReadAhead);

    bc.check(range, QueryContext(numeric_limits<VER_t>::max()), 0, compType, *rCount);
  }
  catch (...)
  {
    // Perform necessary cleanup before rethrowing the exception
    pfb->cond.notify_all();

    pfbMutex.lock();

    while (pfb->waiters > 0)
    {
      pfbMutex.unlock();
      // handle race condition with other threads going into wait before the broadcast above
      pfb->cond.notify_one();
      usleep(1);
      pfbMutex.lock();
    }

    if (pfBlockMap.erase(lowlbid) > 0)
      delete pfb;

    pfb = nullptr;
    pfbMutex.unlock();
    throw;
  }

cleanup:
  pfb->cond.notify_all();

  pfbMutex.lock();

  while (pfb->waiters > 0)
  {
    pfbMutex.unlock();
    // handle race condition with other threads going into wait before the broadcast above
    pfb->cond.notify_one();
    usleep(1);
    pfbMutex.lock();
  }

  if (pfBlockMap.erase(lowlbid) > 0)
    delete pfb;

  pfb = nullptr;
  pfbMutex.unlock();
}  // prefetchBlocks()

// returns the # that were cached.
uint32_t loadBlocks(LBID_t* lbids, QueryContext qc, VER_t txn, int compType, uint8_t** bufferPtrs,
                    uint32_t* rCount, bool LBIDTrace, uint32_t sessionID, uint32_t blockCount,
                    bool* blocksWereVersioned, bool doPrefetch, VSSCache* vssCache)
{
  blockCacheClient bc(*BRPp[cacheNum(lbids[0])]);
  uint32_t blksRead = 0;
  VSSCache::iterator it;
  uint32_t i, ret;
  bool* vbFlags;
  int* vssRCs;
  bool* cacheThisBlock;
  bool* wasCached;

  *blocksWereVersioned = false;

  if (LBIDTrace)
  {
    for (i = 0; i < blockCount; i++)
    {
      stats.touchedLBID(lbids[i], pthread_self(), sessionID);
    }
  }

  VER_t* vers = (VER_t*)alloca(blockCount * sizeof(VER_t));
  vbFlags = (bool*)alloca(blockCount);
  vssRCs = (int*)alloca(blockCount * sizeof(int));
  cacheThisBlock = (bool*)alloca(blockCount);
  wasCached = (bool*)alloca(blockCount);

  for (i = 0; i < blockCount; i++)
  {
    if (vssCache)
    {
      it = vssCache->find(lbids[i]);

      if (it != vssCache->end())
      {
        VSSData& vd = it->second;
        vers[i] = vd.verID;
        vbFlags[i] = vd.vbFlag;
        vssRCs[i] = vd.returnCode;

        if (vssRCs[i] == ERR_SNAPSHOT_TOO_OLD)
          throw runtime_error("Snapshot too old");
      }
    }

    if (!vssCache || it == vssCache->end())
      vssRCs[i] = brm->vssLookup(lbids[i], qc, txn, &vers[i], &vbFlags[i]);

    *blocksWereVersioned |= vbFlags[i];

    // If the block is being modified by this txn, set the useCache flag to false
    if (txn > 0 && vers[i] == txn && !vbFlags[i])
      cacheThisBlock[i] = false;
    else
      cacheThisBlock[i] = true;
  }

  /*
  cout << " resolved ver #s: ";
  for (uint32_t i = 0; i < blockCount; i++)
    cout << " <" << vers[i] << ", " << (int)vbFlags[i] << ", " << (int)cacheThisBlock[i] << ">";
  cout << endl;
  */

  ret = bc.getCachedBlocks(lbids, vers, bufferPtrs, wasCached, blockCount);

  // Do we want to check any VB flags here? Initial thought: no, because we have
  // no idea whether any other blocks in the prefetch range are versioned,
  // what's the difference if one in the visible range is?
  if (ret != blockCount && doPrefetch)
  {
    prefetchBlocks(lbids[0], compType, &blksRead);

    if (fPMProfOn)
      pmstats.markEvent(lbids[0], (pthread_t)-1, sessionID, 'M');

    /* After the prefetch they're all cached if they are in the same range, so
     * prune the block list and try getCachedBlocks again first, then fall back
     * to single-block IO requests if for some reason they aren't. */
    uint32_t l_blockCount = 0;

    for (i = 0; i < blockCount; i++)
    {
      if (!wasCached[i])
      {
        lbids[l_blockCount] = lbids[i];
        vers[l_blockCount] = vers[i];
        bufferPtrs[l_blockCount] = bufferPtrs[i];
        vbFlags[l_blockCount] = vbFlags[i];
        cacheThisBlock[l_blockCount] = cacheThisBlock[i];
        ++l_blockCount;
      }
    }

    ret += bc.getCachedBlocks(lbids, vers, bufferPtrs, wasCached, l_blockCount);

    if (ret != blockCount)
    {
      for (i = 0; i < l_blockCount; i++)
        if (!wasCached[i])
        {
          bool ver;

          qc.currentScn = vers[i];
          bc.getBlock(lbids[i], qc, txn, compType, (void*)bufferPtrs[i], vbFlags[i], wasCached[i], &ver,
                      cacheThisBlock[i], false);
          *blocksWereVersioned |= ver;
          blksRead++;
        }
    }
  }
  /* Some blocks weren't cached, prefetch is disabled -> issue single-block IO requests,
   * skip checking the cache again. */
  else if (ret != blockCount)
  {
    for (i = 0; i < blockCount; i++)
    {
      if (!wasCached[i])
      {
        bool ver;

        qc.currentScn = vers[i];
        bc.getBlock(lbids[i], qc, txn, compType, (void*)bufferPtrs[i], vbFlags[i], wasCached[i], &ver,
                    cacheThisBlock[i], false);
        *blocksWereVersioned |= ver;
        blksRead++;
      }
    }
  }

  if (rCount)
    *rCount = blksRead;

  // if (*verBlocks)
  //   cout << "loadBlock says there were versioned blocks\n";
  return ret;
}
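
/* Minimal usage sketch (illustrative; the surrounding variables are assumptions):
 *   bool versioned;
 *   uint32_t physReads;
 *   uint32_t cached = loadBlocks(lbidArray, qc, txn, compType, bufPtrArray, &physReads,
 *                                false, sessionID, n, &versioned, true, nullptr);
 * 'cached' is the number of blocks served by the block cache; 'physReads' counts
 * the blocks that had to be read from disk. Note the function prunes lbidArray
 * in place when it retries after a prefetch.
 */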

void loadBlock(uint64_t lbid, QueryContext v, uint32_t t, int compType, void* bufferPtr,
               bool* pWasBlockInCache, uint32_t* rCount, bool LBIDTrace, uint32_t sessionID, bool doPrefetch,
               VSSCache* vssCache)
{
  bool flg = false;
  BRM::OID_t oid;
  BRM::VER_t txn = (BRM::VER_t)t;
  uint16_t dbRoot = 0;
  uint32_t partitionNum = 0;
  uint16_t segmentNum = 0;
  int rc;
  BRM::VER_t ver;
  blockCacheClient bc(*BRPp[cacheNum(lbid)]);
  char file_name[WriteEngine::FILE_NAME_SIZE] = {0};
  char* fileNamePtr = file_name;
  uint32_t blksRead = 0;
  VSSCache::iterator it;

  if (LBIDTrace)
    stats.touchedLBID(lbid, pthread_self(), sessionID);

  if (vssCache)
  {
    it = vssCache->find(lbid);

    if (it != vssCache->end())
    {
      VSSData& vd = it->second;
      ver = vd.verID;
      flg = vd.vbFlag;
      rc = vd.returnCode;
    }
  }

  if (!vssCache || it == vssCache->end())
    rc = brm->vssLookup((BRM::LBID_t)lbid, v, txn, &ver, &flg);

  v.currentScn = ver;
  // cout << "VSS l/u: l=" << lbid << " v=" << ver << " t=" << txn << " flg=" << flg << " rc: " << rc << endl;

  // if this block is locked by this session, don't cache it, just read it directly from disk
  if (txn > 0 && ver == txn && !flg && !noVB)
  {
    uint64_t offset;
    uint32_t fbo;
    boost::scoped_array<uint8_t> newBufferSa;
    boost::scoped_array<char> cmpHdrBufSa;
    boost::scoped_array<char> cmpBufSa;
    boost::scoped_array<unsigned char> uCmpBufSa;

    ptrdiff_t alignedBuffer = 0;
    void* readBufferPtr = nullptr;
    char* cmpHdrBuf = nullptr;
    char* cmpBuf = nullptr;
    unsigned char* uCmpBuf = nullptr;
    uint64_t cmpBufLen = 0;
    int blockReadRetryCount = 0;
    unsigned idx = 0;
    int pageSize = getpagesize();
    IDBDataFile* fp = nullptr;

    try
    {
      rc = brm->lookupLocal((BRM::LBID_t)lbid, ver, flg, oid, dbRoot, partitionNum, segmentNum, fbo);

      // load the block
      buildOidFileName(oid, dbRoot, partitionNum, segmentNum, fileNamePtr);
      int opts = directIOFlag ? IDBDataFile::USE_ODIRECT : 0;
      fp = IDBDataFile::open(IDBPolicy::getType(fileNamePtr, IDBPolicy::PRIMPROC), fileNamePtr, "r", opts);

      if (fp == nullptr)
      {
        int errCode = errno;
        SUMMARY_INFO2("open failed: ", fileNamePtr);
        char errbuf[80];
        string errMsg;
        // #if STRERROR_R_CHAR_P
        const char* p;

        if ((p = strerror_r(errCode, errbuf, 80)) != nullptr)
          errMsg = p;

        if (errCode == EINVAL)
        {
          throw logging::IDBExcept(logging::IDBErrorInfo::instance()->errorMsg(logging::ERR_O_DIRECT),
                                   logging::ERR_O_DIRECT);
        }

        string errStr(fileNamePtr);
        errStr += ": open: ";
        errStr += errMsg;
        throw std::runtime_error(errStr);
      }

      // fp must be valid here; otherwise the exception above was thrown.
      offset = (uint64_t)fbo * (uint64_t)DATA_BLOCK_SIZE;
      idx = offset / (4 * 1024 * 1024);

      errno = 0;
      rc = 0;
      int i = -1;

      if (compType == 0)
      {
        newBufferSa.reset(new uint8_t[DATA_BLOCK_SIZE + pageSize]);
        alignedBuffer = (ptrdiff_t)newBufferSa.get();

        if ((alignedBuffer % pageSize) != 0)
        {
          alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
          alignedBuffer += pageSize;
        }
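
        // Alignment sketch: O_DIRECT reads need a page-aligned buffer, so the
        // pointer is rounded down to a page boundary and bumped up one page.
        // E.g., with a 4096-byte page, a raw pointer 0x1003 becomes
        // (0x1003 & ~0xFFF) + 0x1000 == 0x2000; the extra page allocated above
        // guarantees the aligned pointer still has DATA_BLOCK_SIZE usable bytes.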

        readBufferPtr = (void*)alignedBuffer;
        i = fp->pread(readBufferPtr, offset, DATA_BLOCK_SIZE);
        memcpy(bufferPtr, readBufferPtr, i);
#ifdef IDB_COMP_POC_DEBUG
        {
          boost::mutex::scoped_lock lk(primitiveprocessor::compDebugMutex);
          cout << "pread2(" << fd << ", 0x" << hex << (ptrdiff_t)readBufferPtr << dec << ", "
               << DATA_BLOCK_SIZE << ", " << offset << ") = " << i << endl;
        }
#endif  // IDB_COMP_POC_DEBUG
      }     // if (compType == 0)
      else  // if (compType != 0)
      {
      // retry if file is out of sync -- compressed column file only.
      blockReadRetry:

        uCmpBufSa.reset(new unsigned char[4 * 1024 * 1024 + 4]);
        uCmpBuf = uCmpBufSa.get();
        cmpHdrBufSa.reset(new char[4096 * 3 + pageSize]);
        alignedBuffer = (ptrdiff_t)cmpHdrBufSa.get();

        if ((alignedBuffer % pageSize) != 0)
        {
          alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
          alignedBuffer += pageSize;
        }

        cmpHdrBuf = (char*)alignedBuffer;

        i = fp->pread(&cmpHdrBuf[0], 0, 4096 * 3);

        CompChunkPtrList ptrList;
        std::unique_ptr<CompressInterface> decompressor(compress::getCompressInterfaceByType(
            compress::CompressInterface::getCompressionType(&cmpHdrBuf[0])));

        if (!decompressor)
        {
          // Use default?
          decompressor.reset(new compress::CompressInterfaceSnappy());
        }

        int dcrc = 0;

        if (i == 4096 * 3)
        {
          uint64_t numHdrs = 0;  // extra headers
          dcrc = compress::CompressInterface::getPtrList(&cmpHdrBuf[4096], 4096, ptrList);

          if (dcrc == 0 && ptrList.size() > 0)
            numHdrs = ptrList[0].first / 4096ULL - 2ULL;
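
          // Assumed layout: the file begins with two fixed 4KB headers, then
          // numHdrs additional 4KB pointer-list headers, then the first data
          // chunk. Since ptrList[0].first is the first chunk's byte offset,
          // dividing by 4096 and subtracting the two fixed headers yields the
          // number of extra header blocks still to be read.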

          if (numHdrs > 0)
          {
            boost::scoped_array<char> nextHdrBufsa(new char[numHdrs * 4096 + pageSize]);
            alignedBuffer = (ptrdiff_t)nextHdrBufsa.get();

            if ((alignedBuffer % pageSize) != 0)
            {
              alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
              alignedBuffer += pageSize;
            }

            char* nextHdrBufPtr = (char*)alignedBuffer;

            i = fp->pread(&nextHdrBufPtr[0], 4096 * 2, numHdrs * 4096);

            CompChunkPtrList nextPtrList;
            dcrc = compress::CompressInterface::getPtrList(&nextHdrBufPtr[0], numHdrs * 4096, nextPtrList);

            if (dcrc == 0)
              ptrList.insert(ptrList.end(), nextPtrList.begin(), nextPtrList.end());
          }
        }

        if (dcrc != 0 || idx >= ptrList.size())
        {
          // Due to a race condition, the header on disk may not be updated yet.
          // Log an info message and retry.
          if (blockReadRetryCount == 0)
          {
            logging::Message::Args args;
            args.add(oid);
            ostringstream infoMsg;
            infoMsg << "retry read from " << fileNamePtr << ". dcrc=" << dcrc << ", idx=" << idx
                    << ", ptr.size=" << ptrList.size();
            args.add(infoMsg.str());
            mlp->logInfoMessage(logging::M0061, args);
          }

          if (++blockReadRetryCount < 30)
          {
            waitForRetry(blockReadRetryCount);
            goto blockReadRetry;
          }
          else
          {
            rc = -1004;
          }
        }

        if (rc == 0)
        {
          unsigned cmpBlkOff = offset % (4 * 1024 * 1024);
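          // Each compressed chunk holds 4 MB of uncompressed data, so idx
          // (computed above as offset / 4MB) selects the chunk and cmpBlkOff
          // is the block's byte offset within that uncompressed chunk.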
          uint64_t cmpBufOff = ptrList[idx].first;
          uint64_t cmpBufSz = ptrList[idx].second;

          if (cmpBufSa.get() == nullptr || cmpBufLen < cmpBufSz)
          {
            cmpBufSa.reset(new char[cmpBufSz + pageSize]);
            cmpBufLen = cmpBufSz;
            alignedBuffer = (ptrdiff_t)cmpBufSa.get();

            if ((alignedBuffer % pageSize) != 0)
            {
              alignedBuffer &= ~((ptrdiff_t)pageSize - 1);
              alignedBuffer += pageSize;
            }

            cmpBuf = (char*)alignedBuffer;
          }

          size_t blen = 4 * 1024 * 1024;

          i = fp->pread(cmpBuf, cmpBufOff, cmpBufSz);

          dcrc = decompressor->uncompressBlock(cmpBuf, cmpBufSz, uCmpBuf, blen);

          if (dcrc == 0)
          {
            memcpy(bufferPtr, &uCmpBuf[cmpBlkOff], DATA_BLOCK_SIZE);
          }
          else
          {
            // Due to a race condition, the header on disk may not be updated yet.
            // Log an info message and retry.
            if (blockReadRetryCount == 0)
            {
              logging::Message::Args args;
              args.add(oid);
              ostringstream infoMsg;
              infoMsg << "retry read from " << fileNamePtr << ". dcrc=" << dcrc << ", idx=" << idx
                      << ", ptr.size=" << ptrList.size();
              args.add(infoMsg.str());
              mlp->logInfoMessage(logging::M0061, args);
            }

            if (++blockReadRetryCount < 30)
            {
              waitForRetry(blockReadRetryCount);
              goto blockReadRetry;
            }
            else
            {
              rc = -1006;
            }
          }
        }
      }

      if (rc < 0)
      {
        string msg("pread failed");
        ostringstream infoMsg;
        infoMsg << " (rc: " << rc << ")";
        msg = msg + ", error:" + strerror(errno) + infoMsg.str();
        SUMMARY_INFO(msg);
        // FIXME: free-up allocated memory!
        throw std::runtime_error(msg);
      }
    }
    catch (...)
    {
      delete fp;
      fp = nullptr;
      throw;
    }

    delete fp;
    fp = nullptr;

    // log the retries
    if (blockReadRetryCount > 0)
    {
      logging::Message::Args args;
      args.add(oid);
      ostringstream infoMsg;
      infoMsg << "Successfully uncompressed " << fileNamePtr << " chunk " << idx << " @"
              << " blockReadRetry:" << blockReadRetryCount;
      args.add(infoMsg.str());
      mlp->logInfoMessage(logging::M0006, args);
    }

    if (pWasBlockInCache)
      *pWasBlockInCache = false;

    if (rCount)
      *rCount = 1;

    return;
  }

  FileBuffer* fbPtr = nullptr;
  bool wasBlockInCache = false;

  fbPtr = bc.getBlockPtr(lbid, ver, flg);

  if (fbPtr)
  {
    memcpy(bufferPtr, fbPtr->getData(), BLOCK_SIZE);
    wasBlockInCache = true;
  }

  if (doPrefetch && !wasBlockInCache && !flg)
  {
    prefetchBlocks(lbid, compType, &blksRead);

    if (fPMProfOn)
      pmstats.markEvent(lbid, (pthread_t)-1, sessionID, 'M');

    bc.getBlock(lbid, v, txn, compType, (uint8_t*)bufferPtr, flg, wasBlockInCache);

    if (!wasBlockInCache)
      blksRead++;
  }
  else if (!wasBlockInCache)
  {
    bc.getBlock(lbid, v, txn, compType, (uint8_t*)bufferPtr, flg, wasBlockInCache);

    if (!wasBlockInCache)
      blksRead++;
  }

  if (pWasBlockInCache)
    *pWasBlockInCache = wasBlockInCache;

  if (rCount)
    *rCount = blksRead;
}

struct AsynchLoader
{
  AsynchLoader(uint64_t l, QueryContext v, uint32_t t, int ct, uint32_t* cCount, uint32_t* rCount, bool trace,
               uint32_t sesID, boost::mutex* m, uint32_t* loaderCount,
               boost::shared_ptr<BPPSendThread> st,  // sendThread for abort upon exception.
               VSSCache* vCache)
   : lbid(l)
   , ver(std::move(v))
   , txn(t)
   , compType(ct)
   , LBIDTrace(trace)
   , sessionID(sesID)
   , cacheCount(cCount)
   , readCount(rCount)
   , busyLoaders(loaderCount)
   , mutex(m)
   , sendThread(std::move(st))
   , vssCache(vCache)
  {
  }

  void operator()()
  {
    utils::setThreadName("PPAsyncLoader");
    bool cached = false;
    uint32_t rCount = 0;
    char buf[BLOCK_SIZE];

    // cout << "asynch started " << pthread_self() << " l: " << lbid << endl;
    try
    {
      loadBlock(lbid, ver, txn, compType, buf, &cached, &rCount, LBIDTrace, sessionID, true, vssCache);
    }
    catch (std::exception& ex)
    {
      sendThread->abort();
      cerr << "AsynchLoader caught loadBlock exception: " << ex.what() << endl;
      idbassert(asyncCounter > 0);
      (void)atomicops::atomicDec(&asyncCounter);
      mutex->lock();
      --(*busyLoaders);
      mutex->unlock();
      logging::Message::Args args;
      args.add(string("PrimProc AsyncLoader caught error: "));
      args.add(ex.what());
      primitiveprocessor::mlp->logMessage(logging::M0000, args, false);
      return;
    }
    catch (...)
    {
      sendThread->abort();
      cerr << "AsynchLoader caught unknown exception" << endl;
      // FIXME Use a locked processor primitive?
      idbassert(asyncCounter > 0);
      (void)atomicops::atomicDec(&asyncCounter);
      mutex->lock();
      --(*busyLoaders);
      mutex->unlock();
      logging::Message::Args args;
      args.add(string("PrimProc AsyncLoader caught unknown error"));
      primitiveprocessor::mlp->logMessage(logging::M0000, args, false);
      return;
    }

    idbassert(asyncCounter > 0);
    (void)atomicops::atomicDec(&asyncCounter);
    mutex->lock();

    if (cached)
      (*cacheCount)++;

    *readCount += rCount;
    --(*busyLoaders);
    mutex->unlock();
    // cerr << "done\n";
  }

 private:
  uint64_t lbid;
  QueryContext ver;
  uint32_t txn;
  int compType;
  bool LBIDTrace;
  uint32_t sessionID;
  uint32_t* cacheCount;
  uint32_t* readCount;
  uint32_t* busyLoaders;
  boost::mutex* mutex;
  boost::shared_ptr<BPPSendThread> sendThread;
  VSSCache* vssCache;
};

void loadBlockAsync(uint64_t lbid, const QueryContext& c, uint32_t txn, int compType, uint32_t* cCount,
                    uint32_t* rCount, bool LBIDTrace, uint32_t sessionID, boost::mutex* m,
                    uint32_t* busyLoaders,
                    boost::shared_ptr<BPPSendThread> sendThread,  // sendThread for abort upon exception.
                    VSSCache* vssCache)
{
  blockCacheClient bc(*BRPp[cacheNum(lbid)]);
  bool vbFlag;
  BRM::VER_t ver;
  VSSCache::iterator it;

  if (vssCache)
  {
    it = vssCache->find(lbid);

    if (it != vssCache->end())
    {
      // cout << "async: vss cache hit on " << lbid << endl;
      VSSData& vd = it->second;
      ver = vd.verID;
      vbFlag = vd.vbFlag;
    }
  }

  if (!vssCache || it == vssCache->end())
    brm->vssLookup((BRM::LBID_t)lbid, c, txn, &ver, &vbFlag);

  if (bc.exists(lbid, ver))
    return;

  /* a quick and easy stand-in for a threadpool for loaders */
  atomicops::atomicMb();

  if (asyncCounter >= asyncMax)
    return;

  (void)atomicops::atomicInc(&asyncCounter);
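
  // Note: the check above and this increment are not atomic together, so two
  // threads can race past the asyncMax test at once. asyncCounter is a soft
  // cap on concurrent loaders, and a brief overshoot is harmless here.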

  boost::mutex::scoped_lock sl(*m);

  try
  {
    boost::thread thd(AsynchLoader(lbid, c, txn, compType, cCount, rCount, LBIDTrace, sessionID, m,
                                   busyLoaders, sendThread, vssCache));
    (*busyLoaders)++;
  }
  catch (boost::thread_resource_error& e)
  {
    cerr << "AsynchLoader: caught a thread resource error, need to lower asyncMax\n";
    idbassert(asyncCounter > 0);
    (void)atomicops::atomicDec(&asyncCounter);
  }
}

}  // namespace primitiveprocessor

// #define DCT_DEBUG 1
#define SETUP_GUARD                       \
  {                                       \
    unsigned char* o = outputp.get();     \
    memset(o, 0xa5, output_buf_size * 3); \
  }
#undef SETUP_GUARD
#define SETUP_GUARD
#define CHECK_GUARD(lbid)                                    \
  {                                                          \
    unsigned char* o = outputp.get();                        \
    for (int i = 0; i < output_buf_size; i++)                \
    {                                                        \
      if (*o++ != 0xa5)                                      \
      {                                                      \
        cerr << "Buffer underrun on LBID " << lbid << endl;  \
        idbassert(0);                                        \
      }                                                      \
    }                                                        \
    o += output_buf_size;                                    \
    for (int i = 0; i < output_buf_size; i++)                \
    {                                                        \
      if (*o++ != 0xa5)                                      \
      {                                                      \
        cerr << "Buffer overrun on LBID " << lbid << endl;   \
        idbassert(0);                                        \
      }                                                      \
    }                                                        \
  }
#undef CHECK_GUARD
#define CHECK_GUARD(x)

namespace
{
using namespace primitiveprocessor;

/** @brief The job type to process a dictionary scan (pDictionaryScan class on the UM)
 * TODO: Move this & the impl into different files
 */
class DictScanJob : public threadpool::FairThreadPool::Functor
{
 public:
  DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock);
  ~DictScanJob() override;

  void write(const SBS);
  int operator()() override;
  void catchHandler(const std::string& ex, uint32_t id, uint16_t code = logging::primitiveServerErr);
  void sendErrorMsg(uint32_t id, uint16_t code);

 private:
  SP_UM_IOSOCK fIos;
  SBS fByteStream;
  SP_UM_MUTEX fWriteLock;
  posix_time::ptime dieTime;
};

DictScanJob::DictScanJob(SP_UM_IOSOCK ios, SBS bs, SP_UM_MUTEX writeLock)
 : fIos(std::move(ios)), fByteStream(std::move(bs)), fWriteLock(std::move(writeLock))
{
  dieTime = posix_time::second_clock::universal_time() + posix_time::seconds(100);
}

DictScanJob::~DictScanJob() = default;

void DictScanJob::write(const SBS sbs)
{
  // This is the fast path for local EM-to-PM interaction: the PM puts results
  // into the local EM's DEC input queue directly. An empty fWriteLock carries
  // "same host connection" semantics here.
  if (!fWriteLock)
  {
    fIos->write(sbs);
    return;
  }
  boost::mutex::scoped_lock lk(*fWriteLock);
  fIos->write(*sbs);
}
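
// The unlocked branch above is assumed to hit a SameNodePseudoSocket (see the
// samenodepseudosocket.h include), which enqueues the ByteStream in-process, so
// no mutex around the write is needed; remote UM sockets share fWriteLock.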

int DictScanJob::operator()()
{
  utils::setThreadName("PPDictScanJob");
  uint8_t data[DATA_BLOCK_SIZE];
  // Reducing the size of this buffer may cause unrelated issues in DictScanStep.
  const uint32_t output_buf_size = MAX_BUFFER_SIZE;
  uint32_t session;
  uint32_t uniqueId = 0;
  bool wasBlockInCache = false;
  uint32_t blocksRead = 0;
  uint16_t runCount;

  boost::shared_ptr<DictEqualityFilter> eqFilter;
  TokenByScanRequestHeader* cmd;
  PrimitiveProcessor pproc(gDebugLevel);
  TokenByScanResultHeader* output;
  QueryContext verInfo;

  try
  {
#ifdef DCT_DEBUG
    DebugLevel oldDebugLevel = gDebugLevel;
    gDebugLevel = VERBOSE;
#endif
    fByteStream->advance(sizeof(TokenByScanRequestHeader));
    *fByteStream >> verInfo;
    cmd = (TokenByScanRequestHeader*)fByteStream->buf();

    session = cmd->Hdr.SessionID;
    uniqueId = cmd->Hdr.UniqueID;
    runCount = cmd->Count;
#ifdef VALGRIND
    memset(output, 0, sizeof(TokenByScanResultHeader));
#endif

    /* Grab the equality filter if one is specified */
    if (cmd->flags & HAS_EQ_FILTER)
    {
      boost::mutex::scoped_lock sl(eqFilterMutex);
      map<uint32_t, boost::shared_ptr<DictEqualityFilter> >::iterator it;
      it = dictEqualityFilters.find(uniqueId);

      if (it != dictEqualityFilters.end())
        eqFilter = it->second;

      sl.unlock();

      if (!eqFilter)
      {
        if (posix_time::second_clock::universal_time() < dieTime)
        {
          fByteStream->rewind();
          return -1;  // it's still being built, wait for it
        }
        else
          return 0;  // was probably aborted, go away...
      }
    }

    for (uint16_t i = 0; i < runCount; ++i)
    {
      SBS results(new ByteStream(output_buf_size));
      output = (TokenByScanResultHeader*)results->getInputPtr();

      loadBlock(cmd->LBID, verInfo, cmd->Hdr.TransactionID, cmd->CompType, data, &wasBlockInCache,
                &blocksRead, fLBIDTraceOn, session);
      pproc.setBlockPtr((int*)data);
      pproc.p_TokenByScan(cmd, output, output_buf_size, eqFilter);

      if (wasBlockInCache)
        output->CacheIO++;
      else
        output->PhysicalIO += blocksRead;

      results->advanceInputPtr(output->NBYTES);
      write(results);
      cmd->LBID++;
    }

#ifdef DCT_DEBUG
    gDebugLevel = oldDebugLevel;
#endif
  }
  catch (logging::IDBExcept& iex)
  {
    cerr << "DictScanJob caught an IDBException: " << iex.what() << endl;
    catchHandler(iex.what(), uniqueId, iex.errorCode());
  }
  catch (std::exception& re)
  {
    cerr << "DictScanJob caught an exception: " << re.what() << endl;
    catchHandler(re.what(), uniqueId);
  }
  catch (...)
  {
    string msg("Unknown exception caught in DictScanJob.");
    cerr << msg << endl;
    catchHandler(msg, uniqueId);
  }

  return 0;
}

void DictScanJob::catchHandler(const string& ex, uint32_t id, uint16_t code)
{
  Logger log;
  log.logMessage(ex);
  sendErrorMsg(id, code);
}

void DictScanJob::sendErrorMsg(uint32_t id, uint16_t code)
{
  ISMPacketHeader ism;
  PrimitiveHeader ph;
  ism.Status = code;
  ph.UniqueID = id;

  SBS msg(new ByteStream(sizeof(ISMPacketHeader) + sizeof(PrimitiveHeader)));
  msg->append((uint8_t*)&ism, sizeof(ism));
  msg->append((uint8_t*)&ph, sizeof(ph));

  write(msg);
}

struct BPPHandler
{
  BPPHandler(PrimitiveServer* ps) : fPrimitiveServerPtr(ps)
  {
  }

  // Keep a list of keys so that if the connection fails we don't leave BPP
  // threads lying around
  std::vector<uint32_t> bppKeys;
  std::vector<uint32_t>::iterator bppKeysIt;

  ~BPPHandler()
  {
    boost::mutex::scoped_lock scoped(bppLock);

    for (bppKeysIt = bppKeys.begin(); bppKeysIt != bppKeys.end(); ++bppKeysIt)
    {
      uint32_t key = *bppKeysIt;
      BPPMap::iterator it;

      it = bppMap.find(key);

      if (it != bppMap.end())
      {
        it->second->abort();
        bppMap.erase(it);
      }

      fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
      fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key);
    }

    scoped.unlock();
  }

  struct BPPHandlerFunctor : public FairThreadPool::Functor
  {
    BPPHandlerFunctor(boost::shared_ptr<BPPHandler> r, SBS b) : rt(std::move(r)), bs(std::move(b))
    {
      dieTime = posix_time::second_clock::universal_time() + posix_time::seconds(100);
    }

    boost::shared_ptr<BPPHandler> rt;
    SBS bs;
    posix_time::ptime dieTime;
  };

  struct LastJoiner : public BPPHandlerFunctor
  {
    LastJoiner(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(std::move(r), std::move(b))
    {
    }
    int operator()() override
    {
      utils::setThreadName("PPHandLastJoiner");
      return rt->lastJoinerMsg(*bs, dieTime);
    }
  };

  struct Create : public BPPHandlerFunctor
  {
    Create(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(std::move(r), std::move(b))
    {
    }
    int operator()() override
    {
      utils::setThreadName("PPHandCreate");
      rt->createBPP(*bs);
      return 0;
    }
  };

  struct Destroy : public BPPHandlerFunctor
  {
    Destroy(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(std::move(r), std::move(b))
    {
    }
    int operator()() override
    {
      utils::setThreadName("PPHandDestroy");
      return rt->destroyBPP(*bs, dieTime);
    }
  };

  struct AddJoiner : public BPPHandlerFunctor
  {
    AddJoiner(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(std::move(r), std::move(b))
    {
    }
    int operator()() override
    {
      utils::setThreadName("PPHandAddJoiner");
      return rt->addJoinerToBPP(*bs, dieTime);
    }
  };

  struct Abort : public BPPHandlerFunctor
  {
    Abort(boost::shared_ptr<BPPHandler> r, SBS b) : BPPHandlerFunctor(std::move(r), std::move(b))
    {
    }
    int operator()() override
    {
      utils::setThreadName("PPHandAbort");
      return rt->doAbort(*bs, dieTime);
    }
  };

  int doAbort(ByteStream& bs, const posix_time::ptime& dieTime)
  {
    uint32_t key;
    BPPMap::iterator it;

    try
    {
      bs.advance(sizeof(ISMPacketHeader));
      bs >> key;
    }
    catch (...)
    {
      // MCOL-857 We don't have the full packet yet
      bs.rewind();
      return -1;
    }

    boost::mutex::scoped_lock scoped(bppLock);
    bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), key);

    if (bppKeysIt != bppKeys.end())
    {
      bppKeys.erase(bppKeysIt);
    }

    it = bppMap.find(key);

    if (it != bppMap.end())
    {
      it->second->abort();
      bppMap.erase(it);
    }
    else
    {
      if (posix_time::second_clock::universal_time() > dieTime)
      {
        std::cout << "doAbort: job for key " << key << " has been killed." << std::endl;
        return 0;
      }
      else
      {
        bs.rewind();
        return -1;
      }
    }

    scoped.unlock();
    fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(key);
    fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(key);
    return 0;
  }

  int doAck(ByteStream& bs)
  {
    uint32_t key;
    int16_t msgCount;
    SBPPV bpps;
    const ISMPacketHeader* ism = (const ISMPacketHeader*)bs.buf();

    key = ism->Interleave;
    msgCount = (int16_t)ism->Size;
    bs.advance(sizeof(ISMPacketHeader));

    bpps = grabBPPs(key);

    if (bpps)
    {
      bpps->getSendThread()->sendMore(msgCount);
      return 0;
    }
    else
    {
      bs.rewind();
      return -1;
    }
  }

  void createBPP(ByteStream& bs)
  {
    uint32_t i;
    uint32_t key, initMsgsLeft;
    SBPP bpp;
    SBPPV bppv;

    // make the new BPP object
    bppv.reset(new BPPV(fPrimitiveServerPtr));
    bpp.reset(new BatchPrimitiveProcessor(bs, fPrimitiveServerPtr->prefetchThreshold(), bppv->getSendThread(),
                                          fPrimitiveServerPtr->ProcessorThreads()));

    if (bs.length() > 0)
      bs >> initMsgsLeft;
    else
    {
      initMsgsLeft = -1;  // no count present; -1 wraps for a uint32_t, presumably meaning "no limit"
    }

    idbassert(bs.length() == 0);
    bppv->getSendThread()->sendMore(initMsgsLeft);
    bppv->add(bpp);

    // this block of code creates some BPP instances up front for user queries,
    // seems to get us slightly better query times
    if ((bpp->getSessionID() & 0x80000000) == 0)
    {
      for (i = 1; i < BPPCount / 8; i++)
      {
        SBPP dup = bpp->duplicate();

        /* Uncomment these lines to verify duplicate(). == op might need updating */
        // if (*bpp != *dup)
        //   cerr << "createBPP: duplicate mismatch at index " << i << endl;
        // idbassert(*bpp == *dup);
        bppv->add(dup);
      }
    }

    boost::mutex::scoped_lock scoped(bppLock);
    key = bpp->getUniqueID();
    bppKeys.push_back(key);
    bool newInsert;
    newInsert = bppMap.insert(pair<uint32_t, SBPPV>(key, bppv)).second;
    scoped.unlock();

    if (!newInsert)
    {
      if (bpp->getSessionID() & 0x80000000)
        cerr << "warning: createBPP() tried to clobber a BPP with duplicate sessionID & stepID. sessionID="
             << (int)(bpp->getSessionID() ^ 0x80000000) << " stepID=" << bpp->getStepID() << " (syscat)"
             << endl;
      else
        cerr << "warning: createBPP() tried to clobber a BPP with duplicate sessionID & stepID. sessionID="
             << bpp->getSessionID() << " stepID=" << bpp->getStepID() << endl;
    }
  }

  inline SBPPV grabBPPs(uint32_t uniqueID)
  {
    BPPMap::iterator it;

    SBPPV ret;

    boost::mutex::scoped_lock scoped(bppLock);
    it = bppMap.find(uniqueID);

    if (it != bppMap.end())
      return it->second;
    else
      return SBPPV();
  }

  inline shared_mutex& getDJLock(uint32_t uniqueID)
  {
    boost::mutex::scoped_lock lk(djMutex);
    auto it = djLock.find(uniqueID);

    if (it != djLock.end())
      return *it->second;
    else
    {
      auto ret = djLock.insert(make_pair(uniqueID, new shared_mutex())).first;
      return *ret->second;
    }
  }

  inline void deleteDJLock(uint32_t uniqueID)
  {
    boost::mutex::scoped_lock lk(djMutex);
    auto it = djLock.find(uniqueID);

    if (it != djLock.end())
    {
      delete it->second;
      djLock.erase(it);
    }
  }

  int addJoinerToBPP(ByteStream& bs, const posix_time::ptime& dieTime)
  {
    SBPPV bppv;
    uint32_t uniqueID;
    const uint8_t* buf;

    /* call addToJoiner() on the first BPP */

    buf = bs.buf();
    /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
    uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);

    bppv = grabBPPs(uniqueID);

    if (bppv)
    {
      shared_lock<shared_mutex> lk(getDJLock(uniqueID));
      bppv->get()[0]->addToJoiner(bs);
      return 0;
    }
    else
    {
      if (posix_time::second_clock::universal_time() > dieTime)
      {
        cout << "addJoinerToBPP: job for id " << uniqueID << " has been killed." << endl;
        return 0;
      }
      else
        return -1;
    }
  }

  int lastJoinerMsg(ByteStream& bs, const posix_time::ptime& dieTime)
  {
    SBPPV bppv;
    uint32_t uniqueID, i;
    const uint8_t* buf;
    int err;

    /* call endOfJoiner() on every BPP */

    buf = bs.buf();
    /* the uniqueID is after the ISMPacketHeader, sessionID, and stepID */
    uniqueID = *((const uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);
    bppv = grabBPPs(uniqueID);

    if (!bppv)
    {
      if (posix_time::second_clock::universal_time() > dieTime)
      {
        cout << "LastJoiner: job for id " << uniqueID << " has been killed." << endl;
        return 0;
      }
      else
      {
        return -1;
      }
    }

    boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));

    for (i = 0; i < bppv->get().size(); i++)
    {
      err = bppv->get()[i]->endOfJoiner();

      if (err == -1)
      {
        if (posix_time::second_clock::universal_time() > dieTime)
        {
          cout << "LastJoiner: job for id " << uniqueID
               << " has been killed waiting for joiner messages for too long." << endl;
          return 0;
        }
        else
          return -1;
      }
    }

    /* Note: some of the duplicate/run/join sync was moved to the BPPV class to do
       more intelligent scheduling. Once the join data is received, BPPV will
       start letting jobs run and create more BPP instances on demand. */

    bppv->joinDataReceived = true;
    return 0;
  }

  int destroyBPP(ByteStream& bs, const posix_time::ptime& dieTime)
  {
    uint32_t uniqueID, sessionID, stepID;
    BPPMap::iterator it;

    if (bs.length() < sizeof(ISMPacketHeader) + sizeof(sessionID) + sizeof(stepID) + sizeof(uniqueID))
    {
      // MCOL-857 We don't appear to have the full packet yet!
      return -1;
    }

    // A throw from here on would be an actual error, not a phony one.
    bs.advance(sizeof(ISMPacketHeader));
    bs >> sessionID;
    bs >> stepID;
    bs >> uniqueID;

    boost::unique_lock<shared_mutex> lk(getDJLock(uniqueID));
    boost::mutex::scoped_lock scoped(bppLock);

    bppKeysIt = std::find(bppKeys.begin(), bppKeys.end(), uniqueID);

    if (bppKeysIt != bppKeys.end())
    {
      bppKeys.erase(bppKeysIt);
    }

    it = bppMap.find(uniqueID);

    if (it != bppMap.end())
    {
      boost::shared_ptr<BPPV> bppv = it->second;

      if (bppv->joinDataReceived)
      {
        bppv->abort();
        bppMap.erase(it);
      }
      else
      {
        // MCOL-5. On ubuntu, a crash was happening. Checking
        // joinDataReceived here fixes it.
        // We're not ready for a destroy. Reschedule to wait
        // for all joiners to arrive.
        // TODO: there might be no joiners if the query is canceled.
        // The memory will leak.
        // Rewind to the beginning of the ByteStream buf b/c of the advance above.
        bs.rewind();
        return -1;
      }
    }
    else
    {
      if (posix_time::second_clock::universal_time() > dieTime)
      {
        cout << "destroyBPP: job for id " << uniqueID << " and sessionID " << sessionID << " has been killed."
             << endl;
        // If for some reason there are jobs for this uniqueID that arrived later,
        // they won't be left in the PP thread pool staying there forever.
      }
      else
      {
        bs.rewind();
        return -1;
      }
    }

    fPrimitiveServerPtr->getProcessorThreadPool()->removeJobs(uniqueID);
    fPrimitiveServerPtr->getOOBProcessorThreadPool()->removeJobs(uniqueID);
    lk.unlock();
    deleteDJLock(uniqueID);
    return 0;
  }

  void setBPPToError(uint32_t uniqueID, const string& error, logging::ErrorCodeValues errorCode)
  {
    SBPPV bppv;

    bppv = grabBPPs(uniqueID);

    if (!bppv)
      return;

    for (uint32_t i = 0; i < bppv->get().size(); i++)
      bppv->get()[i]->setError(error, errorCode);

    if (bppv->get().empty() && !bppMap.empty())
      bppMap.begin()->second.get()->get()[0]->setError(error, errorCode);
  }

  // Would be good to define the structure of these msgs somewhere...
  inline uint32_t getUniqueID(SBS bs, uint8_t command)
  {
    uint8_t* buf;

    buf = bs->buf();

    switch (command)
    {
      case BATCH_PRIMITIVE_ABORT: return *((uint32_t*)&buf[sizeof(ISMPacketHeader)]);

      case BATCH_PRIMITIVE_ACK:
      {
        ISMPacketHeader* ism = (ISMPacketHeader*)buf;
        return ism->Interleave;
      }

      case BATCH_PRIMITIVE_ADD_JOINER:
      case BATCH_PRIMITIVE_END_JOINER:
      case BATCH_PRIMITIVE_DESTROY: return *((uint32_t*)&buf[sizeof(ISMPacketHeader) + 2 * sizeof(uint32_t)]);

      default: return 0;
    }
  }
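
  // Assumed wire layout behind the magic offsets above and in dispatchPrimitive():
  //   [ ISMPacketHeader ][ sessionID : u32 ][ stepID : u32 ][ uniqueID : u32 ] ...
  // BATCH_PRIMITIVE_ACK instead carries its key in ISMPacketHeader::Interleave,
  // and BATCH_PRIMITIVE_ABORT carries the key immediately after the header.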

  PrimitiveServer* fPrimitiveServerPtr;
};

class DictionaryOp : public FairThreadPool::Functor
{
 public:
  DictionaryOp(SBS cmd) : bs(std::move(cmd))
  {
    dieTime = posix_time::second_clock::universal_time() + posix_time::seconds(100);
  }
  virtual int execute() = 0;
  int operator()() override
  {
    utils::setThreadName("PPDictOp");
    int ret;
    ret = execute();

    if (ret != 0)
    {
      bs->rewind();

      if (posix_time::second_clock::universal_time() > dieTime)
      {
        cout << "DictionaryOp::operator(): job has been killed." << endl;
        return 0;
      }
    }

    return ret;
  }

 protected:
  SBS bs;

 private:
  posix_time::ptime dieTime;
};

class CreateEqualityFilter : public DictionaryOp
{
 public:
  CreateEqualityFilter(SBS cmd) : DictionaryOp(std::move(cmd))
  {
  }
  int execute() override
  {
    createEqualityFilter();
    return 0;
  }

 private:
  void createEqualityFilter()
  {
    uint32_t uniqueID, count, i, charsetNumber;
    string str;
    bs->advance(sizeof(ISMPacketHeader));
    *bs >> uniqueID;
    *bs >> charsetNumber;

    datatypes::Charset cs(charsetNumber);
    boost::shared_ptr<DictEqualityFilter> filter(new DictEqualityFilter(cs));

    *bs >> count;

    for (i = 0; i < count; i++)
    {
      *bs >> str;
      filter->insert(str);
    }

    boost::mutex::scoped_lock sl(eqFilterMutex);
    dictEqualityFilters[uniqueID] = filter;
  }
};

class DestroyEqualityFilter : public DictionaryOp
{
 public:
  DestroyEqualityFilter(SBS cmd) : DictionaryOp(std::move(cmd))
  {
  }
  int execute() override
  {
    return destroyEqualityFilter();
  }

  int destroyEqualityFilter()
  {
    uint32_t uniqueID;
    map<uint32_t, boost::shared_ptr<DictEqualityFilter> >::iterator it;

    bs->advance(sizeof(ISMPacketHeader));
    *bs >> uniqueID;

    boost::mutex::scoped_lock sl(eqFilterMutex);
    it = dictEqualityFilters.find(uniqueID);

    if (it != dictEqualityFilters.end())
    {
      dictEqualityFilters.erase(it);
      return 0;
    }
    else
    {
      bs->rewind();
      return -1;
    }
  }
};

struct ReadThread
{
  ReadThread(const string& serverName, IOSocket& ios, PrimitiveServer* ps)
   : fServerName(serverName), fIos(ios), fPrimitiveServerPtr(ps)
  {
    fBPPHandler.reset(new BPPHandler(ps));
  }

  const ByteStream buildCacheOpResp(int32_t result)
  {
    const int msgsize = sizeof(ISMPacketHeader) + sizeof(int32_t);
    ByteStream::byte msgbuf[msgsize];
    memset(msgbuf, 0, sizeof(ISMPacketHeader));
    ISMPacketHeader* hdrp = reinterpret_cast<ISMPacketHeader*>(&msgbuf[0]);
    hdrp->Command = CACHE_OP_RESULTS;
    int32_t* resp = reinterpret_cast<int32_t*>(&msgbuf[sizeof(ISMPacketHeader)]);
    *resp = result;
    return ByteStream(msgbuf, msgsize);
  }
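
  // Response layout (mirrors the construction above): a zeroed ISMPacketHeader
  // whose Command is CACHE_OP_RESULTS, followed by one int32 result code; every
  // cache-op handler below replies with buildCacheOpResp(0) on success.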

  /* Message format:
   *   ISMPacketHeader
   *   OID count - 32 bits
   *   OID array - 32 bits * count
   */
  void doCacheFlushByOID(SP_UM_IOSOCK ios, ByteStream& bs)
  {
    uint8_t* buf = bs.buf();
    buf += sizeof(ISMPacketHeader);
    uint32_t count = *((uint32_t*)buf);
    buf += 4;
    uint32_t* oids = (uint32_t*)buf;

    for (int i = 0; i < fCacheCount; i++)
    {
      blockCacheClient bc(*BRPp[i]);
      bc.flushOIDs(oids, count);
    }

    ios->write(buildCacheOpResp(0));
  }

  /* Message format:
   *   ISMPacketHeader
   *   Partition count - 32 bits
   *   Partition set - sizeof(LogicalPartition) * count
   *   OID count - 32 bits
   *   OID array - 32 bits * count
   */
  void doCacheFlushByPartition(SP_UM_IOSOCK ios, ByteStream& bs)
  {
    set<BRM::LogicalPartition> partitions;
    vector<OID_t> oids;

    bs.advance(sizeof(ISMPacketHeader));
    deserializeSet<LogicalPartition>(bs, partitions);
    deserializeInlineVector<OID_t>(bs, oids);

    idbassert(bs.length() == 0);

    for (int i = 0; i < fCacheCount; i++)
    {
      blockCacheClient bc(*BRPp[i]);
      bc.flushPartition(oids, partitions);
    }

    ios->write(buildCacheOpResp(0));
  }

  void doCacheFlushCmd(SP_UM_IOSOCK ios, const ByteStream& bs)
  {
    for (int i = 0; i < fCacheCount; i++)
    {
      blockCacheClient bc(*BRPp[i]);
      bc.flushCache();
    }

    ios->write(buildCacheOpResp(0));
  }

  void doCacheDropFDs(SP_UM_IOSOCK ios, ByteStream& bs)
  {
    std::vector<BRM::FileInfo> files;

    bs.advance(sizeof(ISMPacketHeader));
    dropFDCache();
    ios->write(buildCacheOpResp(0));
  }

  void doCachePurgeFDs(SP_UM_IOSOCK ios, ByteStream& bs)
  {
    std::vector<BRM::FileInfo> files;

    bs.advance(sizeof(ISMPacketHeader));
    deserializeInlineVector<BRM::FileInfo>(bs, files);
    purgeFDCache(files);
    ios->write(buildCacheOpResp(0));
  }

  // N.B. this fcn doesn't actually clean the VSS, but rather instructs PP to flush its
  // cache of specific LBIDs
  void doCacheCleanVSSCmd(SP_UM_IOSOCK ios, const ByteStream& bs)
  {
    const ByteStream::byte* bytePtr = bs.buf();
    const uint32_t* cntp = reinterpret_cast<const uint32_t*>(&bytePtr[sizeof(ISMPacketHeader)]);
    const LbidAtVer* itemp =
        reinterpret_cast<const LbidAtVer*>(&bytePtr[sizeof(ISMPacketHeader) + sizeof(uint32_t)]);

    for (int i = 0; i < fCacheCount; i++)
    {
      blockCacheClient bc(*BRPp[i]);
      bc.flushMany(itemp, *cntp);
    }

    ios->write(buildCacheOpResp(0));
  }

  void doCacheFlushAllversion(SP_UM_IOSOCK ios, const ByteStream& bs)
  {
    const ByteStream::byte* bytePtr = bs.buf();
    const uint32_t* cntp = reinterpret_cast<const uint32_t*>(&bytePtr[sizeof(ISMPacketHeader)]);
    const LBID_t* itemp =
        reinterpret_cast<const LBID_t*>(&bytePtr[sizeof(ISMPacketHeader) + sizeof(uint32_t)]);

    for (int i = 0; i < fCacheCount; i++)
    {
      blockCacheClient bc(*BRPp[i]);
      bc.flushManyAllversion(itemp, *cntp);
    }

    ios->write(buildCacheOpResp(0));
  }

  static void dispatchPrimitive(SBS sbs, boost::shared_ptr<BPPHandler>& fBPPHandler,
                                boost::shared_ptr<threadpool::FairThreadPool> procPool,
                                std::shared_ptr<threadpool::PriorityThreadPool> OOBProcPool,
                                SP_UM_IOSOCK& outIos, SP_UM_MUTEX& writeLock, const uint32_t processorThreads,
                                const bool ptTrace)
  {
    const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(sbs->buf());

    switch (ismHdr->Command)
    {
      case DICT_CREATE_EQUALITY_FILTER:
      case DICT_DESTROY_EQUALITY_FILTER:
      case BATCH_PRIMITIVE_CREATE:
      case BATCH_PRIMITIVE_ADD_JOINER:
      case BATCH_PRIMITIVE_END_JOINER:
      case BATCH_PRIMITIVE_DESTROY:
      case BATCH_PRIMITIVE_ABORT:
      {
        const uint8_t* buf = sbs->buf();
        uint32_t pos = sizeof(ISMPacketHeader) - 2;
        const uint32_t txnId = *((uint32_t*)&buf[pos + 2]);
        const uint32_t stepID = *((uint32_t*)&buf[pos + 6]);
        const uint32_t uniqueID = *((uint32_t*)&buf[pos + 10]);
        const uint32_t weight = threadpool::MetaJobsInitialWeight;
        const uint32_t priority = 0;

        uint32_t id = 0;
        boost::shared_ptr<FairThreadPool::Functor> functor;

        if (ismHdr->Command == DICT_CREATE_EQUALITY_FILTER)
        {
          functor.reset(new CreateEqualityFilter(sbs));
        }
        else if (ismHdr->Command == DICT_DESTROY_EQUALITY_FILTER)
        {
          functor.reset(new DestroyEqualityFilter(sbs));
        }
        else if (ismHdr->Command == BATCH_PRIMITIVE_CREATE)
        {
          functor.reset(new BPPHandler::Create(fBPPHandler, sbs));
        }
        else if (ismHdr->Command == BATCH_PRIMITIVE_ADD_JOINER)
        {
          functor.reset(new BPPHandler::AddJoiner(fBPPHandler, sbs));
        }
        else if (ismHdr->Command == BATCH_PRIMITIVE_END_JOINER)
        {
          id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
          functor.reset(new BPPHandler::LastJoiner(fBPPHandler, sbs));
        }
        else if (ismHdr->Command == BATCH_PRIMITIVE_DESTROY)
        {
          id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
          functor.reset(new BPPHandler::Destroy(fBPPHandler, sbs));
        }
        else if (ismHdr->Command == BATCH_PRIMITIVE_ABORT)
        {
          id = fBPPHandler->getUniqueID(sbs, ismHdr->Command);
          functor.reset(new BPPHandler::Abort(fBPPHandler, sbs));
        }

        PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
        OOBProcPool->addJob(job);
        break;
      }

      case DICT_TOKEN_BY_SCAN_COMPARE:
      case BATCH_PRIMITIVE_RUN:
      {
        TokenByScanRequestHeader* hdr = nullptr;
        boost::shared_ptr<FairThreadPool::Functor> functor;
        uint32_t id = 0;
        uint32_t weight = 0;
        uint32_t priority = 0;
        uint32_t txnId = 0;
        uint32_t stepID = 0;
        uint32_t uniqueID = 0;

        if (ismHdr->Command == DICT_TOKEN_BY_SCAN_COMPARE)
        {
          idbassert(sbs->length() >= sizeof(TokenByScanRequestHeader));
          hdr = (TokenByScanRequestHeader*)ismHdr;
          functor.reset(new DictScanJob(outIos, sbs, writeLock));
          id = hdr->Hdr.UniqueID;
          weight = LOGICAL_BLOCK_RIDS;
          priority = hdr->Hdr.Priority;
          const uint8_t* buf = sbs->buf();
          const uint32_t pos = sizeof(ISMPacketHeader) - 2;
          txnId = *((uint32_t*)&buf[pos + 2]);
          stepID = *((uint32_t*)&buf[pos + 6]);
          uniqueID = *((uint32_t*)&buf[pos + 10]);
        }
        else if (ismHdr->Command == BATCH_PRIMITIVE_RUN)
        {
          functor.reset(new BPPSeeder(sbs, writeLock, outIos, processorThreads, ptTrace));
          BPPSeeder* bpps = dynamic_cast<BPPSeeder*>(functor.get());
          id = bpps->getID();
          priority = bpps->priority();
          const uint8_t* buf = sbs->buf();
          const uint32_t pos = sizeof(ISMPacketHeader) - 2;
          txnId = *((uint32_t*)&buf[pos + 2]);
          stepID = *((uint32_t*)&buf[pos + 6]);
          uniqueID = *((uint32_t*)&buf[pos + 10]);
          weight = ismHdr->Size + *((uint32_t*)&buf[pos + 18]) + 100000;
        }

        if (hdr && (hdr->flags & IS_SYSCAT))
        {
          PriorityThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
          OOBProcPool->addJob(job);
        }
        else
        {
          FairThreadPool::Job job(uniqueID, stepID, txnId, functor, outIos, weight, priority, id);
          procPool->addJob(job);
        }

        break;
      }

      case BATCH_PRIMITIVE_ACK:
      {
        fBPPHandler->doAck(*sbs);
        break;
      }

      default:
      {
        std::ostringstream os;
        Logger log;
        os << "unknown primitive cmd: " << ismHdr->Command;
        log.logMessage(os.str());
        break;
      }
    }  // the switch stmt
  }
|
|
|
|
void operator()()
|
|
{
|
|
utils::setThreadName("PPReadThread");
|
|
auto procPool = fPrimitiveServerPtr->getProcessorThreadPool();
|
|
auto OOBProcPool = fPrimitiveServerPtr->getOOBProcessorThreadPool();
|
|
SBS bs;
|
|
UmSocketSelector* pUmSocketSelector = UmSocketSelector::instance();
|
|
|
|
// Establish default output IOSocket (and mutex) based on the input
|
|
// IOSocket. If we end up rotating through multiple output sockets
|
|
// for the same UM, we will use UmSocketSelector to select output.
|
|
SP_UM_IOSOCK outIosDefault(new IOSocket(fIos));
|
|
SP_UM_MUTEX writeLockDefault(new boost::mutex());
|
|
|
|
bool bRotateDest = fPrimitiveServerPtr->rotatingDestination();
|
|
|
|
if (bRotateDest)
|
|
{
|
|
// If we tried adding an IP address not listed as UM in config
|
|
// file; probably a DMLProc connection. We allow the connection
|
|
// but disable destination rotation since not in Columnstore.xml.
|
|
if (!pUmSocketSelector->addConnection(outIosDefault, writeLockDefault))
|
|
{
|
|
bRotateDest = false;
|
|
}
|
|
}
|
|
|
|
SP_UM_IOSOCK outIos(outIosDefault);
|
|
SP_UM_MUTEX writeLock(writeLockDefault);
|
|
|
|
//..Loop to process incoming messages on IOSocket fIos
|
|
for (;;)
|
|
{
|
|
try
|
|
{
|
|
bs = fIos.read();
|
|
}
|
|
catch (...)
|
|
{
|
|
// This connection is dead, nothing useful will come from it ever again
|
|
// We can't rely on the state of bs at this point...
|
|
if (bRotateDest && pUmSocketSelector)
|
|
pUmSocketSelector->delConnection(fIos);
|
|
|
|
fIos.close();
|
|
break;
|
|
}
|
|
|
|
try
|
|
{
|
|
if (bs->length() != 0)
|
|
{
|
|
idbassert(bs->length() >= sizeof(ISMPacketHeader));
|
|
|
|
const ISMPacketHeader* ismHdr = reinterpret_cast<const ISMPacketHeader*>(bs->buf());
|
|
/* This switch is for the OOB commands */
          switch (ismHdr->Command)
          {
            case CACHE_FLUSH_PARTITION:
              doCacheFlushByPartition(outIos, *bs);
              fIos.close();
              return;

            case CACHE_FLUSH_BY_OID:
              doCacheFlushByOID(outIos, *bs);
              fIos.close();
              return;

            case CACHE_FLUSH:
              doCacheFlushCmd(outIos, *bs);
              fIos.close();
              return;

            case CACHE_CLEAN_VSS:
              doCacheCleanVSSCmd(outIos, *bs);
              fIos.close();
              return;

            case FLUSH_ALL_VERSION:
              doCacheFlushAllversion(outIos, *bs);
              fIos.close();
              return;

            case CACHE_DROP_FDS:
              doCacheDropFDs(outIos, *bs);
              fIos.close();
              return;

            case CACHE_PURGE_FDS:
              doCachePurgeFDs(outIos, *bs);
              fIos.close();
              return;

            default: break;
          }
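
          // Not an OOB command: hand the message to the dispatcher, which
          // parses it and queues a job on the appropriate processor pool.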
          dispatchPrimitive(bs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock,
                            fPrimitiveServerPtr->ProcessorThreads(), fPrimitiveServerPtr->PTTrace());
        }
        else  // bs.length() == 0: the peer closed the connection cleanly
        {
          if (bRotateDest)
            pUmSocketSelector->delConnection(fIos);

          fIos.close();
          break;
        }
      }  // the try surrounding the if stmt
      catch (std::exception& e)
      {
        Logger logger;
        logger.logMessage(e.what());
      }
    }
  }

  // If this function is called, we have a "bug" of some sort. We added
  // the "fIos" connection to UmSocketSelector earlier, so at the very
  // least, UmSocketSelector should have been able to return that
  // connection/port. We will try to recover by using the original fIos
  // to send the response msg; but as stated, if this ever happens we
  // have a bug we need to resolve.
  void handleUmSockSelErr(const string& cmd)
  {
    ostringstream oss;
    oss << "Unable to rotate through socket destinations (" << cmd
        << ") for connection: " << fIos.toString();
    cerr << oss.str() << endl;
    logging::Message::Args args;
    args.add(oss.str());
    mlp->logMessage(logging::M0058, args, false);
  }

  ~ReadThread() = default;
  string fServerName;
  IOSocket fIos;
  PrimitiveServer* fPrimitiveServerPtr;
  boost::shared_ptr<BPPHandler> fBPPHandler;
};

struct ServerThread
{
  ServerThread(string serverName, PrimitiveServer* ps) : fServerName(serverName), fPrimitiveServerPtr(ps)
  {
    SUMMARY_INFO2("starting server ", fServerName);

    bool tellUser = true;
    bool toldUser = false;
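
    // Retry the bind until it succeeds: "Address already in use" usually
    // means the previous listener hasn't released the port yet, so poll
    // every 5 seconds instead of failing outright.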
    for (;;)
    {
      try
      {
        mqServerPtr = new MessageQueueServer(fServerName);
        break;
      }
      catch (runtime_error& re)
      {
        string what = re.what();

        if (what.find("Address already in use") != string::npos)
        {
          if (tellUser)
          {
            cerr << "Address already in use, retrying..." << endl;
            tellUser = false;
            toldUser = true;
          }

          sleep(5);
        }
        else
        {
          throw;
        }
      }
    }

    if (toldUser)
      cerr << "Ready." << endl;
  }

  void operator()()
  {
    utils::setThreadName("PPServerThr");
    IOSocket ios;

    try
    {
      for (;;)
      {
        ios = mqServerPtr->accept();
        // start up a detached thread to handle this socket's I/O
        boost::thread rt(ReadThread(fServerName, ios, fPrimitiveServerPtr));
      }
    }
    catch (std::exception& ex)
    {
      SUMMARY_INFO2("exception caught in ServerThread: ", ex.what());
    }
    catch (...)
    {
      SUMMARY_INFO("exception caught in ServerThread.");
    }
  }

  string fServerName;
  PrimitiveServer* fPrimitiveServerPtr;
  MessageQueueServer* mqServerPtr;
};

}  // anonymous namespace

namespace primitiveprocessor
{
PrimitiveServer::PrimitiveServer(int serverThreads, int serverQueueSize, int processorWeight,
                                 int processorQueueSize, bool rotatingDestination, uint32_t BRPBlocks,
                                 int BRPThreads, int cacheCount, int maxBlocksPerRead, int readAheadBlocks,
                                 uint32_t deleteBlocks, bool ptTrace, double prefetch, uint64_t smallSide)
 : fServerThreads(serverThreads)
 , fServerQueueSize(serverQueueSize)
 , fProcessorWeight(processorWeight)
 , fProcessorQueueSize(processorQueueSize)
 , fMaxBlocksPerRead(maxBlocksPerRead)
 , fReadAheadBlocks(readAheadBlocks)
 , fRotatingDestination(rotatingDestination)
 , fPTTrace(ptTrace)
 , fPrefetchThreshold(prefetch)
 , fPMSmallSide(smallSide)
{
  fCacheCount = cacheCount;
  fServerpool.setMaxThreads(fServerThreads);
  fServerpool.setQueueSize(fServerQueueSize);
  fServerpool.setName("PrimitiveServer");

  fProcessorPool.reset(new threadpool::FairThreadPool(fProcessorWeight, highPriorityThreads,
                                                      medPriorityThreads, lowPriorityThreads, 0));
  // We're not using either the priority or the job-clustering features; we just
  // need a thread pool that can reschedule jobs and an unlimited non-blocking queue.
  fOOBPool.reset(new threadpool::PriorityThreadPool(1, 5, 0, 0, 1));

  asyncCounter = 0;

  brm = new DBRM();
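
  // One BlockRequestProcessor per cache partition; the block budget, I/O
  // threads, and delete-block quota are divided evenly among partitions.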
  BRPp = new BlockRequestProcessor*[fCacheCount];

  try
  {
    for (int i = 0; i < fCacheCount; i++)
      BRPp[i] = new BlockRequestProcessor(BRPBlocks / fCacheCount, BRPThreads / fCacheCount,
                                          fMaxBlocksPerRead, deleteBlocks / fCacheCount);
  }
  catch (...)
  {
    cerr << "Unable to allocate " << BRPBlocks
         << " cache blocks. Adjust the DBBC config parameter downward." << endl;
    mlp->logMessage(logging::M0045, logging::Message::Args(), true);
    exit(1);
  }
}

PrimitiveServer::~PrimitiveServer()
{
}

void PrimitiveServer::start(Service* service, utils::USpaceSpinLock& startupRaceLock)
{
  // start all the server threads
  for (int i = 1; i <= fServerThreads; i++)
  {
    string s("PMS");
    stringstream oss;
    oss << s << i;

    fServerpool.invoke(ServerThread(oss.str(), this));
  }

  startupRaceLock.release();
  service->NotifyServiceStarted();
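
  // A dedicated thread services primitives submitted by an ExeMgr in this
  // same process: messages are pulled straight off the DEC local queue,
  // bypassing TCP entirely.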
  std::thread sameHostServerThread(
      [this]()
      {
        utils::setThreadName("PPSHServerThr");
        auto* exeMgrDecPtr = (exemgr::globServiceExeMgr) ? exemgr::globServiceExeMgr->getDec() : nullptr;
        while (!exeMgrDecPtr)
        {
          sleep(1);
          exeMgrDecPtr = (exemgr::globServiceExeMgr) ? exemgr::globServiceExeMgr->getDec() : nullptr;
        }
        // This is a pseudo socket that puts data into the DEC queue directly.
        // It can be used for PP-to-EM communication only.
        SP_UM_IOSOCK outIos(new IOSocket(new SameNodePseudoSocket(exeMgrDecPtr)));
        // A null write lock signals same-host messaging semantics downstream.
        SP_UM_MUTEX writeLock(nullptr);
        auto procPool = this->getProcessorThreadPool();
        auto OOBProcPool = this->getOOBProcessorThreadPool();
        boost::shared_ptr<BPPHandler> fBPPHandler(new BPPHandler(this));
        for (;;)
        {
          joblist::DistributedEngineComm::SBSVector primitiveMsgs;
          for (auto sbs : exeMgrDecPtr->readLocalQueueMessagesOrWait(primitiveMsgs))
          {
            if (sbs->length() == 0)
            {
              std::cout << "PPSHServerThr got an empty ByteStream." << std::endl;
              continue;
            }
            idbassert(sbs->length() >= sizeof(ISMPacketHeader));

            ReadThread::dispatchPrimitive(sbs, fBPPHandler, procPool, OOBProcPool, outIos, writeLock,
                                          this->ProcessorThreads(), this->PTTrace());
          }
        }
      });

  // Detach so that destroying a still-joinable std::thread doesn't call
  // std::terminate if start() ever unwinds past fServerpool.wait().
  sameHostServerThread.detach();

  fServerpool.wait();

  cerr << "PrimitiveServer::start() exiting!" << endl;
}

BPPV::BPPV(PrimitiveServer* ps)
{
  sendThread.reset(new BPPSendThread());
  sendThread->setProcessorPool(ps->getProcessorThreadPool());
  v.reserve(BPPCount);
  pos = 0;
}

BPPV::~BPPV()
{
}

void BPPV::add(boost::shared_ptr<BatchPrimitiveProcessor> a)
{
  /* Right now BPP initialization locks the object if there is a join, to
     prevent the instance from being used before the join data is received.
     The init/join/run sync code is quite old and confusing at this point,
     and this makes it a little worse by circumventing the original design.
     Due for a rewrite. */

  if (!unusedInstance)
  {
    unusedInstance = a->duplicate();

    if (a->hasJoin())
    {
      joinDataReceived = false;
      unusedInstance->unlock();
    }
    else
      joinDataReceived = true;
  }

  v.push_back(a);
}

const vector<boost::shared_ptr<BatchPrimitiveProcessor> >& BPPV::get()
{
  return v;
}

boost::shared_ptr<BatchPrimitiveProcessor> BPPV::next()
{
  uint32_t size = v.size();
  uint32_t i = 0;

  // This block of code creates BPP instances if/when they are needed.

  // Don't use a processor thread when it will just block; reschedule it instead.
  if (!joinDataReceived)
    return boost::shared_ptr<BatchPrimitiveProcessor>();
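
  // Round-robin scan, starting just past the last instance handed out, for
  // a BPP that isn't busy; mark it busy before returning it.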
  for (i = 0; i < size; i++)
  {
    uint32_t index = (i + pos) % size;

    if (!(v[index]->busy()))
    {
      pos = (index + 1) % size;
      v[index]->busy(true);
      return v[index];
    }
  }

  // honor the BPPCount limit, mostly for debugging purposes.
  if (size >= BPPCount)
    return boost::shared_ptr<BatchPrimitiveProcessor>();
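
  // Everything is busy and we're still under the limit: clone a fresh BPP
  // from the pristine unusedInstance template and hand it out.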
  SBPP newone = unusedInstance->duplicate();

  if (newone->hasJoin())
    newone->unlock();

  newone->busy(true);
  v.push_back(newone);
  return newone;
}

void BPPV::abort()
{
  sendThread->abort();
  BOOST_FOREACH (boost::shared_ptr<BatchPrimitiveProcessor> bpp, v)
  {
    bpp->unlock();
  }
}

bool BPPV::aborted()
{
  return sendThread->aborted();
}

// end workaround

}  // namespace primitiveprocessor