/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */
/*******************************************************************************
 * $Id$
 *
 *******************************************************************************/
/*
 * 	we_dataloader.cpp
 *
 *  Created on: Oct 4, 2011
 *      Author: Boby Paul: bpaul@calpont.com
 */

#include "mcsconfig.h"  // Used to pickup STRERROR_R_CHAR_P definition

#include <cstdlib>
#include <csignal>
#include <cstring>
#include <cerrno>

#include <unistd.h>  //pipe() && fork()
#include <wait.h>  //wait()
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "bytestream.h"
#include "rwlock_local.h"

#include <iostream>
#include <fstream>
#include <vector>
#include <queue>
#include <string>
#include <map>
using namespace std;

#include <boost/thread/condition.hpp>
#include <boost/scoped_array.hpp>
#include <boost/thread.hpp>
#include <boost/filesystem.hpp>
using namespace boost;

#include "bytestream.h"
#include "messagequeue.h"
using namespace messageqcpp;

#include "we_messages.h"
#include "we_brmrprtparser.h"
#include "we_cleartablelockcmd.h"
#include "we_dataloader.h"
#include "we_readthread.h"

#include "installdir.h"

namespace WriteEngine
{
// bool WEDataLoader::fTearDownCpimport=false; // @bug 4267

//-----------------------------------------------------------------------------
/**
 *
 * @brief 	WEDataLoader::Constructor
 *
 **/
WEDataLoader::WEDataLoader(SplitterReadThread& Srt)
 : fRef(Srt)
 , fMode(-1)
 , fDataDumpFile()
 , fTxBytes(0)
 , fRxBytes(0)
 , fPmId(0)
 , fCh_pid(0)
 , fThis_pid(0)
 , fP_pid(0)
 , fpCfThread(0)
 , fTearDownCpimport(false)
 ,  // @bug 4267
 fWaitPidRc(0)
 ,  // @bug 4267
 fWaitPidStatus(0)
 ,  // @bug 4267
 fForceKill(false)
 , fPipeErr(false)
 , fpSysLog(0)
{
  Config weConfig;
  uint16_t localModuleId = weConfig.getLocalModuleID();
  fPmId = static_cast<char>(localModuleId);

  srand(time(NULL));                // initialize random seed
  int aObjId = rand() % 10000 + 1;  // generate a random number

  setObjId(aObjId);

  setupSignalHandlers();

  if (!fpSysLog)
  {
    fpSysLog = SimpleSysLog::instance();
    fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV));
  }
}
//-----------------------------------------------------------------------------
/**
 *
 * @brief 	WEDataLoader::Destructor
 *
 **/

WEDataLoader::~WEDataLoader()
{
  try
  {
    if (fDataDumpFile.is_open())
      fDataDumpFile.close();

    cout << "\tRx Bytes " << getRxBytes() << endl;
    cout << "\tTX Bytes " << getTxBytes() << endl;

    cout << "\tChild PID " << getChPid() << endl;

    if (getChPid())
    {
      if (2 == getMode())  //@bug 5012
      {
        kill(getChPid(), SIGINT);
        teardownCpimport(fTearDownCpimport);
      }
      else
      {
        teardownCpimport(false);  // @bug 4267
      }
    }
  }
  catch (std::exception&)  // @bug 4164: exception causing thread to exit
  {
    cout << "Error tearing down cpimport in WEDataLoader destructor" << endl;
  }

  // cout << "Leaving WEDataLoader destructor" << endl;
}

//------------------------------------------------------------------------------
// Initialize signal handling
//------------------------------------------------------------------------------

void WEDataLoader::setupSignalHandlers()
{
  signal(SIGPIPE, SIG_IGN);
  signal(SIGCHLD, WEDataLoader::onSigChild);
}
//------------------------------------------------------------------------------
// handles on signal Terminate
//------------------------------------------------------------------------------
void WEDataLoader::onSigChild(int aInt)
{
  std::string aStr = "Received SIGCHLD of terminated process..";
  cout << aStr << endl;
  // fTearDownCpimport = true; // @bug 4267

  // commented out for non-static variables
  // ostringstream oss;
  // oss << getObjId() <<" : " <<aStr;
  // logging::Message::Args errMsgArgs;
  // errMsgArgs.add(oss.str());
  // fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
}

//-----------------------------------------------------------------------------
/**
 *
 * @brief 	WEDataLoader::update
 *
 **/

bool WEDataLoader::update(Subject* pSub)
{
  return true;
}
//-----------------------------------------------------------------------------
/**
 *
 * @brief setup cpimport as a seperate process
 *
 **/

bool WEDataLoader::setupCpimport()  // fork the cpimport
{
  pid_t aChPid;

  errno = 0;

  if (pipe(fFIFO) == -1)
  {
    int errnum = errno;
    ostringstream oss;
    oss << getObjId() << " : Error in creating pipe (errno-" << errnum << "); " << strerror(errnum);
    throw runtime_error(oss.str());
  }

  setPid(getpid());
  setPPid(getppid());

  errno = 0;
  aChPid = fork();

  if (aChPid == -1)  // an error caused
  {
    int errnum = errno;
    ostringstream oss;
    oss << getObjId() << " : Error in forking cpimport.bin (errno-" << errnum << "); " << strerror(errnum);
    throw runtime_error(oss.str());
  }
  else if (aChPid == 0)  // we are in child
  {
    int aStartFD = 3;
    int aEndFD = fFIFO[1] + 256;
    close(fFIFO[1]);  // close the WRITER of CHILD

    cout << "Child Process Info: PID = " << getpid() << " (fFIFO[0], fFIFO[1]) = (" << fFIFO[0] << ","
         << fFIFO[1] << ")"
         << " (StartFD, EndFD) = (" << aStartFD << "," << aEndFD << ")" << endl;

    std::vector<char*> Cmds;
    // str2Argv(fCmdLineStr, Cmds);	// to avoid out-of-scope problem
    std::string aCmdLine = fCmdLineStr;
    std::istringstream ss(aCmdLine);
    std::string arg;
    std::vector<std::string> v2(20, "");
    unsigned int i = 0;

    while (ss >> arg)
    {
      v2[i++] = arg;
    }

    for (unsigned int j = 0; j < i; ++j)
    {
      Cmds.push_back(const_cast<char*>(v2[j].c_str()));
    }

    Cmds.push_back(0);  // null terminate
    // updatePrgmPath(Cmds);

    // NOTE: for debugging
    int aSize = Cmds.size();

    for (int aIdx = 0; aIdx < aSize; ++aIdx)
    {
      cout << "Args " << Cmds[aIdx] << endl;
    }

    cout.flush();

    close(0);           // close stdin for the child
    dup2(fFIFO[0], 0);  // make stdin be the reading end of the pipe

    // BUG 4410 : hacky solution so that CHLD process get EOF on close of pipe
    for (int i = aStartFD; i < aEndFD; i++)
      close(i);

    errno = 0;
    int aRet = execvp(Cmds[0], &Cmds[0]);  // NOTE - works with full Path
    // int aRet = execvp(Cmds[0], &Cmds[0]);	//NOTE - works if $PATH has cpimport

    int execvErrno = errno;
    cout << "Return status of cpimport is " << aRet << endl;
    cout.flush();
    close(fFIFO[0]);  // will trigger an EOF on stdin
    ostringstream oss;
    oss << getObjId() << " : execv error: cpimport.bin invocation failed; "
        << "(errno-" << errno << "); " << strerror(execvErrno) << "; Check file and try invoking locally.";
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);

    if (aRet == -1)
      exit(-1);
  }
  else  // parent
  {
    setChPid(aChPid);  // This is the child PID
    cout << "Child PID is " << this->getChPid() << endl;
    close(fFIFO[0]);  // close the READER of PARENT
    // now we can send all the data thru FIFO[1], writer of PARENT
  }

  if (aChPid == 0)
    cout << "******** Child finished its work ********" << endl;

  return true;
}
//-----------------------------------------------------------------------------
/**
 *
 * @brief close all file handles opened for cpimport
 * @brief wait for the cpimport process to finish work
 *
 **/

void WEDataLoader::teardownCpimport(bool useStoredWaitPidStatus)  // @bug 4267
{
  fTearDownCpimport = false;  // Reset it
  // cout << "Tearing down Cpimport" << endl;
  int aStatus;

  // cout << "checking fpCfThread value" << endl;
  if (fpCfThread)
  {
    // cout << "checking fpCfThread has a valid value" << endl;

    // wait until we are done with the queued messages
    while ((!fpCfThread->isMsgQueueEmpty()) && (!fpCfThread->isStopped()))
    {
      // cout << "DEBUG : MsgQueue not empty" << endl;
      // cannot be too low, since there is a lock in isMsgQueueEmpty()
      usleep(2000000);
    }

    //		while(fpCfThread->isPushing())
    //		{
    //			cout << "DEBUG : still pushing" << endl;
    //			usleep(100000);
    //		}

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << " : Message Queue is empty; Stopping CF Thread";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
    }

    fpCfThread->stopThread();

    while (!fpCfThread->isStopped())
    {
      cout << "DEBUG : still not stopped" << endl;
      usleep(100000);
    }

    delete fpCfThread;
    fpCfThread = 0;
  }

  closeWritePipe();
  pid_t aPid;

  // @bug 4267 begin: call waitpid() to get job status or use stored job status
  // aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs
  if (useStoredWaitPidStatus)
  {
    aPid = fWaitPidRc;
    aStatus = fWaitPidStatus;
  }
  else
  {
    // aPid = waitpid(getChPid(), &aStatus, 0); // wait until cpimport finishs
    aPid = waitpid(getChPid(), &aStatus, WNOHANG);  // wait until cpimport finishs
    int aIdx = 0;

    while ((aPid == 0) && (aIdx < 25 * MAX_QSIZE))  // Do not loop infinitly
    {
      usleep(2000000);
      aPid = waitpid(getChPid(), &aStatus, WNOHANG);
      cout << "Inside tearDown waitpid rc[" << aIdx << "] = " << aPid << endl;
      ++aIdx;
    }
  }

  // @bug 4267 end						 // BP - added -1 as per DMC comment below
  if ((aPid == getChPid()) || (aPid == -1))  // @bug 4267 (DMC-shouldn't we check for aPid of -1?)
  {
    setChPid(0);

    if ((WIFEXITED(aStatus)) && (WEXITSTATUS(aStatus) == 0))
    {
      cout << "\tCpimport exit on success" << endl;
      ostringstream oss;
      oss << getObjId() << " : cpimport exit on success";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);

      onCpimportSuccess();
    }
    else
    {
      int termsig = (WIFSIGNALED(aStatus) ? WTERMSIG(aStatus) : -1);

      if (!fForceKill)
      {
        cout << "\tCpimport exit on failure (signal " << termsig << ')' << endl;
        ostringstream oss;
        oss << getObjId() << " : cpimport exit on failure (signal " << termsig << ')';
        logging::Message::Args errMsgArgs;
        errMsgArgs.add(oss.str());
        fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
        onCpimportFailure();
      }
      else
      {
        cout << "\tCpimport exit on Force Kill!!" << endl;
        ostringstream oss;
        oss << getObjId() << " : cpimport exit on Force kill!!";
        logging::Message::Args errMsgArgs;
        errMsgArgs.add(oss.str());
        fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
        onCpimportSuccess();
      }
    }
  }
}
//-----------------------------------------------------------------------------
/**
 * @brief 	Push the data to cpimport from the incoming ByteStream
 * @param	Incoming ByteStream
 *
 */
//
void WEDataLoader::pushData2Cpimport(ByteStream& Ibs)
{
  if (Ibs.length() > 0)
  {
    int aLen = Ibs.length();
    char* pStart = reinterpret_cast<char*>(Ibs.buf());
    char* pEnd = pStart + aLen;
    char* pPtr = pStart;

    while (pPtr < pEnd)
    {
      //		if(pEnd > (pPtr + MAX_LEN))
      //		{
      //			int aRet = write(fFIFO[1], pPtr, MAX_LEN);
      //			if(aRet == -1) throw runtime_error("Pipe write error");
      //			//write(fFIFO[1], Ibs.buf(), Ibs.length());
      //			pPtr += MAX_LEN;
      //		}
      //		else
      //		{
      //			int aStrLen = pEnd - pPtr;
      //			int aRet = write(fFIFO[1], pPtr, aStrLen);
      //			if(aRet == -1) throw runtime_error("Pipe write error");
      //			pPtr += aStrLen;
      //		}

      try
      {
        int aRet = write(fFIFO[1], pPtr, pEnd - pPtr);

        if (aRet < 0)
        {
          if (!fPipeErr)
          {
            int e = errno;
            fPipeErr = true;
            std::string aStr = "pushing data : PIPE error .........";

            char errMsgBuf[160];
#if STRERROR_R_CHAR_P
            const char* pErrMsg = strerror_r(e, errMsgBuf, sizeof(errMsgBuf));

            if (pErrMsg)
              aStr += pErrMsg;

#else
            int errMsgRc = strerror_r(e, errMsgBuf, sizeof(errMsgBuf));

            if (errMsgRc == 0)
              aStr += errMsgBuf;

#endif
            logging::Message::Args errMsgArgs;
            errMsgArgs.add(aStr);
            fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
          }

          throw runtime_error("Pipe Error - cpimport.bin exited already!!");
        }

        pPtr += aRet;
      }
      catch (...)
      {
        // std::string aStr = "pushing data PIPE error .........";
        // logging::Message::Args errMsgArgs;
        // errMsgArgs.add(aStr);
        // fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
        throw runtime_error("Pipe Error - cpimport.bin exited already!!");
      }
    }
  }
}

//-----------------------------------------------------------------------------
/**
 *
 * @brief 	Close the pipe through which data was written to cpimport
 * @brief 	This will also signal a EOF to the reading pipe.
 *
 **/

void WEDataLoader::closeWritePipe()
{
  cout << "Going to close call = " << fFIFO[1] << endl;
  // NOTE this will flush the file buffer and close it.
  int aRet = close(fFIFO[1]);  // will trigger EOD
  cout << "----- closed both pipes -------- aRet = " << aRet << endl;
}

//-----------------------------------------------------------------------------
/**
 *
 * @brief 	Tokenize a string into char** argv format and store in a vector
 * @brief 	we pass the V as arguments to exec cpimport
 * @param 	CmdLine is the string form of arguments demlimited by space
 * @param 	V vector which contains each element of argv
 *
 **/

void WEDataLoader::str2Argv(std::string CmdLine, std::vector<char*>& V)
{
  std::istringstream ss(CmdLine);
  std::string arg;
  std::vector<std::string> v2;

  while (ss >> arg)
  {
    v2.push_back(arg);
    V.push_back(const_cast<char*>(v2.back().c_str()));
  }

  V.push_back(0);  // null terminate
}

//-----------------------------------------------------------------------------
/**
 *
 * @brief 	Event to trigger when cpimport is successful.
 *
 **/

void WEDataLoader::onCpimportSuccess()
{
  ByteStream obs;

  cout << "Sending BRMRPT" << endl;
  obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT;
  obs << (ByteStream::byte)fPmId;  // PM id
  // for testing
  // std::string fRptFileName("ReportFile.txt");
  BrmReportParser aBrmRptParser;
  bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs);

  if (aRet)
  {
    boost::mutex::scoped_lock aLock(fClntMsgMutex);
    // aBrmRptParser.unserialize(obs);   - was for testing
    updateTxBytes(obs.length());

    try
    {
      fRef.fIos.write(obs);
    }
    catch (...)
    {
      cout << "Broken Pipe .." << endl;

      if (fpSysLog)
      {
        ostringstream oss;
        oss << getObjId() << " : Broken Pipe : socket write failed ";
        logging::Message::Args errMsgArgs;
        errMsgArgs.add(oss.str());
        fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
      }
    }

    aLock.unlock();
    cout << "Finished Sending BRMRPT" << endl;
  }
  else
  {
    cout << "Failed to serialize BRMRpt " << endl;
  }

  //	if(remove(fBrmRptFileName.c_str()) != 0)
  //		cout <<"Failed to delete BRMRpt File "<< fBrmRptFileName << endl;
  // usleep(1000000);	//sleep 1 second.

  obs.reset();
  obs << (ByteStream::byte)WE_CLT_SRV_CPIPASS;
  obs << (ByteStream::byte)fPmId;  // PM id
  boost::mutex::scoped_lock aLock(fClntMsgMutex);
  updateTxBytes(obs.length());

  try
  {
    fRef.fIos.write(obs);
  }
  catch (...)
  {
    cout << "Broken Pipe .." << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << " : Broken Pipe : socket write failed ";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
    }
  }

  aLock.unlock();

  cout << "Sent CPIPASS info" << endl;

  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : onCpimportSuccess BrmReport Send";
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }
}

//-----------------------------------------------------------------------------
/**
 *
 * @brief 	Event to trigger if a cpimport failure occurs.
 *
 **/
void WEDataLoader::onCpimportFailure()
{
  // Send failure notice back to the parent splitter job
  sendCpimportFailureNotice();

  // Even if we failed, we have failure info in BRMRPT
  ByteStream obs;
  obs << (ByteStream::byte)WE_CLT_SRV_BRMRPT;
  obs << (ByteStream::byte)fPmId;  // PM id
  BrmReportParser aBrmRptParser;
  bool aRet = aBrmRptParser.serialize(fBrmRptFileName, obs);

  if (aRet)
  {
    boost::mutex::scoped_lock aLock(fClntMsgMutex);
    updateTxBytes(obs.length());

    try
    {
      fRef.fIos.write(obs);
    }
    catch (std::exception& ex)
    {
      cout << "Broken Pipe .." << ex.what() << endl;

      if (fpSysLog)
      {
        ostringstream oss;
        oss << getObjId() << " : Broken Pipe : socket write failed; " << ex.what();
        logging::Message::Args errMsgArgs;
        errMsgArgs.add(oss.str());
        fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
      }
    }

    aLock.unlock();
    cout << "Finished Sending BRMRPT" << endl;
  }

  if (remove(fBrmRptFileName.c_str()) != 0)
    cout << "Failed to delete BRMRpt File " << fBrmRptFileName << endl;

  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : onCpimportFailure BrmReport Send";
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }
}

//-----------------------------------------------------------------------------
// Send msg to front-end splitter to notify it that a cpimport.bin pgm failed.
//-----------------------------------------------------------------------------
void WEDataLoader::sendCpimportFailureNotice()
{
  ByteStream obs;
  obs << (ByteStream::byte)WE_CLT_SRV_CPIFAIL;
  obs << (ByteStream::byte)fPmId;  // PM id
  boost::mutex::scoped_lock aLock(fClntMsgMutex);
  updateTxBytes(obs.length());

  try
  {
    fRef.fIos.write(obs);
  }
  catch (...)
  {
    cout << "Broken Pipe .." << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << " : Broken Pipe : socket write failed ";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
    }
  }
}

//-----------------------------------------------------------------------------
/**
 * @brief 	Event when a KEEPALIVE arrives.
 * @param	Incoming ByteStream, not used currently
 */
void WEDataLoader::onReceiveKeepAlive(ByteStream& Ibs)
{
  /*
  // TODO comment out when we done with debug
  if(fpSysLog)
  {
      ostringstream oss;
      oss << getObjId() <<" : Received KEEPALIVE";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
  }
  */
  // cout << "Received KEEPALIVE" << endl;
  // NOTE only seldom a KEEPALIVE receives,
  // 		so nothing wrong in responding with a KEEPALIVE.
  ByteStream obs;
  obs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE;
  obs << (ByteStream::byte)fPmId;  // PM id
  boost::mutex::scoped_lock aLock(fClntMsgMutex);
  updateTxBytes(obs.length());

  try
  {
    fRef.fIos.write(obs);
  }
  catch (...)
  {
    cout << "Broken Pipe .." << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << ": Broken Pipe : socket write failed ";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
    }
  }

  aLock.unlock();

  // @bug 4267 begin
  int aStatus;
  pid_t aPid;

  if (getChPid() > 0)
  {
    aPid = waitpid(getChPid(), &aStatus, WNOHANG);  // wait until cpimport finishs

    if (aPid != 0)
    {
      cout << "waitpid(" << getChPid() << "): rc-" << aPid << "; status-" << aStatus << "; exited-"
           << (WIFEXITED(aStatus)) << endl;
    }

    if ((aPid == getChPid()) || (aPid == -1))
    {
      fTearDownCpimport = true;
      fWaitPidRc = aPid;
      fWaitPidStatus = aStatus;
    }
  }

  // @bug 4267 end

  if (fTearDownCpimport)
  {
    // fTearDownCpimport = false;		//Reset it // commented out to use the flag in EOD
    cout << "Cpimport terminated " << endl;

    if (0 == getMode())
      onReceiveEod(Ibs);
    else if (1 == getMode())  // mode 1 has to drive from UM
    {
      ByteStream obs;
      obs << (ByteStream::byte)WE_CLT_SRV_EOD;
      obs << (ByteStream::byte)fPmId;  // PM id
      boost::mutex::scoped_lock aLock(fClntMsgMutex);
      updateTxBytes(obs.length());

      try
      {
        fRef.fIos.write(obs);
      }
      catch (...)
      {
        cout << "Broken Pipe .." << endl;

        if (fpSysLog)
        {
          ostringstream oss;
          oss << getObjId() << ": Broken Pipe : socket write failed ";
          logging::Message::Args errMsgArgs;
          errMsgArgs.add(oss.str());
          fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
        }
      }

      aLock.unlock();
    }
    else if (2 == getMode())
    {
      // if(getChPid()) teardownCpimport(true); // @bug 4267
      if (getChPid())
        teardownCpimport(fTearDownCpimport);  // @bug 4267 BP
    }
  }
  else
  {
    // if(1 == getMode())
    //	if(fpCfThread) cout << "Queue Size = " << fpCfThread->getQueueSize() << endl;
  }
}
//-----------------------------------------------------------------------------
/**
 * @brief 	trigger when a DATA arrives.
 * @param	Incoming ByteStream which contains data
 */

void WEDataLoader::onReceiveData(ByteStream& Ibs)
{
  if ((0 == getMode()) && (fDataDumpFile.is_open()))
  {
    // Will write to the output file.
    fDataDumpFile << Ibs;
    sendDataRequest();
  }
  else if (1 == getMode())
  {
    // commented out since we are going to use seperate thread
    // pushData2Cpimport(Ibs);

    if (fpCfThread)
    {
      fpCfThread->add2MsgQueue(Ibs);
      // sendDataRequest();	// Need to control Queue Size
      // Bug 5031 : Will only send 1 rqst for a batch to cpimport.bin
      // if(fpCfThread->getQueueSize()<MIN_QSIZE) sendDataRequest();
      // if(fpCfThread->getQueueSize()<MAX_QSIZE) sendDataRequest();

      int aQsz = (fpCfThread) ? fpCfThread->getQueueSize() : 0;

      // Bug 5031 : If Q size goes above 100 (2*250);
      if (aQsz < MAX_QSIZE)
        sendDataRequest();

      if (aQsz > 1.5 * static_cast<int>(MAX_QSIZE))  // > 2*250
      {
        cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl;

        if (fpSysLog)
        {
          ostringstream oss;
          oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz;
          logging::Message::Args errMsgArgs;
          errMsgArgs.add(oss.str());
          fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
        }
      }
    }
  }
  else if (2 == getMode())
  {
    cout << "onReceiveData : In Mode 2 NO data suppose to receive" << endl;
  }

  //	ByteStream obs;
  //	obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
  //	obs << (ByteStream::byte)fPmId;     // PM id
  //  updateTxBytes(obs.length());
  //	fRef.fIos.write(obs);
}

//-----------------------------------------------------------------------------
/**
 * @brief 	trigger when a EOD arrives.
 * @param	Incoming ByteStream; not relevent for now
 */
void WEDataLoader::onReceiveEod(ByteStream& Ibs)
{
  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : onReceiveEOD : child ID = " << getChPid();
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }

  cout << "Received EOD " << endl;

  if (0 == getMode())
  {
    fDataDumpFile.close();
  }

  ByteStream obs;
  obs << (ByteStream::byte)WE_CLT_SRV_EOD;
  obs << (ByteStream::byte)fPmId;  // PM id
  boost::mutex::scoped_lock aLock(fClntMsgMutex);
  updateTxBytes(obs.length());

  try
  {
    fRef.fIos.write(obs);
  }
  catch (...)
  {
    cout << "Broken Pipe .." << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << " : Broken Pipe : socket write failed ";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
    }
  }

  aLock.unlock();

  // if(( 1 == getMode())||( 2 == getMode()))
  if (1 == getMode())  // BUG 4370 - seperated mode 1 & 2
  {
    // if(getChPid()) teardownCpimport(false); // @bug 4267
    if (getChPid())
      teardownCpimport(fTearDownCpimport);  // @bug 4267 //BP changed to send the correct flag
  }
  else if (2 == getMode())  // BUG 4370
  {
    if (getChPid())
    {
      kill(getChPid(), SIGINT);  // BUG 4370
      fForceKill = true;
      teardownCpimport(fTearDownCpimport);  // BUG 4370
    }
  }
}
//-----------------------------------------------------------------------------
/**
 * @brief 	Event on Command Received. It should contain sub commands
 * @param	Incoming ByteStream, will have sub commands
 */
void WEDataLoader::onReceiveCmd(ByteStream& bs)
{
  // TODO - can be cpimport cmd or server cmd, for now write to a file
  ByteStream::byte aCmdId;
  bs >> aCmdId;
}
//-----------------------------------------------------------------------------
/**
 * @brief 	The mode in which WES running.
 * @param	Incoming ByteStream, not relevent
 */
void WEDataLoader::onReceiveMode(ByteStream& Ibs)
{
  // Assigning it here since WEDataLoader constructor is called multiple times
  // while coping in readthread class.
  if (!fpSysLog)
  {
    fpSysLog = SimpleSysLog::instance();
    fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SRV));
  }

  Ibs >> (ByteStream::quadbyte&)fMode;
  cout << "Setting fMode = " << fMode << endl;

  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : onReceiveMode() Setting fMode = " << fMode;
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }

  char aName[64];
  snprintf(aName, sizeof(aName), "ModuleDBRootCount%d-3", fPmId);

  string aStrDbRootCnt = config::Config::makeConfig()->getConfig("SystemModuleConfig", aName);
  cout << "DbRootCnt = " << aStrDbRootCnt << endl;
  ByteStream::byte aDbCnt = (ByteStream::byte)atoi(aStrDbRootCnt.c_str());

  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : onReceiveMode() DbRoot Count = " + aStrDbRootCnt;
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }

  // Send No. of DBRoots to Client
  ByteStream aObs;
  aObs << (ByteStream::byte)WE_CLT_SRV_DBRCNT;
  aObs << (ByteStream::byte)fPmId;
  aObs << (ByteStream::byte)aDbCnt;
  boost::mutex::scoped_lock aLock(fClntMsgMutex);
  updateTxBytes(aObs.length());

  try
  {
    fRef.fIos.write(aObs);
  }
  catch (...)
  {
    cout << "Broken Pipe .." << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << " : Broken Pipe : socket write failed ";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
    }
  }

  aLock.unlock();
}
//-----------------------------------------------------------------------------
/**
 * @brief 	Acknowledgment. Not relevant at this point of time.
 * @brief	Can make use to update the BRM
 * @param	Incoming ByteStream, not relevant for now
 */

void WEDataLoader::onReceiveAck(ByteStream& Ibs)
{
  // All is good
  // update the status
}
//-----------------------------------------------------------------------------
/**
 * @brief 	NAK. A Failure, Rollback should be initiated.
 * @brief	Can make use to update the BRM
 * @param	Incoming ByteStream, not relevant for now
 */
void WEDataLoader::onReceiveNak(ByteStream& Ibs)
{
  // TODO - handle the problem
}
//-----------------------------------------------------------------------------
/**
 * @brief 	ERROR. A Failure, Rollback should be initiated.
 * @brief	Can make use to update the BRM
 * @param	Incoming ByteStream, not relevant for now
 */
void WEDataLoader::onReceiveError(ByteStream& Ibs)
{
  // TODO - handle the failure situation.
}
//------------------------------------------------------------------------------
//  onReceiveCmdLineArgs - do what ever need to do with command line args
//------------------------------------------------------------------------------
/**
 * @brief 	Command line args received.
 * @brief	Can make use to update the BRM
 * @param	Incoming ByteStream, not relevant for now
 */

void WEDataLoader::onReceiveCmdLineArgs(ByteStream& Ibs)
{
  Ibs >> fCmdLineStr;
  cout << "CMD LINE ARGS came in " << fCmdLineStr << endl;

  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : CMD LINE ARGS came in " << fCmdLineStr;
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }

  ByteStream obs;

  // TODO - Need to check all clear for starting CPI
  if (0 != getMode())
  {
    obs << (ByteStream::byte)WE_CLT_SRV_STARTCPI;
  }
  else
  {
    obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
  }

  obs << (ByteStream::byte)fPmId;  // PM id
  boost::mutex::scoped_lock aLock(fClntMsgMutex);
  updateTxBytes(obs.length());

  try
  {
    fRef.fIos.write(obs);
  }
  catch (...)
  {
    cout << "Broken Pipe .." << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << " : Broken Pipe : socket write failed ";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
    }
  }

  aLock.unlock();
}

//-----------------------------------------------------------------------------

void WEDataLoader::onReceiveStartCpimport()
{
  cout << "Start Cpimport command reached!!" << endl;

  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : Start Cpimport command reached!!";
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }

  try
  {
    setupCpimport();

    if (1 == getMode())  // create a thread to handle the data feeding part
    {
      fpCfThread = new WECpiFeederThread(*this);
      fpCfThread->startFeederThread();
    }
  }
  catch (std::exception& ex)
  {
    // send an CPI FAIL command back to splitter
    if (fpSysLog)
    {
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(ex.what());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);

      sendCpimportFailureNotice();
      return;
    }
  }

  if (1 == getMode())  // In mode 2/0 we do not rqst data.
  {
    sendDataRequest();
  }

  // We need to respond to KEEP ALIVES
  // else if(2 == getMode())	// Now we wait till cpimport comes back
  //{
  //	if(getChPid())
  //		teardownCpimport();
  //}
}

//-----------------------------------------------------------------------------

void WEDataLoader::onReceiveBrmRptFileName(ByteStream& Ibs)
{
  Ibs >> fBrmRptFileName;
  cout << "Brm Rpt Filename Arrived " << fBrmRptFileName << endl;

  // BUG 4645
  string::size_type idx = fBrmRptFileName.find_last_of('/');

  if (idx > 0 && idx < string::npos)
  {
    string dirname(fBrmRptFileName, 0, idx);
    struct stat st;

    if (stat(dirname.c_str(), &st) != 0)
    {
      cout << "Creating directory : " << dirname << endl;
      boost::filesystem::create_directories(dirname.c_str());
    }
  }

  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : Brm Rpt Filename Arrived " << fBrmRptFileName;
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }
}

//-----------------------------------------------------------------------------

void WEDataLoader::onReceiveCleanup(ByteStream& Ibs)
{
  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : OnReceiveCleanup arrived";
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }

  std::string aErrMsg;

  WE_ClearTableLockCmd aClrTblLockCmd("DataLoader");
  int aRet = aClrTblLockCmd.processCleanup(Ibs, aErrMsg);

  ByteStream obs;
  obs << (ByteStream::byte)WE_CLT_SRV_CLEANUP;
  obs << (ByteStream::byte)fPmId;  // PM id

  if (aRet == 0)
    obs << (ByteStream::byte)1;  // cleanup success
  else
    obs << (ByteStream::byte)0;  // cleanup failed

  boost::mutex::scoped_lock aLock(fClntMsgMutex);
  updateTxBytes(obs.length());

  try
  {
    fRef.fIos.write(obs);
  }
  catch (...)
  {
    cout << "Broken Pipe .." << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << " : Broken Pipe : socket write failed ";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
    }
  }

  aLock.unlock();
}

//-----------------------------------------------------------------------------

void WEDataLoader::onReceiveRollback(ByteStream& Ibs)
{
  if (fpSysLog)
  {
    ostringstream oss;
    oss << getObjId() << " : OnReceiveRollback arrived";
    logging::Message::Args errMsgArgs;
    errMsgArgs.add(oss.str());
    fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_DEBUG, logging::M0000);
  }

  std::string aErrMsg;

  WE_ClearTableLockCmd aClrTblLockCmd("DataLoader");
  int aRet = aClrTblLockCmd.processRollback(Ibs, aErrMsg);

  ByteStream obs;
  obs << (ByteStream::byte)WE_CLT_SRV_ROLLBACK;
  obs << (ByteStream::byte)fPmId;  // PM id

  if (aRet == 0)
    obs << (ByteStream::byte)1;  // Rollback success
  else
    obs << (ByteStream::byte)0;  // Rollback failed

  boost::mutex::scoped_lock aLock(fClntMsgMutex);
  updateTxBytes(obs.length());

  try
  {
    fRef.fIos.write(obs);
  }
  catch (...)
  {
    cout << "Broken Pipe .." << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << " : Broken Pipe : socket write failed ";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
    }
  }

  aLock.unlock();
}
//-----------------------------------------------------------------------------

void WEDataLoader::onReceiveImportFileName(ByteStream& Ibs)
{
  bool aGoodFile = true;
  std::string aFileName;
  Ibs >> aFileName;

  // BUG 4245 : Need to check the file or path exists
  {
    std::fstream aFin;
    aFin.open(aFileName.c_str(), std::ios::in);

    if (aFin.is_open())  // File exist, send an ERROR immediately
    {
      // file exists
      ByteStream obs;
      obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
      obs << (ByteStream::byte)fPmId;
      updateTxBytes(obs.length());
      boost::mutex::scoped_lock aLock(fClntMsgMutex);

      try
      {
        fRef.fIos.write(obs);
      }
      catch (...)
      {
        cout << "Broken Pipe .." << endl;

        if (fpSysLog)
        {
          ostringstream oss;
          oss << getObjId() << " : Broken Pipe : socket write failed ";
          logging::Message::Args errMsgArgs;
          errMsgArgs.add(oss.str());
          fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
        }
      }

      aGoodFile = false;
      aLock.unlock();
    }

    aFin.close();
  }

  if (aGoodFile)
  {
    fDataDumpFile.open(aFileName.c_str(), std::ios::app);

    // BUG 4245 : If file dir is not existing, we need to fail this import
    if (!fDataDumpFile.good())
    {
      ByteStream obs;
      obs << (ByteStream::byte)WE_CLT_SRV_IMPFILEERROR;
      obs << (ByteStream::byte)fPmId;  // PM id
      boost::mutex::scoped_lock aLock(fClntMsgMutex);
      updateTxBytes(obs.length());

      try
      {
        fRef.fIos.write(obs);
      }
      catch (...)
      {
        cout << "Broken Pipe .." << endl;

        if (fpSysLog)
        {
          ostringstream oss;
          oss << getObjId() << " : Broken Pipe : socket write failed ";
          logging::Message::Args errMsgArgs;
          errMsgArgs.add(oss.str());
          fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
        }
      }

      aLock.unlock();
    }
  }
}
//------------------------------------------------------------------------------

void WEDataLoader::onReceiveJobId(ByteStream& Ibs)
{
  std::string aJobFileName;
  Ibs >> aJobFileName;

  cout << "Incoming JobFileName : " << aJobFileName << endl;

  // BUG 4645
  string::size_type idx = aJobFileName.find_last_of('/');

  if (idx > 0 && idx < string::npos)
  {
    string dirname(aJobFileName, 0, idx);
    struct stat st;

    if (stat(dirname.c_str(), &st) != 0)
    {
      cout << "Creating directory : " << dirname << endl;
      boost::filesystem::create_directories(dirname.c_str());
    }
  }

  fJobFile.open(aJobFileName.c_str());
}

//------------------------------------------------------------------------------

void WEDataLoader::onReceiveJobData(ByteStream& Ibs)
{
  // Will write to the output file.
  std::string aData;
  Ibs >> aData;
  fJobFile << aData;
  fJobFile.close();
}

//------------------------------------------------------------------------------

void WEDataLoader::onReceiveErrFileRqst(ByteStream& Ibs)
{
  std::string aErrFileName;
  Ibs >> aErrFileName;
  cout << "Error Filename Arrived " << aErrFileName << endl;

  ByteStream obs;
  obs << (ByteStream::byte)WE_CLT_SRV_ERRLOG;
  obs << (ByteStream::byte)fPmId;  // PM id
  obs << aErrFileName;
  BrmReportParser aErrFileParser;
  bool aRet = aErrFileParser.serialize(aErrFileName, obs);

  if (aRet)
  {
    boost::mutex::scoped_lock aLock(fClntMsgMutex);
    updateTxBytes(obs.length());

    try
    {
      fRef.fIos.write(obs);
    }
    catch (...)
    {
      cout << "Broken Pipe .." << endl;

      if (fpSysLog)
      {
        ostringstream oss;
        oss << getObjId() << " : Broken Pipe : socket write failed ";
        logging::Message::Args errMsgArgs;
        errMsgArgs.add(oss.str());
        fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
      }
    }

    aLock.unlock();
  }

  // delete the temp files
  if (remove(aErrFileName.c_str()) != 0)
    cout << "Failed in removing Error file: " << aErrFileName << endl;
}

//------------------------------------------------------------------------------
// Process the receipt of a msg containing the contents of a *.bad file.
//------------------------------------------------------------------------------
void WEDataLoader::onReceiveBadFileRqst(ByteStream& Ibs)
{
  std::string aBadFileName;
  Ibs >> aBadFileName;
  cout << "Error Filename Arrived " << aBadFileName << endl;

  ByteStream obs;
  obs << (ByteStream::byte)WE_CLT_SRV_BADLOG;
  obs << (ByteStream::byte)fPmId;  // PM id
  obs << aBadFileName;
  BrmReportParser aBadFileParser;
  bool aRet = aBadFileParser.serializeBlocks(aBadFileName, obs);

  if (aRet)
  {
    boost::mutex::scoped_lock aLock(fClntMsgMutex);
    updateTxBytes(obs.length());

    try
    {
      fRef.fIos.write(obs);
    }
    catch (...)
    {
      cout << "Broken Pipe .." << endl;

      if (fpSysLog)
      {
        ostringstream oss;
        oss << getObjId() << " : Broken Pipe : socket write failed ";
        logging::Message::Args errMsgArgs;
        errMsgArgs.add(oss.str());
        fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
      }
    }

    aLock.unlock();
  }

  // delete the temp files
  if (remove(aBadFileName.c_str()) != 0)
    cout << "Failed in removing Error file: " << aBadFileName << endl;
}

//------------------------------------------------------------------------------

void WEDataLoader::sendDataRequest()
{
  int aQsz = (fpCfThread) ? fpCfThread->getQueueSize() : 0;

  // if(aQsz>MIN_QSIZE)
  // Bug 5031 : If Q size goes above 100 (2*50); there is some thing wrong
  // will put a warning in info log. Controlled in Cpimport init data rqst cnt
  if (aQsz > MAX_QSIZE)  // >250
  {
    cout << "WARNING : Data Queuing up : QSize = " << aQsz << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << "WARNING : Data Queuing up : QSize = " << aQsz;
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
    }
  }

  boost::mutex::scoped_lock aLock(fClntMsgMutex);
  ByteStream obs;
  obs << (ByteStream::byte)WE_CLT_SRV_DATARQST;
  obs << (ByteStream::byte)fPmId;  // PM id
  updateTxBytes(obs.length());

  try
  {
    fRef.fIos.write(obs);
  }
  catch (...)
  {
    cout << "Broken Pipe .." << endl;

    if (fpSysLog)
    {
      ostringstream oss;
      oss << getObjId() << " : Broken Pipe : socket write failed ";
      logging::Message::Args errMsgArgs;
      errMsgArgs.add(oss.str());
      fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
    }
  }

  aLock.unlock();
}

//------------------------------------------------------------------------------
void WEDataLoader::serialize(messageqcpp::ByteStream& b) const
{
  // TODO to be changed. left it here to understand how to implement
  /*
  b << (ObjectReader::id_t) ObjectReader::SIMPLECOLUMN;
  ReturnedColumn::serialize(b); // parent class serialize
  b << (uint32_t) fOid;
  b << fData;
  b << static_cast<const ByteStream::doublebyte>(fReturnAll);
  b << (uint32_t) fSequence;
  */
}

//-----------------------------------------------------------------------------

void WEDataLoader::unserialize(messageqcpp::ByteStream& b)
{
  // TODO to be changed. left it here to understand how to implement
  /*
  ObjectReader::checkType(b, ObjectReader::SIMPLECOLUMN);
  ReturnedColumn::unserialize(b); // parent class unserialize
  b >> (uint32_t&) fOid;
  b >> fData;
  b >> reinterpret_cast<ByteStream::doublebyte&>(fReturnAll);
  b >> (uint32_t&) fSequence;
  */
}

//-----------------------------------------------------------------------------

}  // namespace WriteEngine