You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-05 15:41:14 +03:00
clang format apply
This commit is contained in:
@ -51,499 +51,500 @@ boost::mutex fLock;
|
||||
|
||||
namespace dmlprocessor
|
||||
{
|
||||
|
||||
BatchInsertProc::BatchInsertProc(bool isAutocommitOn, uint32_t tableOid, execplan::CalpontSystemCatalog::SCN txnId, BRM::DBRM* aDbrm) :
|
||||
fTxnid(txnId), fIsAutocommitOn(isAutocommitOn), fTableOid(tableOid), fDbrm(aDbrm)
|
||||
BatchInsertProc::BatchInsertProc(bool isAutocommitOn, uint32_t tableOid,
|
||||
execplan::CalpontSystemCatalog::SCN txnId, BRM::DBRM* aDbrm)
|
||||
: fTxnid(txnId), fIsAutocommitOn(isAutocommitOn), fTableOid(tableOid), fDbrm(aDbrm)
|
||||
{
|
||||
fErrorCode = 0;
|
||||
fErrMsg = "";
|
||||
fLastpkg = false;
|
||||
fUniqueId = fDbrm->getUnique64();
|
||||
fWEClient = new WriteEngine::WEClients(WriteEngine::WEClients::DMLPROC);
|
||||
fWEClient->addQueue(fUniqueId);
|
||||
fOamcache = OamCache::makeOamCache();
|
||||
std::vector<int> moduleIds = fOamcache->getModuleIds();
|
||||
fErrorCode = 0;
|
||||
fErrMsg = "";
|
||||
fLastpkg = false;
|
||||
fUniqueId = fDbrm->getUnique64();
|
||||
fWEClient = new WriteEngine::WEClients(WriteEngine::WEClients::DMLPROC);
|
||||
fWEClient->addQueue(fUniqueId);
|
||||
fOamcache = OamCache::makeOamCache();
|
||||
std::vector<int> moduleIds = fOamcache->getModuleIds();
|
||||
|
||||
//cout << "moduleIds size is " << moduleIds.size() << endl;
|
||||
for (unsigned i = 0; i < moduleIds.size(); i++)
|
||||
{
|
||||
fPMs.push_back((uint32_t)moduleIds[i]);
|
||||
//cout << "got module id " << (uint32_t)moduleIds[i] << endl;
|
||||
//cout << "fPMs[0] is " << fPMs[0] << endl;
|
||||
}
|
||||
// cout << "moduleIds size is " << moduleIds.size() << endl;
|
||||
for (unsigned i = 0; i < moduleIds.size(); i++)
|
||||
{
|
||||
fPMs.push_back((uint32_t)moduleIds[i]);
|
||||
// cout << "got module id " << (uint32_t)moduleIds[i] << endl;
|
||||
// cout << "fPMs[0] is " << fPMs[0] << endl;
|
||||
}
|
||||
|
||||
fBatchLoader = new BatchLoader(fTableOid, fTxnid, fPMs);
|
||||
fBatchLoader = new BatchLoader(fTableOid, fTxnid, fPMs);
|
||||
|
||||
for (unsigned i = 0; i < fPMs.size(); i++)
|
||||
{
|
||||
fPmState[fPMs[i]] = true;
|
||||
}
|
||||
for (unsigned i = 0; i < fPMs.size(); i++)
|
||||
{
|
||||
fPmState[fPMs[i]] = true;
|
||||
}
|
||||
|
||||
//cout << "Constructor: There are " << (int)fPMs.size() << " pms" << endl;
|
||||
fInsertPkgQueue.reset(new pkg_type);
|
||||
// cout << "Constructor: There are " << (int)fPMs.size() << " pms" << endl;
|
||||
fInsertPkgQueue.reset(new pkg_type);
|
||||
}
|
||||
|
||||
BatchInsertProc::~BatchInsertProc()
|
||||
{
|
||||
fWEClient->removeQueue(fUniqueId);
|
||||
delete fWEClient;
|
||||
fWEClient->removeQueue(fUniqueId);
|
||||
delete fWEClient;
|
||||
}
|
||||
|
||||
uint64_t BatchInsertProc::grabTableLock(int32_t sessionId)
|
||||
{
|
||||
uint32_t processID = ::getpid();
|
||||
int32_t txnId = fTxnid;
|
||||
std::string processName("DMLProc batchinsert");
|
||||
int32_t tmpSessionId = sessionId;
|
||||
uint32_t tableOid = fTableOid;
|
||||
int i = 0;
|
||||
uint32_t processID = ::getpid();
|
||||
int32_t txnId = fTxnid;
|
||||
std::string processName("DMLProc batchinsert");
|
||||
int32_t tmpSessionId = sessionId;
|
||||
uint32_t tableOid = fTableOid;
|
||||
int i = 0;
|
||||
|
||||
try
|
||||
try
|
||||
{
|
||||
// cout << "In grabTableLock, fPMs[0] is "<< fPMs[0] << endl;
|
||||
fTableLockid =
|
||||
fDbrm->getTableLock(fPMs, tableOid, &processName, &processID, &tmpSessionId, &txnId, BRM::LOADING);
|
||||
}
|
||||
catch (std::exception&)
|
||||
{
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR,
|
||||
IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (fTableLockid == 0)
|
||||
{
|
||||
int waitPeriod = 10;
|
||||
int sleepTime = 100; // sleep 100 milliseconds between checks
|
||||
int numTries = 10; // try 10 times per second
|
||||
string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
|
||||
|
||||
if (waitPeriodStr.length() != 0)
|
||||
waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
|
||||
|
||||
numTries = waitPeriod * 10;
|
||||
struct timespec rm_ts;
|
||||
|
||||
rm_ts.tv_sec = sleepTime / 1000;
|
||||
rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
|
||||
|
||||
for (; i < numTries; i++)
|
||||
{
|
||||
//cout << "In grabTableLock, fPMs[0] is "<< fPMs[0] << endl;
|
||||
fTableLockid = fDbrm->getTableLock(fPMs, tableOid, &processName, &processID, &tmpSessionId, &txnId, BRM::LOADING );
|
||||
}
|
||||
catch (std::exception&)
|
||||
{
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ( fTableLockid == 0 )
|
||||
{
|
||||
int waitPeriod = 10;
|
||||
int sleepTime = 100; // sleep 100 milliseconds between checks
|
||||
int numTries = 10; // try 10 times per second
|
||||
string waitPeriodStr = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
|
||||
|
||||
if ( waitPeriodStr.length() != 0 )
|
||||
waitPeriod = static_cast<int>(config::Config::fromText(waitPeriodStr));
|
||||
|
||||
numTries = waitPeriod * 10;
|
||||
struct timespec rm_ts;
|
||||
|
||||
rm_ts.tv_sec = sleepTime / 1000;
|
||||
rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
|
||||
|
||||
for (; i < numTries; i++)
|
||||
{
|
||||
#ifdef _MSC_VER
|
||||
Sleep(rm_ts.tv_sec * 1000);
|
||||
Sleep(rm_ts.tv_sec * 1000);
|
||||
#else
|
||||
struct timespec abs_ts;
|
||||
struct timespec abs_ts;
|
||||
|
||||
do
|
||||
{
|
||||
abs_ts.tv_sec = rm_ts.tv_sec;
|
||||
abs_ts.tv_nsec = rm_ts.tv_nsec;
|
||||
}
|
||||
while (nanosleep(&abs_ts, &rm_ts) < 0);
|
||||
do
|
||||
{
|
||||
abs_ts.tv_sec = rm_ts.tv_sec;
|
||||
abs_ts.tv_nsec = rm_ts.tv_nsec;
|
||||
} while (nanosleep(&abs_ts, &rm_ts) < 0);
|
||||
|
||||
#endif
|
||||
|
||||
try
|
||||
{
|
||||
processID = ::getpid();
|
||||
txnId = fTxnid;
|
||||
tmpSessionId = sessionId;
|
||||
processName = "DMLProc batchinsert";
|
||||
fTableLockid = fDbrm->getTableLock(fPMs, tableOid, &processName, &processID, &tmpSessionId, &txnId, BRM::LOADING );
|
||||
}
|
||||
catch (std::exception&)
|
||||
{
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
|
||||
return 0;
|
||||
}
|
||||
try
|
||||
{
|
||||
processID = ::getpid();
|
||||
txnId = fTxnid;
|
||||
tmpSessionId = sessionId;
|
||||
processName = "DMLProc batchinsert";
|
||||
fTableLockid = fDbrm->getTableLock(fPMs, tableOid, &processName, &processID, &tmpSessionId, &txnId,
|
||||
BRM::LOADING);
|
||||
}
|
||||
catch (std::exception&)
|
||||
{
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::INSERT_ERROR,
|
||||
IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (fTableLockid > 0)
|
||||
return fTableLockid;
|
||||
}
|
||||
|
||||
if (i >= numTries) //error out
|
||||
{
|
||||
logging::Message::Args args;
|
||||
string strOp("batch insert");
|
||||
args.add(strOp);
|
||||
args.add(processName);
|
||||
args.add((uint64_t)processID);
|
||||
args.add(tmpSessionId);
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::TABLE_LOCK_ERROR, IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
|
||||
return 0;
|
||||
}
|
||||
if (fTableLockid > 0)
|
||||
return fTableLockid;
|
||||
}
|
||||
|
||||
return fTableLockid;
|
||||
if (i >= numTries) // error out
|
||||
{
|
||||
logging::Message::Args args;
|
||||
string strOp("batch insert");
|
||||
args.add(strOp);
|
||||
args.add(processName);
|
||||
args.add((uint64_t)processID);
|
||||
args.add(tmpSessionId);
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::TABLE_LOCK_ERROR,
|
||||
IDBErrorInfo::instance()->errorMsg(ERR_TABLE_LOCKED, args));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
return fTableLockid;
|
||||
}
|
||||
|
||||
BatchInsertProc::SP_PKG BatchInsertProc::getInsertQueue()
|
||||
BatchInsertProc::SP_PKG BatchInsertProc::getInsertQueue()
|
||||
{
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
return fInsertPkgQueue;
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
return fInsertPkgQueue;
|
||||
}
|
||||
|
||||
void BatchInsertProc::setLastPkg (bool lastPkg)
|
||||
void BatchInsertProc::setLastPkg(bool lastPkg)
|
||||
{
|
||||
fLastpkg = lastPkg;
|
||||
fLastpkg = lastPkg;
|
||||
}
|
||||
void BatchInsertProc::addPkg(messageqcpp::ByteStream& insertBs)
|
||||
{
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
fInsertPkgQueue->push(insertBs);
|
||||
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
fInsertPkgQueue->push(insertBs);
|
||||
}
|
||||
|
||||
messageqcpp::ByteStream BatchInsertProc::getPkg()
|
||||
{
|
||||
messageqcpp::ByteStream bs;
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
bs = fInsertPkgQueue->front();
|
||||
fInsertPkgQueue->pop();
|
||||
return bs;
|
||||
messageqcpp::ByteStream bs;
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
bs = fInsertPkgQueue->front();
|
||||
fInsertPkgQueue->pop();
|
||||
return bs;
|
||||
}
|
||||
|
||||
void BatchInsertProc::buildPkg(messageqcpp::ByteStream& bs)
|
||||
{
|
||||
bs.reset();
|
||||
bs << (ByteStream::byte) WE_SVR_BATCH_INSERT;
|
||||
bs << fUniqueId;
|
||||
bs << (uint32_t) fTxnid;
|
||||
bs << fCurrentPMid; //to keep track of PMs
|
||||
bs += getPkg();
|
||||
bs.reset();
|
||||
bs << (ByteStream::byte)WE_SVR_BATCH_INSERT;
|
||||
bs << fUniqueId;
|
||||
bs << (uint32_t)fTxnid;
|
||||
bs << fCurrentPMid; // to keep track of PMs
|
||||
bs += getPkg();
|
||||
}
|
||||
|
||||
void BatchInsertProc::buildLastPkg(messageqcpp::ByteStream& bs)
|
||||
{
|
||||
ByteStream::byte rt;
|
||||
ByteStream::byte rt;
|
||||
|
||||
// Bug 757. WriteEngineServer deletes metadata if ErrorCode != 0. With range warning,
|
||||
// we still inserted data, just truncated. Kludge to keep WriteEngineServer on track.
|
||||
if (fErrorCode == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
|
||||
rt = 0;
|
||||
else
|
||||
rt = fErrorCode;
|
||||
// Bug 757. WriteEngineServer deletes metadata if ErrorCode != 0. With range warning,
|
||||
// we still inserted data, just truncated. Kludge to keep WriteEngineServer on track.
|
||||
if (fErrorCode == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
|
||||
rt = 0;
|
||||
else
|
||||
rt = fErrorCode;
|
||||
|
||||
bs.reset();
|
||||
bs << (ByteStream::byte) WE_SVR_BATCH_INSERT_END;
|
||||
bs << fUniqueId;
|
||||
bs << (ByteStream::quadbyte) fTxnid;
|
||||
bs << (ByteStream::byte)fIsAutocommitOn;
|
||||
bs << fTableOid;
|
||||
bs << rt;
|
||||
bs.reset();
|
||||
bs << (ByteStream::byte)WE_SVR_BATCH_INSERT_END;
|
||||
bs << fUniqueId;
|
||||
bs << (ByteStream::quadbyte)fTxnid;
|
||||
bs << (ByteStream::byte)fIsAutocommitOn;
|
||||
bs << fTableOid;
|
||||
bs << rt;
|
||||
}
|
||||
|
||||
void BatchInsertProc::sendFirstBatch()
|
||||
{
|
||||
uint32_t firstPmId = 0;
|
||||
int rc = 0;
|
||||
uint32_t firstPmId = 0;
|
||||
int rc = 0;
|
||||
|
||||
try
|
||||
{
|
||||
firstPmId = fBatchLoader->selectNextPM();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
rc = 1;
|
||||
setError(rc, ex.what());
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
firstPmId = fBatchLoader->selectNextPM();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
rc = 1;
|
||||
setError(rc, ex.what());
|
||||
return;
|
||||
}
|
||||
|
||||
if (firstPmId == 0)
|
||||
{
|
||||
rc = 1;
|
||||
setError(rc, "Request cannot be sent to PM 0.");
|
||||
return;
|
||||
}
|
||||
if (firstPmId == 0)
|
||||
{
|
||||
rc = 1;
|
||||
setError(rc, "Request cannot be sent to PM 0.");
|
||||
return;
|
||||
}
|
||||
|
||||
fCurrentPMid = firstPmId;
|
||||
messageqcpp::ByteStream bs;
|
||||
buildPkg(bs);
|
||||
fCurrentPMid = firstPmId;
|
||||
messageqcpp::ByteStream bs;
|
||||
buildPkg(bs);
|
||||
|
||||
fWEClient->write(bs, fCurrentPMid);
|
||||
//cout << "in sendFirstBatch: batchinsertprocessor sent pkg to pmnum " << fCurrentPMid << endl;
|
||||
fPmState[fCurrentPMid] = false;
|
||||
fWEClient->write(bs, fCurrentPMid);
|
||||
// cout << "in sendFirstBatch: batchinsertprocessor sent pkg to pmnum " << fCurrentPMid << endl;
|
||||
fPmState[fCurrentPMid] = false;
|
||||
}
|
||||
|
||||
void BatchInsertProc::sendNextBatch()
|
||||
{
|
||||
int rc = 0;
|
||||
int rc = 0;
|
||||
|
||||
try
|
||||
{
|
||||
fCurrentPMid = fBatchLoader->selectNextPM();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
rc = 1;
|
||||
setError(rc, ex.what());
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
fCurrentPMid = fBatchLoader->selectNextPM();
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
rc = 1;
|
||||
setError(rc, ex.what());
|
||||
return;
|
||||
}
|
||||
|
||||
messageqcpp::ByteStream bs;
|
||||
buildPkg(bs);
|
||||
string errorMsg;
|
||||
messageqcpp::ByteStream bs;
|
||||
buildPkg(bs);
|
||||
string errorMsg;
|
||||
|
||||
//cout << "in sendNextBatch: fCurrentPMid changed to " << fCurrentPMid << " this = " << this << endl;
|
||||
if (fPmState[fCurrentPMid])
|
||||
// cout << "in sendNextBatch: fCurrentPMid changed to " << fCurrentPMid << " this = " << this << endl;
|
||||
if (fPmState[fCurrentPMid])
|
||||
{
|
||||
// cout << "current pm state for pm is true for pm " << fCurrentPMid << " this = " << this<< endl;
|
||||
fWEClient->write(bs, fCurrentPMid);
|
||||
// cout << "batchinsertprocessor sent pkg to pmnum " << fCurrentPMid << endl;
|
||||
fPmState[fCurrentPMid] = false;
|
||||
// cout << "set pm state to false for pm " << fCurrentPMid << " this = " << this << endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
// cout << "current pm state for pm is false for pm " << fCurrentPMid<< endl;
|
||||
while (1)
|
||||
{
|
||||
//cout << "current pm state for pm is true for pm " << fCurrentPMid << " this = " << this<< endl;
|
||||
fWEClient->write(bs, fCurrentPMid);
|
||||
//cout << "batchinsertprocessor sent pkg to pmnum " << fCurrentPMid << endl;
|
||||
fPmState[fCurrentPMid] = false;
|
||||
//cout << "set pm state to false for pm " << fCurrentPMid << " this = " << this << endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
//cout << "current pm state for pm is false for pm " << fCurrentPMid<< endl;
|
||||
while (1)
|
||||
// cout << "Read from WES for pm id " << (uint32_t) tmp32 << endl;
|
||||
bsIn.reset(new ByteStream());
|
||||
fWEClient->read(fUniqueId, bsIn);
|
||||
|
||||
if (bsIn->length() == 0) // read error
|
||||
{
|
||||
errorMsg = "Lost connection to WES.";
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
*bsIn >> tmp8;
|
||||
rc = tmp8;
|
||||
*bsIn >> errorMsg;
|
||||
*bsIn >> tmp32;
|
||||
|
||||
// cout << "got response from WES for pm id " << (uint32_t) tmp32 << endl;
|
||||
if (rc != 0)
|
||||
{
|
||||
//cout << "Read from WES for pm id " << (uint32_t) tmp32 << endl;
|
||||
bsIn.reset(new ByteStream());
|
||||
fWEClient->read(fUniqueId, bsIn);
|
||||
|
||||
if ( bsIn->length() == 0 ) //read error
|
||||
{
|
||||
errorMsg = "Lost connection to WES.";
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
*bsIn >> tmp8;
|
||||
rc = tmp8;
|
||||
*bsIn >> errorMsg;
|
||||
*bsIn >> tmp32;
|
||||
|
||||
//cout << "got response from WES for pm id " << (uint32_t) tmp32 << endl;
|
||||
if (rc != 0)
|
||||
{
|
||||
//cout << "Batch insert got error code:errormsg = " << (uint32_t)tmp8<<":"<<errorMsg<<endl;
|
||||
setError(rc, errorMsg);
|
||||
fPmState[tmp32] = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
fPmState[tmp32] = true;
|
||||
|
||||
//cout << "set pm state to true for pm " << (uint32_t) tmp32 << " and current PM id is " << fCurrentPMid<< " this = " << this << endl;
|
||||
if (tmp32 == fCurrentPMid)
|
||||
break;
|
||||
// cout << "Batch insert got error code:errormsg = " << (uint32_t)tmp8<<":"<<errorMsg<<endl;
|
||||
setError(rc, errorMsg);
|
||||
fPmState[tmp32] = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//cout << "before batchinsertprocessor sent pkg to pm " << fCurrentPMid << endl;
|
||||
fWEClient->write(bs, fCurrentPMid);
|
||||
//cout << "batchinsertprocessor sent pkg to pm " << fCurrentPMid << endl;
|
||||
fPmState[fCurrentPMid] = false;
|
||||
//cout << "set pm state to false for pm " << fCurrentPMid << " this = " << this << endl;
|
||||
fPmState[tmp32] = true;
|
||||
|
||||
// cout << "set pm state to true for pm " << (uint32_t) tmp32 << " and current PM id is " <<
|
||||
// fCurrentPMid<< " this = " << this << endl;
|
||||
if (tmp32 == fCurrentPMid)
|
||||
break;
|
||||
}
|
||||
|
||||
// cout << "before batchinsertprocessor sent pkg to pm " << fCurrentPMid << endl;
|
||||
fWEClient->write(bs, fCurrentPMid);
|
||||
// cout << "batchinsertprocessor sent pkg to pm " << fCurrentPMid << endl;
|
||||
fPmState[fCurrentPMid] = false;
|
||||
// cout << "set pm state to false for pm " << fCurrentPMid << " this = " << this << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void BatchInsertProc::sendlastBatch()
|
||||
{
|
||||
messageqcpp::ByteStream bs;
|
||||
buildLastPkg(bs);
|
||||
messageqcpp::ByteStream bs;
|
||||
buildLastPkg(bs);
|
||||
|
||||
try
|
||||
{
|
||||
fWEClient->write_to_all(bs);
|
||||
//cout << "sent the last pkg" << endl;
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
ostringstream oss;
|
||||
oss << "Exception on communicating to WES ";
|
||||
oss << ex.what();
|
||||
setError(1, oss.str());
|
||||
//cout << oss.str() << endl;
|
||||
}
|
||||
try
|
||||
{
|
||||
fWEClient->write_to_all(bs);
|
||||
// cout << "sent the last pkg" << endl;
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
ostringstream oss;
|
||||
oss << "Exception on communicating to WES ";
|
||||
oss << ex.what();
|
||||
setError(1, oss.str());
|
||||
// cout << oss.str() << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void BatchInsertProc::receiveOutstandingMsg()
|
||||
{
|
||||
//check how many message we need to receive
|
||||
uint32_t messagesNotReceived = 0;
|
||||
int rc = 0;
|
||||
// check how many message we need to receive
|
||||
uint32_t messagesNotReceived = 0;
|
||||
int rc = 0;
|
||||
|
||||
for (unsigned i = 0; i < fPMs.size(); i++)
|
||||
for (unsigned i = 0; i < fPMs.size(); i++)
|
||||
{
|
||||
if (!fPmState[fPMs[i]])
|
||||
messagesNotReceived++;
|
||||
}
|
||||
|
||||
// cout << "receiveOutstandingMsg: Need to receive " << messagesNotReceived << " messages. this = " << this
|
||||
// << endl;
|
||||
if ((messagesNotReceived > 0) && (messagesNotReceived <= fWEClient->getPmCount()))
|
||||
{
|
||||
string errorMsg;
|
||||
uint32_t msgReceived = 0;
|
||||
|
||||
while (1)
|
||||
{
|
||||
if (!fPmState[fPMs[i]])
|
||||
messagesNotReceived++;
|
||||
}
|
||||
if (msgReceived == messagesNotReceived)
|
||||
break;
|
||||
|
||||
//cout << "receiveOutstandingMsg: Need to receive " << messagesNotReceived << " messages. this = " << this << endl;
|
||||
if ((messagesNotReceived > 0) && (messagesNotReceived <= fWEClient->getPmCount()))
|
||||
{
|
||||
string errorMsg;
|
||||
uint32_t msgReceived = 0;
|
||||
bsIn.reset(new ByteStream());
|
||||
|
||||
while (1)
|
||||
try
|
||||
{
|
||||
fWEClient->read(fUniqueId, bsIn);
|
||||
|
||||
if (bsIn->length() == 0) // read error
|
||||
{
|
||||
if (msgReceived == messagesNotReceived)
|
||||
break;
|
||||
|
||||
bsIn.reset(new ByteStream());
|
||||
|
||||
try
|
||||
{
|
||||
fWEClient->read(fUniqueId, bsIn);
|
||||
|
||||
if ( bsIn->length() == 0 ) //read error
|
||||
{
|
||||
rc = 1;
|
||||
setError(rc, errorMsg);
|
||||
}
|
||||
else
|
||||
{
|
||||
*bsIn >> tmp8;
|
||||
*bsIn >> errorMsg;
|
||||
*bsIn >> tmp32;
|
||||
|
||||
fPmState[tmp32] = true;
|
||||
msgReceived++;
|
||||
|
||||
if ( tmp8 != 0 )
|
||||
setError(tmp8, errorMsg);
|
||||
}
|
||||
}
|
||||
catch (runtime_error& ex) //write error
|
||||
{
|
||||
errorMsg = ex.what();
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
|
||||
break;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
errorMsg = "Lost connection to WES.";
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
|
||||
break;
|
||||
}
|
||||
rc = 1;
|
||||
setError(rc, errorMsg);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
*bsIn >> tmp8;
|
||||
*bsIn >> errorMsg;
|
||||
*bsIn >> tmp32;
|
||||
|
||||
fPmState[tmp32] = true;
|
||||
msgReceived++;
|
||||
|
||||
if (tmp8 != 0)
|
||||
setError(tmp8, errorMsg);
|
||||
}
|
||||
}
|
||||
catch (runtime_error& ex) // write error
|
||||
{
|
||||
errorMsg = ex.what();
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
|
||||
break;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
errorMsg = "Lost connection to WES.";
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void BatchInsertProc::receiveAllMsg()
|
||||
{
|
||||
uint32_t msgRevd = 0;
|
||||
int rc = 0;
|
||||
string errorMsg;
|
||||
uint32_t msgRevd = 0;
|
||||
int rc = 0;
|
||||
string errorMsg;
|
||||
|
||||
try
|
||||
try
|
||||
{
|
||||
while (1)
|
||||
{
|
||||
while (1)
|
||||
if (msgRevd == fWEClient->getPmCount())
|
||||
break;
|
||||
|
||||
// cout << "Read last from WES bytestream" << endl;
|
||||
fWEClient->read(fUniqueId, bsIn);
|
||||
|
||||
if (bsIn->length() == 0) // read error
|
||||
{
|
||||
errorMsg = "Lost connection to WES.";
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
|
||||
msgRevd++;
|
||||
|
||||
if (!fIsAutocommitOn)
|
||||
{
|
||||
if (msgRevd == fWEClient->getPmCount())
|
||||
break;
|
||||
|
||||
//cout << "Read last from WES bytestream" << endl;
|
||||
fWEClient->read(fUniqueId, bsIn);
|
||||
|
||||
if ( bsIn->length() == 0 ) //read error
|
||||
{
|
||||
errorMsg = "Lost connection to WES.";
|
||||
setError(dmlpackageprocessor::DMLPackageProcessor::NETWORK_ERROR, errorMsg);
|
||||
msgRevd++;
|
||||
|
||||
if (!fIsAutocommitOn)
|
||||
{
|
||||
BulkSetHWMArgs setHWMArgs;
|
||||
fHwmArgsAllPms.push_back(setHWMArgs);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
*bsIn >> tmp8;
|
||||
rc = tmp8;
|
||||
*bsIn >> errorMsg;
|
||||
msgRevd++;
|
||||
|
||||
if (!fIsAutocommitOn) //collect Hwm
|
||||
{
|
||||
BulkSetHWMArgs setHWMArgs;
|
||||
deserializeInlineVector(*(bsIn.get()), setHWMArgs);
|
||||
fHwmArgsAllPms.push_back(setHWMArgs);
|
||||
|
||||
}
|
||||
|
||||
if (rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
|
||||
{
|
||||
setError(rc, errorMsg);
|
||||
}
|
||||
else if (rc != 0)
|
||||
{
|
||||
//cout << "Batch insert lastpkg got error code:errormsg = " << (uint32_t)tmp8<<":"<<errorMsg<<endl;
|
||||
setError(rc, errorMsg);
|
||||
}
|
||||
}
|
||||
BulkSetHWMArgs setHWMArgs;
|
||||
fHwmArgsAllPms.push_back(setHWMArgs);
|
||||
}
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
ostringstream oss;
|
||||
oss << "Exception on communicating to WES ";
|
||||
oss << ex.what();
|
||||
rc = 1;
|
||||
setError(rc, oss.str());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
*bsIn >> tmp8;
|
||||
rc = tmp8;
|
||||
*bsIn >> errorMsg;
|
||||
msgRevd++;
|
||||
|
||||
if (!fIsAutocommitOn && (fHwmArgsAllPms.size() == fWEClient->getPmCount()))
|
||||
setHwm();
|
||||
if (!fIsAutocommitOn) // collect Hwm
|
||||
{
|
||||
BulkSetHWMArgs setHWMArgs;
|
||||
deserializeInlineVector(*(bsIn.get()), setHWMArgs);
|
||||
fHwmArgsAllPms.push_back(setHWMArgs);
|
||||
}
|
||||
|
||||
if (rc == dmlpackageprocessor::DMLPackageProcessor::IDBRANGE_WARNING)
|
||||
{
|
||||
setError(rc, errorMsg);
|
||||
}
|
||||
else if (rc != 0)
|
||||
{
|
||||
// cout << "Batch insert lastpkg got error code:errormsg = " << (uint32_t)tmp8<<":"<<errorMsg<<endl;
|
||||
setError(rc, errorMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
ostringstream oss;
|
||||
oss << "Exception on communicating to WES ";
|
||||
oss << ex.what();
|
||||
rc = 1;
|
||||
setError(rc, oss.str());
|
||||
}
|
||||
|
||||
if (!fIsAutocommitOn && (fHwmArgsAllPms.size() == fWEClient->getPmCount()))
|
||||
setHwm();
|
||||
}
|
||||
|
||||
void BatchInsertProc::collectHwm()
|
||||
{
|
||||
BulkSetHWMArgs setHWMArgs;
|
||||
//cout << "received from WES bytestream length = " << bsIn->length() << endl;
|
||||
deserializeInlineVector(*(bsIn.get()), setHWMArgs);
|
||||
//cout << "get hwm info from WES size " << setHWMArgs.size() << endl;
|
||||
fHwmArgsAllPms.push_back(setHWMArgs);
|
||||
BulkSetHWMArgs setHWMArgs;
|
||||
// cout << "received from WES bytestream length = " << bsIn->length() << endl;
|
||||
deserializeInlineVector(*(bsIn.get()), setHWMArgs);
|
||||
// cout << "get hwm info from WES size " << setHWMArgs.size() << endl;
|
||||
fHwmArgsAllPms.push_back(setHWMArgs);
|
||||
}
|
||||
|
||||
void BatchInsertProc::setHwm()
|
||||
{
|
||||
std::vector<BRM::BulkSetHWMArg> allHwm;
|
||||
BulkSetHWMArgs::const_iterator itor;
|
||||
std::vector<BRM::BulkSetHWMArg> allHwm;
|
||||
BulkSetHWMArgs::const_iterator itor;
|
||||
|
||||
//cout << "total hwmArgsAllPms size " << hwmArgsAllPms.size() << endl;
|
||||
for (unsigned i = 0; i < fWEClient->getPmCount(); i++)
|
||||
// cout << "total hwmArgsAllPms size " << hwmArgsAllPms.size() << endl;
|
||||
for (unsigned i = 0; i < fWEClient->getPmCount(); i++)
|
||||
{
|
||||
itor = fHwmArgsAllPms[i].begin();
|
||||
|
||||
while (itor != fHwmArgsAllPms[i].end())
|
||||
{
|
||||
itor = fHwmArgsAllPms[i].begin();
|
||||
|
||||
while (itor != fHwmArgsAllPms[i].end())
|
||||
{
|
||||
allHwm.push_back(*itor);
|
||||
//cout << "received hwm info: " << itor->oid << ":" << itor->hwm << endl;
|
||||
itor++;
|
||||
}
|
||||
allHwm.push_back(*itor);
|
||||
// cout << "received hwm info: " << itor->oid << ":" << itor->hwm << endl;
|
||||
itor++;
|
||||
}
|
||||
}
|
||||
|
||||
if (allHwm.size() > 0)
|
||||
if (allHwm.size() > 0)
|
||||
{
|
||||
// cout << "setting hwm allHwm size " << allHwm.size() << endl;
|
||||
int rc = fDbrm->bulkSetHWM(allHwm, 0);
|
||||
|
||||
if (rc != 0)
|
||||
{
|
||||
//cout << "setting hwm allHwm size " << allHwm.size() << endl;
|
||||
int rc = fDbrm->bulkSetHWM(allHwm, 0);
|
||||
|
||||
if ( rc != 0 )
|
||||
{
|
||||
string errorMsg;
|
||||
BRM::errString(rc, errorMsg);
|
||||
setError(rc, errorMsg);
|
||||
}
|
||||
string errorMsg;
|
||||
BRM::errString(rc, errorMsg);
|
||||
setError(rc, errorMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void BatchInsertProc::setError(int errorCode, std::string errMsg)
|
||||
{
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
fErrorCode = errorCode;
|
||||
fErrMsg = errMsg;
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
fErrorCode = errorCode;
|
||||
fErrMsg = errMsg;
|
||||
}
|
||||
void BatchInsertProc::getError(int& errorCode, std::string& errMsg)
|
||||
{
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
errorCode = fErrorCode;
|
||||
errMsg = fErrMsg;
|
||||
}
|
||||
boost::mutex::scoped_lock lk(fLock);
|
||||
errorCode = fErrorCode;
|
||||
errMsg = fErrMsg;
|
||||
}
|
||||
} // namespace dmlprocessor
|
||||
// vim:ts=4 sw=4:
|
||||
|
Reference in New Issue
Block a user