mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
1631 lines
42 KiB
C++
1631 lines
42 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
/*******************************************************************************
|
|
* $Id$
|
|
*
|
|
*******************************************************************************/
|
|
/*
|
|
* we_dataloader.cpp
|
|
*
|
|
* Created on: Oct 4, 2011
|
|
* Author: Boby Paul: bpaul@calpont.com
|
|
*/
|
|
|
|
#include "mcsconfig.h" // Used to pickup STRERROR_R_CHAR_P definition
|
|
|
|
#include <cstdlib>
|
|
#include <csignal>
|
|
#include <cstring>
|
|
#include <cerrno>
|
|
|
|
#include <unistd.h> //pipe() && fork()
|
|
#include <wait.h> //wait()
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <time.h>
|
|
#include "bytestream.h"
|
|
#include "rwlock_local.h"
|
|
|
|
#include <iostream>
|
|
#include <fstream>
|
|
#include <vector>
|
|
#include <queue>
|
|
#include <string>
|
|
#include <map>
|
|
using namespace std;
|
|
|
|
#include <boost/thread/condition.hpp>
|
|
#include <boost/scoped_array.hpp>
|
|
#include <boost/thread.hpp>
|
|
#include <boost/filesystem.hpp>
|
|
using namespace boost;
|
|
|
|
#include "bytestream.h"
|
|
#include "messagequeue.h"
|
|
using namespace messageqcpp;
|
|
|
|
#include "we_messages.h"
|
|
#include "we_brmrprtparser.h"
|
|
#include "we_cleartablelockcmd.h"
|
|
#include "we_dataloader.h"
|
|
#include "we_readthread.h"
|
|
|
|
#include "installdir.h"
|
|
|
|
namespace WriteEngine
|
|
{
|
|
// bool WEDataLoader::fTearDownCpimport=false; // @bug 4267
|
|
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
*
|
|
* @brief WEDataLoader::Constructor
|
|
*
|
|
**/
|
|
WEDataLoader::WEDataLoader(SplitterReadThread& Srt)
|
|
: fRef(Srt)
|
|
, fMode(-1)
|
|
, fDataDumpFile()
|
|
, fTxBytes(0)
|
|
, fRxBytes(0)
|
|
, fPmId(0)
|
|
, fCh_pid(0)
|
|
, fThis_pid(0)
|
|
, fP_pid(0)
|
|
, fpCfThread(0)
|
|
, fTearDownCpimport(false)
|
|
, // @bug 4267
|
|
fWaitPidRc(0)
|
|
, // @bug 4267
|
|
fWaitPidStatus(0)
|
|
, // @bug 4267
|
|
fForceKill(false)
|
|
, fPipeErr(false)
|
|
, fpSysLog(0)
|
|
{
|
|
Config weConfig;
|
|
uint16_t localModuleId = weConfig.getLocalModuleID();
|
|
fPmId = static_cast<char>(localModuleId);
|
|
|
|
srand(time(NULL)); // initialize random seed
|
|
int aObjId = rand() % 10000 + 1; // generate a random number
|
|
|
|
setObjId(aObjId);
|
|
|
|
setupSignalHandlers();
|
|
|
|
if (!fpSysLog)
|
|
{
|
|
fpSysLog = SimpleSysLog::instance();
|
|
fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV));
|
|
}
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
*
|
|
* @brief WEDataLoader::Destructor
|
|
*
|
|
**/
|
|
|
|
WEDataLoader::~WEDataLoader()
|
|
{
|
|
try
|
|
{
|
|
if (fDataDumpFile.is_open())
|
|
fDataDumpFile.close();
|
|
|
|
cout << "\tRx Bytes " << getRxBytes() << endl;
|
|
cout << "\tTX Bytes " << getTxBytes() << endl;
|
|
|
|
cout << "\tChild PID " << getChPid() << endl;
|
|
|
|
if (getChPid())
|
|
{
|
|
if (2 == getMode()) //@bug 5012
|
|
{
|
|
kill(getChPid(), SIGINT);
|
|
teardownCpimport(fTearDownCpimport);
|
|
}
|
|
else
|
|
{
|
|
teardownCpimport(false); // @bug 4267
|
|
}
|
|
}
|
|
}
|
|
catch (std::exception&) // @bug 4164: exception causing thread to exit
|
|
{
|
|
cout << "Error tearing down cpimport in WEDataLoader destructor" << endl;
|
|
}
|
|
|
|
// cout << "Leaving WEDataLoader destructor" << endl;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Initialize signal handling
|
|
//------------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::setupSignalHandlers()
|
|
{
|
|
signal(SIGPIPE, SIG_IGN);
|
|
signal(SIGCHLD, WEDataLoader::onSigChild);
|
|
}
|
|
//------------------------------------------------------------------------------
|
|
// handles on signal Terminate
|
|
//------------------------------------------------------------------------------
|
|
void WEDataLoader::onSigChild(int aInt)
|
|
{
|
|
std::string aStr = "Received SIGCHLD of terminated process..";
|
|
cout << aStr << endl;
|
|
// fTearDownCpimport = true; // @bug 4267
|
|
|
|
// commented out for non-static variables
|
|
// ostringstream oss;
|
|
// oss << getObjId() <<" : " <<aStr;
|
|
// logging::Message::Args errMsgArgs;
|
|
// errMsgArgs.add(oss.str());
|
|
// fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
*
|
|
* @brief WEDataLoader::update
|
|
*
|
|
**/
|
|
|
|
bool WEDataLoader::update(Subject* pSub)
|
|
{
|
|
return true;
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
*
|
|
* @brief setup cpimport as a seperate process
|
|
*
|
|
**/
|
|
|
|
bool WEDataLoader::setupCpimport() // fork the cpimport
|
|
{
|
|
pid_t aChPid;
|
|
|
|
errno = 0;
|
|
|
|
if (pipe(fFIFO) == -1)
|
|
{
|
|
int errnum = errno;
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Error in creating pipe (errno-" << errnum << "); " << strerror(errnum);
|
|
throw runtime_error(oss.str());
|
|
}
|
|
|
|
setPid(getpid());
|
|
setPPid(getppid());
|
|
|
|
errno = 0;
|
|
aChPid = fork();
|
|
|
|
if (aChPid == -1) // an error caused
|
|
{
|
|
int errnum = errno;
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Error in forking cpimport.bin (errno-" << errnum << "); " << strerror(errnum);
|
|
throw runtime_error(oss.str());
|
|
}
|
|
else if (aChPid == 0) // we are in child
|
|
{
|
|
int aStartFD = 3;
|
|
int aEndFD = fFIFO[1] + 256;
|
|
close(fFIFO[1]); // close the WRITER of CHILD
|
|
|
|
cout << "Child Process Info: PID = " << getpid() << " (fFIFO[0], fFIFO[1]) = (" << fFIFO[0] << ","
|
|
<< fFIFO[1] << ")"
|
|
<< " (StartFD, EndFD) = (" << aStartFD << "," << aEndFD << ")" << endl;
|
|
|
|
std::vector<char*> Cmds;
|
|
// str2Argv(fCmdLineStr, Cmds); // to avoid out-of-scope problem
|
|
std::string aCmdLine = fCmdLineStr;
|
|
std::istringstream ss(aCmdLine);
|
|
std::string arg;
|
|
std::vector<std::string> v2(20, "");
|
|
unsigned int i = 0;
|
|
|
|
while (ss >> arg)
|
|
{
|
|
v2[i++] = arg;
|
|
}
|
|
|
|
for (unsigned int j = 0; j < i; ++j)
|
|
{
|
|
Cmds.push_back(const_cast<char*>(v2[j].c_str()));
|
|
}
|
|
|
|
Cmds.push_back(0); // null terminate
|
|
// updatePrgmPath(Cmds);
|
|
|
|
// NOTE: for debugging
|
|
int aSize = Cmds.size();
|
|
|
|
for (int aIdx = 0; aIdx < aSize; ++aIdx)
|
|
{
|
|
cout << "Args " << Cmds[aIdx] << endl;
|
|
}
|
|
|
|
cout.flush();
|
|
|
|
close(0); // close stdin for the child
|
|
dup2(fFIFO[0], 0); // make stdin be the reading end of the pipe
|
|
|
|
// BUG 4410 : hacky solution so that CHLD process get EOF on close of pipe
|
|
for (int i = aStartFD; i < aEndFD; i++)
|
|
close(i);
|
|
|
|
errno = 0;
|
|
int aRet = execvp(Cmds[0], &Cmds[0]); // NOTE - works with full Path
|
|
// int aRet = execvp(Cmds[0], &Cmds[0]); //NOTE - works if $PATH has cpimport
|
|
|
|
int execvErrno = errno;
|
|
cout << "Return status of cpimport is " << aRet << endl;
|
|
cout.flush();
|
|
close(fFIFO[0]); // will trigger an EOF on stdin
|
|
ostringstream oss;
|
|
oss << getObjId() << " : execv error: cpimport.bin invocation failed; "
|
|
<< "(errno-" << errno << "); " << strerror(execvErrno) << "; Check file and try invoking locally.";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
|
|
|
|
if (aRet == -1)
|
|
exit(-1);
|
|
}
|
|
else // parent
|
|
{
|
|
setChPid(aChPid); // This is the child PID
|
|
cout << "Child PID is " << this->getChPid() << endl;
|
|
close(fFIFO[0]); // close the READER of PARENT
|
|
// now we can send all the data thru FIFO[1], writer of PARENT
|
|
}
|
|
|
|
if (aChPid == 0)
|
|
cout << "******** Child finished its work ********" << endl;
|
|
|
|
return true;
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
*
|
|
* @brief close all file handles opened for cpimport
|
|
* @brief wait for the cpimport process to finish work
|
|
*
|
|
**/
|
|
|
|
void WEDataLoader::teardownCpimport(bool useStoredWaitPidStatus) // @bug 4267
|
|
{
|
|
fTearDownCpimport = false; // Reset it
|
|
// cout << "Tearing down Cpimport" << endl;
|
|
int aStatus;
|
|
|
|
// cout << "checking fpCfThread value" << endl;
|
|
if (fpCfThread)
|
|
{
|
|
// cout << "checking fpCfThread has a valid value" << endl;
|
|
|
|
// wait until we are done with the queued messages
|
|
while ((!fpCfThread->isMsgQueueEmpty()) && (!fpCfThread->isStopped()))
|
|
{
|
|
// cout << "DEBUG : MsgQueue not empty" << endl;
|
|
// cannot be too low, since there is a lock in isMsgQueueEmpty()
|
|
usleep(2000000);
|
|
}
|
|
|
|
// while(fpCfThread->isPushing())
|
|
// {
|
|
// cout << "DEBUG : still pushing" << endl;
|
|
// usleep(100000);
|
|
// }
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Message Queue is empty; Stopping CF Thread";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
|
|
fpCfThread->stopThread();
|
|
|
|
while (!fpCfThread->isStopped())
|
|
{
|
|
cout << "DEBUG : still not stopped" << endl;
|
|
usleep(100000);
|
|
}
|
|
|
|
delete fpCfThread;
|
|
fpCfThread = 0;
|
|
}
|
|
|
|
closeWritePipe();
|
|
pid_t aPid;
|
|
|
|
// @bug 4267 begin: call waitpid() to get job status or use stored job status
|
|
// aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs
|
|
if (useStoredWaitPidStatus)
|
|
{
|
|
aPid = fWaitPidRc;
|
|
aStatus = fWaitPidStatus;
|
|
}
|
|
else
|
|
{
|
|
// aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs
|
|
aPid = waitpid(getChPid(), &aStatus, WNOHANG); // wait until cpimport finishs
|
|
int aIdx = 0;
|
|
|
|
while ((aPid == 0) && (aIdx < 25 * MAX_QSIZE)) // Do not loop infinitly
|
|
{
|
|
usleep(2000000);
|
|
aPid = waitpid(getChPid(), &aStatus, WNOHANG);
|
|
cout << "Inside tearDown waitpid rc[" << aIdx << "] = " << aPid << endl;
|
|
++aIdx;
|
|
}
|
|
}
|
|
|
|
// @bug 4267 end // BP - added -1 as per DMC comment below
|
|
if ((aPid == getChPid()) || (aPid == -1)) // @bug 4267 (DMC-shouldn't we check for aPid of -1?)
|
|
{
|
|
setChPid(0);
|
|
|
|
if ((WIFEXITED(aStatus)) && (WEXITSTATUS(aStatus) == 0))
|
|
{
|
|
cout << "\tCpimport exit on success" << endl;
|
|
ostringstream oss;
|
|
oss << getObjId() << " : cpimport exit on success";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
|
|
onCpimportSuccess();
|
|
}
|
|
else
|
|
{
|
|
int termsig = (WIFSIGNALED(aStatus) ? WTERMSIG(aStatus) : -1);
|
|
|
|
if (!fForceKill)
|
|
{
|
|
cout << "\tCpimport exit on failure (signal " << termsig << ')' << endl;
|
|
ostringstream oss;
|
|
oss << getObjId() << " : cpimport exit on failure (signal " << termsig << ')';
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
|
|
onCpimportFailure();
|
|
}
|
|
else
|
|
{
|
|
cout << "\tCpimport exit on Force Kill!!" << endl;
|
|
ostringstream oss;
|
|
oss << getObjId() << " : cpimport exit on Force kill!!";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
onCpimportSuccess();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
* @brief Push the data to cpimport from the incoming ByteStream
|
|
* @param Incoming ByteStream
|
|
*
|
|
*/
|
|
//
|
|
void WEDataLoader::pushData2Cpimport(ByteStream& Ibs)
|
|
{
|
|
if (Ibs.length() > 0)
|
|
{
|
|
int aLen = Ibs.length();
|
|
char* pStart = reinterpret_cast<char*>(Ibs.buf());
|
|
char* pEnd = pStart + aLen;
|
|
char* pPtr = pStart;
|
|
|
|
while (pPtr < pEnd)
|
|
{
|
|
// if(pEnd > (pPtr + MAX_LEN))
|
|
// {
|
|
// int aRet = write(fFIFO[1], pPtr, MAX_LEN);
|
|
// if(aRet == -1) throw runtime_error("Pipe write error");
|
|
// //write(fFIFO[1], Ibs.buf(), Ibs.length());
|
|
// pPtr += MAX_LEN;
|
|
// }
|
|
// else
|
|
// {
|
|
// int aStrLen = pEnd - pPtr;
|
|
// int aRet = write(fFIFO[1], pPtr, aStrLen);
|
|
// if(aRet == -1) throw runtime_error("Pipe write error");
|
|
// pPtr += aStrLen;
|
|
// }
|
|
|
|
try
|
|
{
|
|
int aRet = write(fFIFO[1], pPtr, pEnd - pPtr);
|
|
|
|
if (aRet < 0)
|
|
{
|
|
if (!fPipeErr)
|
|
{
|
|
int e = errno;
|
|
fPipeErr = true;
|
|
std::string aStr = "pushing data : PIPE error .........";
|
|
|
|
char errMsgBuf[160];
|
|
#if STRERROR_R_CHAR_P
|
|
const char* pErrMsg = strerror_r(e, errMsgBuf, sizeof(errMsgBuf));
|
|
|
|
if (pErrMsg)
|
|
aStr += pErrMsg;
|
|
|
|
#else
|
|
int errMsgRc = strerror_r(e, errMsgBuf, sizeof(errMsgBuf));
|
|
|
|
if (errMsgRc == 0)
|
|
aStr += errMsgBuf;
|
|
|
|
#endif
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(aStr);
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
|
|
}
|
|
|
|
throw runtime_error("Pipe Error - cpimport.bin exited already!!");
|
|
}
|
|
|
|
pPtr += aRet;
|
|
}
|
|
catch (...)
|
|
{
|
|
// std::string aStr = "pushing data PIPE error .........";
|
|
// logging::Message::Args errMsgArgs;
|
|
// errMsgArgs.add(aStr);
|
|
// fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
|
|
throw runtime_error("Pipe Error - cpimport.bin exited already!!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
*
|
|
* @brief Close the pipe through which data was written to cpimport
|
|
* @brief This will also signal a EOF to the reading pipe.
|
|
*
|
|
**/
|
|
|
|
void WEDataLoader::closeWritePipe()
|
|
{
|
|
cout << "Going to close call = " << fFIFO[1] << endl;
|
|
// NOTE this will flush the file buffer and close it.
|
|
int aRet = close(fFIFO[1]); // will trigger EOD
|
|
cout << "----- closed both pipes -------- aRet = " << aRet << endl;
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
*
|
|
* @brief Tokenize a string into char** argv format and store in a vector
|
|
* @brief we pass the V as arguments to exec cpimport
|
|
* @param CmdLine is the string form of arguments demlimited by space
|
|
* @param V vector which contains each element of argv
|
|
*
|
|
**/
|
|
|
|
void WEDataLoader::str2Argv(std::string CmdLine, std::vector<char*>& V)
|
|
{
|
|
std::istringstream ss(CmdLine);
|
|
std::string arg;
|
|
std::vector<std::string> v2;
|
|
|
|
while (ss >> arg)
|
|
{
|
|
v2.push_back(arg);
|
|
V.push_back(const_cast<char*>(v2.back().c_str()));
|
|
}
|
|
|
|
V.push_back(0); // null terminate
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
*
|
|
* @brief Event to trigger when cpimport is successful.
|
|
*
|
|
**/
|
|
|
|
void WEDataLoader::onCpimportSuccess()
|
|
{
|
|
ByteStream obs;
|
|
|
|
cout << "Sending BRMRPT" << endl;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
// for testing
|
|
// std::string fRptFileName("ReportFile.txt");
|
|
BrmReportParser aBrmRptParser;
|
|
bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs);
|
|
|
|
if (aRet)
|
|
{
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
// aBrmRptParser.unserialize(obs); - was for testing
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
cout << "Finished Sending BRMRPT" << endl;
|
|
}
|
|
else
|
|
{
|
|
cout << "Failed to serialize BRMRpt " << endl;
|
|
}
|
|
|
|
// if(remove(fBrmRptFileName.c_str()) != 0)
|
|
// cout <<"Failed to delete BRMRpt File "<< fBrmRptFileName << endl;
|
|
// usleep(1000000); //sleep 1 second.
|
|
|
|
obs.reset();
|
|
obs << (ByteStream::byte)WE_CLT_SRV_CPIPASS;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
|
|
cout << "Sent CPIPASS info" << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : onCpimportSuccess BrmReport Send";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
*
|
|
* @brief Event to trigger if a cpimport failure occurs.
|
|
*
|
|
**/
|
|
void WEDataLoader::onCpimportFailure()
|
|
{
|
|
// Send failure notice back to the parent splitter job
|
|
sendCpimportFailureNotice();
|
|
|
|
// Even if we failed, we have failure info in BRMRPT
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
BrmReportParser aBrmRptParser;
|
|
bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs);
|
|
|
|
if (aRet)
|
|
{
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
cout << "Broken Pipe .." << ex.what() << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed; " << ex.what();
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
cout << "Finished Sending BRMRPT" << endl;
|
|
}
|
|
|
|
if (remove(fBrmRptFileName.c_str()) != 0)
|
|
cout << "Failed to delete BRMRpt File " << fBrmRptFileName << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : onCpimportFailure BrmReport Send";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
// Send msg to front-end splitter to notify it that a cpimport.bin pgm failed.
|
|
//-----------------------------------------------------------------------------
|
|
void WEDataLoader::sendCpimportFailureNotice()
|
|
{
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_CPIFAIL;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
|
|
}
|
|
}
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
* @brief Event when a KEEPALIVE arrives.
|
|
* @param Incoming ByteStream, not used currently
|
|
*/
|
|
void WEDataLoader::onReceiveKeepAlive(ByteStream& Ibs)
|
|
{
|
|
/*
|
|
// TODO comment out when we done with debug
|
|
if(fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() <<" : Received KEEPALIVE";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
*/
|
|
// cout << "Received KEEPALIVE" << endl;
|
|
// NOTE only seldom a KEEPALIVE receives,
|
|
// so nothing wrong in responding with a KEEPALIVE.
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << ": Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
|
|
// @bug 4267 begin
|
|
int aStatus;
|
|
pid_t aPid;
|
|
|
|
if (getChPid() > 0)
|
|
{
|
|
aPid = waitpid(getChPid(), &aStatus, WNOHANG); // wait until cpimport finishs
|
|
|
|
if (aPid != 0)
|
|
{
|
|
cout << "waitpid(" << getChPid() << "): rc-" << aPid << "; status-" << aStatus << "; exited-"
|
|
<< (WIFEXITED(aStatus)) << endl;
|
|
}
|
|
|
|
if ((aPid == getChPid()) || (aPid == -1))
|
|
{
|
|
fTearDownCpimport = true;
|
|
fWaitPidRc = aPid;
|
|
fWaitPidStatus = aStatus;
|
|
}
|
|
}
|
|
|
|
// @bug 4267 end
|
|
|
|
if (fTearDownCpimport)
|
|
{
|
|
// fTearDownCpimport = false; //Reset it // commented out to use the flag in EOD
|
|
cout << "Cpimport terminated " << endl;
|
|
|
|
if (0 == getMode())
|
|
onReceiveEod(Ibs);
|
|
else if (1 == getMode()) // mode 1 has to drive from UM
|
|
{
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_EOD;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << ": Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
}
|
|
else if (2 == getMode())
|
|
{
|
|
// if(getChPid()) teardownCpimport(true); // @bug 4267
|
|
if (getChPid())
|
|
teardownCpimport(fTearDownCpimport); // @bug 4267 BP
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// if(1 == getMode())
|
|
// if(fpCfThread) cout << "Queue Size = " << fpCfThread->getQueueSize() << endl;
|
|
}
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
* @brief trigger when a DATA arrives.
|
|
* @param Incoming ByteStream which contains data
|
|
*/
|
|
|
|
void WEDataLoader::onReceiveData(ByteStream& Ibs)
|
|
{
|
|
if ((0 == getMode()) && (fDataDumpFile.is_open()))
|
|
{
|
|
// Will write to the output file.
|
|
fDataDumpFile << Ibs;
|
|
sendDataRequest();
|
|
}
|
|
else if (1 == getMode())
|
|
{
|
|
// commented out since we are going to use seperate thread
|
|
// pushData2Cpimport(Ibs);
|
|
|
|
if (fpCfThread)
|
|
{
|
|
fpCfThread->add2MsgQueue(Ibs);
|
|
// sendDataRequest(); // Need to control Queue Size
|
|
// Bug 5031 : Will only send 1 rqst for a batch to cpimport.bin
|
|
// if(fpCfThread->getQueueSize()<MIN_QSIZE) sendDataRequest();
|
|
// if(fpCfThread->getQueueSize()<MAX_QSIZE) sendDataRequest();
|
|
|
|
int aQsz = (fpCfThread) ? fpCfThread->getQueueSize() : 0;
|
|
|
|
// Bug 5031 : If Q size goes above 100 (2*250);
|
|
if (aQsz < MAX_QSIZE)
|
|
sendDataRequest();
|
|
|
|
if (aQsz > 1.5 * static_cast<int>(MAX_QSIZE)) // > 2*250
|
|
{
|
|
cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz;
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else if (2 == getMode())
|
|
{
|
|
cout << "onReceiveData : In Mode 2 NO data suppose to receive" << endl;
|
|
}
|
|
|
|
// ByteStream obs;
|
|
// obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
|
|
// obs << (ByteStream::byte)fPmId; // PM id
|
|
// updateTxBytes(obs.length());
|
|
// fRef.fIos.write(obs);
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
* @brief trigger when a EOD arrives.
|
|
* @param Incoming ByteStream; not relevent for now
|
|
*/
|
|
void WEDataLoader::onReceiveEod(ByteStream& Ibs)
|
|
{
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : onReceiveEOD : child ID = " << getChPid();
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
|
|
cout << "Received EOD " << endl;
|
|
|
|
if (0 == getMode())
|
|
{
|
|
fDataDumpFile.close();
|
|
}
|
|
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_EOD;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
|
|
// if(( 1 == getMode())||( 2 == getMode()))
|
|
if (1 == getMode()) // BUG 4370 - seperated mode 1 & 2
|
|
{
|
|
// if(getChPid()) teardownCpimport(false); // @bug 4267
|
|
if (getChPid())
|
|
teardownCpimport(fTearDownCpimport); // @bug 4267 //BP changed to send the correct flag
|
|
}
|
|
else if (2 == getMode()) // BUG 4370
|
|
{
|
|
if (getChPid())
|
|
{
|
|
kill(getChPid(), SIGINT); // BUG 4370
|
|
fForceKill = true;
|
|
teardownCpimport(fTearDownCpimport); // BUG 4370
|
|
}
|
|
}
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
* @brief Event on Command Received. It should contain sub commands
|
|
* @param Incoming ByteStream, will have sub commands
|
|
*/
|
|
void WEDataLoader::onReceiveCmd(ByteStream& bs)
|
|
{
|
|
// TODO - can be cpimport cmd or server cmd, for now write to a file
|
|
ByteStream::byte aCmdId;
|
|
bs >> aCmdId;
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
* @brief The mode in which WES running.
|
|
* @param Incoming ByteStream, not relevent
|
|
*/
|
|
void WEDataLoader::onReceiveMode(ByteStream& Ibs)
|
|
{
|
|
// Assigning it here since WEDataLoader constructor is called multiple times
|
|
// while coping in readthread class.
|
|
if (!fpSysLog)
|
|
{
|
|
fpSysLog = SimpleSysLog::instance();
|
|
fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV));
|
|
}
|
|
|
|
Ibs >> (ByteStream::quadbyte&)fMode;
|
|
cout << "Setting fMode = " << fMode << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : onReceiveMode() Setting fMode = " << fMode;
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
|
|
char aName[64];
|
|
snprintf(aName, sizeof(aName), "ModuleDBRootCount%d-3", fPmId);
|
|
|
|
string aStrDbRootCnt = config::Config::makeConfig()->getConfig("SystemModuleConfig", aName);
|
|
cout << "DbRootCnt = " << aStrDbRootCnt << endl;
|
|
ByteStream::byte aDbCnt = (ByteStream::byte)atoi(aStrDbRootCnt.c_str());
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : onReceiveMode() DbRoot Count = " + aStrDbRootCnt;
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
|
|
// Send No. of DBRoots to Client
|
|
ByteStream aObs;
|
|
aObs << (ByteStream::byte)WE_CLT_SRV_DBRCNT;
|
|
aObs << (ByteStream::byte)fPmId;
|
|
aObs << (ByteStream::byte)aDbCnt;
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(aObs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(aObs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
* @brief Acknowledgment. Not relevant at this point of time.
|
|
* @brief Can make use to update the BRM
|
|
* @param Incoming ByteStream, not relevant for now
|
|
*/
|
|
|
|
void WEDataLoader::onReceiveAck(ByteStream& Ibs)
|
|
{
|
|
// All is good
|
|
// update the status
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
* @brief NAK. A Failure, Rollback should be initiated.
|
|
* @brief Can make use to update the BRM
|
|
* @param Incoming ByteStream, not relevant for now
|
|
*/
|
|
void WEDataLoader::onReceiveNak(ByteStream& Ibs)
|
|
{
|
|
// TODO - handle the problem
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
/**
|
|
* @brief ERROR. A Failure, Rollback should be initiated.
|
|
* @brief Can make use to update the BRM
|
|
* @param Incoming ByteStream, not relevant for now
|
|
*/
|
|
void WEDataLoader::onReceiveError(ByteStream& Ibs)
|
|
{
|
|
// TODO - handle the failure situation.
|
|
}
|
|
//------------------------------------------------------------------------------
|
|
// onReceiveCmdLineArgs - do what ever need to do with command line args
|
|
//------------------------------------------------------------------------------
|
|
/**
|
|
* @brief Command line args received.
|
|
* @brief Can make use to update the BRM
|
|
* @param Incoming ByteStream, not relevant for now
|
|
*/
|
|
|
|
void WEDataLoader::onReceiveCmdLineArgs(ByteStream& Ibs)
|
|
{
|
|
Ibs >> fCmdLineStr;
|
|
cout << "CMD LINE ARGS came in " << fCmdLineStr << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : CMD LINE ARGS came in " << fCmdLineStr;
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
|
|
ByteStream obs;
|
|
|
|
// TODO - Need to check all clear for starting CPI
|
|
if (0 != getMode())
|
|
{
|
|
obs << (ByteStream::byte)WE_CLT_SRV_STARTCPI;
|
|
}
|
|
else
|
|
{
|
|
obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
|
|
}
|
|
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::onReceiveStartCpimport()
|
|
{
|
|
cout << "Start Cpimport command reached!!" << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Start Cpimport command reached!!";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
|
|
try
|
|
{
|
|
setupCpimport();
|
|
|
|
if (1 == getMode()) // create a thread to handle the data feeding part
|
|
{
|
|
fpCfThread = new WECpiFeederThread(*this);
|
|
fpCfThread->startFeederThread();
|
|
}
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
// send an CPI FAIL command back to splitter
|
|
if (fpSysLog)
|
|
{
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(ex.what());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
|
|
|
|
sendCpimportFailureNotice();
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (1 == getMode()) // In mode 2/0 we do not rqst data.
|
|
{
|
|
sendDataRequest();
|
|
}
|
|
|
|
// We need to respond to KEEP ALIVES
|
|
// else if(2 == getMode()) // Now we wait till cpimport comes back
|
|
//{
|
|
// if(getChPid())
|
|
// teardownCpimport();
|
|
//}
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::onReceiveBrmRptFileName(ByteStream& Ibs)
|
|
{
|
|
Ibs >> fBrmRptFileName;
|
|
cout << "Brm Rpt Filename Arrived " << fBrmRptFileName << endl;
|
|
|
|
// BUG 4645
|
|
string::size_type idx = fBrmRptFileName.find_last_of('/');
|
|
|
|
if (idx > 0 && idx < string::npos)
|
|
{
|
|
string dirname(fBrmRptFileName, 0, idx);
|
|
struct stat st;
|
|
|
|
if (stat(dirname.c_str(), &st) != 0)
|
|
{
|
|
cout << "Creating directory : " << dirname << endl;
|
|
boost::filesystem::create_directories(dirname.c_str());
|
|
}
|
|
}
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Brm Rpt Filename Arrived " << fBrmRptFileName;
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::onReceiveCleanup(ByteStream& Ibs)
|
|
{
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : OnReceiveCleanup arrived";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
|
|
std::string aErrMsg;
|
|
|
|
WE_ClearTableLockCmd aClrTblLockCmd("DataLoader");
|
|
int aRet = aClrTblLockCmd.processCleanup(Ibs, aErrMsg);
|
|
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_CLEANUP;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
|
|
if (aRet == 0)
|
|
obs << (ByteStream::byte)1; // cleanup success
|
|
else
|
|
obs << (ByteStream::byte)0; // cleanup failed
|
|
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::onReceiveRollback(ByteStream& Ibs)
|
|
{
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : OnReceiveRollback arrived";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
|
|
}
|
|
|
|
std::string aErrMsg;
|
|
|
|
WE_ClearTableLockCmd aClrTblLockCmd("DataLoader");
|
|
int aRet = aClrTblLockCmd.processRollback(Ibs, aErrMsg);
|
|
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_ROLLBACK;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
|
|
if (aRet == 0)
|
|
obs << (ByteStream::byte)1; // Rollback success
|
|
else
|
|
obs << (ByteStream::byte)0; // Rollback failed
|
|
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
}
|
|
//-----------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::onReceiveImportFileName(ByteStream& Ibs)
|
|
{
|
|
bool aGoodFile = true;
|
|
std::string aFileName;
|
|
Ibs >> aFileName;
|
|
|
|
// BUG 4245 : Need to check the file or path exists
|
|
{
|
|
std::fstream aFin;
|
|
aFin.open(aFileName.c_str(), std::ios::in);
|
|
|
|
if (aFin.is_open()) // File exist, send an ERROR immediately
|
|
{
|
|
// file exists
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
|
|
obs << (ByteStream::byte)fPmId;
|
|
updateTxBytes(obs.length());
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aGoodFile = false;
|
|
aLock.unlock();
|
|
}
|
|
|
|
aFin.close();
|
|
}
|
|
|
|
if (aGoodFile)
|
|
{
|
|
fDataDumpFile.open(aFileName.c_str(), std::ios::app);
|
|
|
|
// BUG 4245 : If file dir is not existing, we need to fail this import
|
|
if (!fDataDumpFile.good())
|
|
{
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
}
|
|
}
|
|
}
|
|
//------------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::onReceiveJobId(ByteStream& Ibs)
|
|
{
|
|
std::string aJobFileName;
|
|
Ibs >> aJobFileName;
|
|
|
|
cout << "Incoming JobFileName : " << aJobFileName << endl;
|
|
|
|
// BUG 4645
|
|
string::size_type idx = aJobFileName.find_last_of('/');
|
|
|
|
if (idx > 0 && idx < string::npos)
|
|
{
|
|
string dirname(aJobFileName, 0, idx);
|
|
struct stat st;
|
|
|
|
if (stat(dirname.c_str(), &st) != 0)
|
|
{
|
|
cout << "Creating directory : " << dirname << endl;
|
|
boost::filesystem::create_directories(dirname.c_str());
|
|
}
|
|
}
|
|
|
|
fJobFile.open(aJobFileName.c_str());
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::onReceiveJobData(ByteStream& Ibs)
|
|
{
|
|
// Will write to the output file.
|
|
std::string aData;
|
|
Ibs >> aData;
|
|
fJobFile << aData;
|
|
fJobFile.close();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::onReceiveErrFileRqst(ByteStream& Ibs)
|
|
{
|
|
std::string aErrFileName;
|
|
Ibs >> aErrFileName;
|
|
cout << "Error Filename Arrived " << aErrFileName << endl;
|
|
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_ERRLOG;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
obs << aErrFileName;
|
|
BrmReportParser aErrFileParser;
|
|
bool aRet = aErrFileParser.serialize(aErrFileName, obs);
|
|
|
|
if (aRet)
|
|
{
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
}
|
|
|
|
// delete the temp files
|
|
if (remove(aErrFileName.c_str()) != 0)
|
|
cout << "Failed in removing Error file: " << aErrFileName << endl;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Process the receipt of a msg containing the contents of a *.bad file.
|
|
//------------------------------------------------------------------------------
|
|
void WEDataLoader::onReceiveBadFileRqst(ByteStream& Ibs)
|
|
{
|
|
std::string aBadFileName;
|
|
Ibs >> aBadFileName;
|
|
cout << "Error Filename Arrived " << aBadFileName << endl;
|
|
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_BADLOG;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
obs << aBadFileName;
|
|
BrmReportParser aBadFileParser;
|
|
bool aRet = aBadFileParser.serializeBlocks(aBadFileName, obs);
|
|
|
|
if (aRet)
|
|
{
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
}
|
|
|
|
// delete the temp files
|
|
if (remove(aBadFileName.c_str()) != 0)
|
|
cout << "Failed in removing Error file: " << aBadFileName << endl;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::sendDataRequest()
|
|
{
|
|
int aQsz = (fpCfThread) ? fpCfThread->getQueueSize() : 0;
|
|
|
|
// if(aQsz>MIN_QSIZE)
|
|
// Bug 5031 : If Q size goes above 100 (2*50); there is some thing wrong
|
|
// will put a warning in info log. Controlled in Cpimport init data rqst cnt
|
|
if (aQsz > MAX_QSIZE) // >250
|
|
{
|
|
cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz;
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
boost::mutex::scoped_lock aLock(fClntMsgMutex);
|
|
ByteStream obs;
|
|
obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
|
|
obs << (ByteStream::byte)fPmId; // PM id
|
|
updateTxBytes(obs.length());
|
|
|
|
try
|
|
{
|
|
fRef.fIos.write(obs);
|
|
}
|
|
catch (...)
|
|
{
|
|
cout << "Broken Pipe .." << endl;
|
|
|
|
if (fpSysLog)
|
|
{
|
|
ostringstream oss;
|
|
oss << getObjId() << " : Broken Pipe : socket write failed ";
|
|
logging::Message::Args errMsgArgs;
|
|
errMsgArgs.add(oss.str());
|
|
fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
|
|
}
|
|
}
|
|
|
|
aLock.unlock();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
void WEDataLoader::serialize(messageqcpp::ByteStream& b) const
|
|
{
|
|
// TODO to be changed. left it here to understand how to implement
|
|
/*
|
|
b << (ObjectReader::id_t) ObjectReader::SIMPLECOLUMN;
|
|
ReturnedColumn::serialize(b); // parent class serialize
|
|
b << (uint32_t) fOid;
|
|
b << fData;
|
|
b << static_cast<const ByteStream::doublebyte>(fReturnAll);
|
|
b << (uint32_t) fSequence;
|
|
*/
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
void WEDataLoader::unserialize(messageqcpp::ByteStream& b)
|
|
{
|
|
// TODO to be changed. left it here to understand how to implement
|
|
/*
|
|
ObjectReader::checkType(b, ObjectReader::SIMPLECOLUMN);
|
|
ReturnedColumn::unserialize(b); // parent class unserialize
|
|
b >> (uint32_t&) fOid;
|
|
b >> fData;
|
|
b >> reinterpret_cast<ByteStream::doublebyte&>(fReturnAll);
|
|
b >> (uint32_t&) fSequence;
|
|
*/
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
|
|
} // namespace WriteEngine
|