You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	* feat(cpimport): MCOL-4882 add a parameter to skip header rows * chore(cpimport): MCOL-4882 Use boost::program_options to arguments parsing * feat(cpimport.bin): MCOL-4882 Add missing changes * add test * fix clang * add missing cmdline argument * fix bug * Fix double lines skipping * Fix incorrect --silent (-N) parsing * fix default --max-errors processing * fix overwriting default username * move initialization to members declaration
		
			
				
	
	
		
			2885 lines
		
	
	
		
			76 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			2885 lines
		
	
	
		
			76 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
 * Copyright (C) 2016 MariaDB Corporation.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
/*******************************************************************************
 | 
						|
 * $Id$
 | 
						|
 *
 | 
						|
 *******************************************************************************/
 | 
						|
 | 
						|
/*
 | 
						|
 * we_sdhandler.cpp
 | 
						|
 *
 | 
						|
 *  Created on: Oct 17, 2011
 | 
						|
 *      Author: bpaul
 | 
						|
 */
 | 
						|
 | 
						|
#include <ctime>
 | 
						|
#include <fstream>
 | 
						|
#include <istream>
 | 
						|
using namespace std;
 | 
						|
 | 
						|
#include "we_messages.h"
 | 
						|
#include "resourcemanager.h"
 | 
						|
 | 
						|
#include <sys/time.h>
 | 
						|
 | 
						|
#include <boost/thread/condition.hpp>
 | 
						|
#include <boost/scoped_array.hpp>
 | 
						|
#include <boost/thread.hpp>
 | 
						|
using namespace boost;
 | 
						|
 | 
						|
#include "mcsconfig.h"
 | 
						|
#include "configcpp.h"
 | 
						|
using namespace config;
 | 
						|
 | 
						|
//-----
 | 
						|
 | 
						|
#include "messagequeue.h"
 | 
						|
#include "bytestream.h"
 | 
						|
using namespace messageqcpp;
 | 
						|
 | 
						|
#include "calpontsystemcatalog.h"
 | 
						|
using namespace execplan;
 | 
						|
 | 
						|
#include "batchloader.h"
 | 
						|
using namespace batchloader;
 | 
						|
 | 
						|
//-----
 | 
						|
 | 
						|
#include "we_sdhandler.h"
 | 
						|
 | 
						|
#include "we_splitterapp.h"
 | 
						|
#include "we_respreadthread.h"
 | 
						|
#include "we_filereadthread.h"
 | 
						|
#include "we_brmupdater.h"
 | 
						|
#include "we_tablelockgrabber.h"
 | 
						|
#include "we_simplesyslog.h"
 | 
						|
 | 
						|
#include "installdir.h"
 | 
						|
 | 
						|
namespace WriteEngine
 | 
						|
{
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
//
 | 
						|
/** @brief Add PM to the list
 | 
						|
 *  @param	- PmId
 | 
						|
 */
 | 
						|
 | 
						|
void WEPmList::addPm2List(int PmId)
 | 
						|
{
 | 
						|
  boost::mutex::scoped_lock aLock(fListMutex);
 | 
						|
  fPmList.push_back(PmId);
 | 
						|
  aLock.unlock();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WEPmList::addPriorityPm2List(int PmId)
 | 
						|
{
 | 
						|
  boost::mutex::scoped_lock aLock(fListMutex);
 | 
						|
  fPmList.push_front(PmId);
 | 
						|
  aLock.unlock();
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
int WEPmList::getNextPm()
 | 
						|
{
 | 
						|
  boost::mutex::scoped_lock aLock(fListMutex);
 | 
						|
  int aPmId = 0;
 | 
						|
 | 
						|
  if (!fPmList.empty())
 | 
						|
  {
 | 
						|
    aPmId = fPmList.front();
 | 
						|
    fPmList.pop_front();
 | 
						|
  }
 | 
						|
 | 
						|
  return aPmId;
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
// incase a shutdown or roll back is called we need to clear the data
 | 
						|
// so that the sendingthreads will not keep sending data.
 | 
						|
void WEPmList::clearPmList()
 | 
						|
{
 | 
						|
  boost::mutex::scoped_lock aLock(fListMutex);
 | 
						|
 | 
						|
  if (!fPmList.empty())
 | 
						|
    fPmList.clear();
 | 
						|
 | 
						|
  aLock.unlock();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
bool WEPmList::check4Pm(int PmId)
 | 
						|
{
 | 
						|
  boost::mutex::scoped_lock aLock(fListMutex);
 | 
						|
  WePmList::iterator aIt = fPmList.begin();
 | 
						|
  bool aFound = false;
 | 
						|
 | 
						|
  while (aIt != fPmList.end())
 | 
						|
  {
 | 
						|
    if ((*aIt) == PmId)
 | 
						|
    {
 | 
						|
      aFound = true;
 | 
						|
      fPmList.erase(aIt);
 | 
						|
      break;
 | 
						|
    }
 | 
						|
 | 
						|
    ++aIt;
 | 
						|
  }
 | 
						|
 | 
						|
  return aFound;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
//	WESDHandler Definitions
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
//
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
WESDHandler::WESDHandler(WESplitterApp& Ref)
 | 
						|
 : fRef(Ref)
 | 
						|
 , fLog()
 | 
						|
 , fQId(101)
 | 
						|
 ,  // 101 - took it from air
 | 
						|
 fOam()
 | 
						|
 , fModuleTypeConfig()
 | 
						|
 , fDebugLvl(0)
 | 
						|
 , fPmCount(0)
 | 
						|
 , fTableLock(0)
 | 
						|
 , fTableOId(0)
 | 
						|
 , fFixedBinaryRecLen(0)
 | 
						|
 , fRespMutex()
 | 
						|
 , fRespCond()
 | 
						|
 , fSendMutex()
 | 
						|
 , fRespList()
 | 
						|
 , fpRespThread(0)
 | 
						|
 , fDataFeedList()
 | 
						|
 , fFileReadThread(*this)
 | 
						|
 , fDisconnectFailure(false)
 | 
						|
 , fForcedFailure(false)
 | 
						|
 , fAllCpiStarted(false)
 | 
						|
 , fFirstDataSent(false)
 | 
						|
 , fFirstPmToSend(0)
 | 
						|
 , fSelectOtherPm(false)
 | 
						|
 , fContinue(true)
 | 
						|
 , fWeSplClients(MAX_PMS)
 | 
						|
 , fBrmRptVec()
 | 
						|
 , fpBatchLoader(0)
 | 
						|
{
 | 
						|
  fRm = joblist::ResourceManager::instance();
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
WESDHandler::~WESDHandler()
 | 
						|
{
 | 
						|
  try
 | 
						|
  {
 | 
						|
    for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
    {
 | 
						|
      if (fWeSplClients[aCnt] != 0)
 | 
						|
      {
 | 
						|
        delete fWeSplClients[aCnt];
 | 
						|
        fWeSplClients[aCnt] = 0;
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    delete fpRespThread;
 | 
						|
    fpRespThread = 0;
 | 
						|
    delete fpBatchLoader;
 | 
						|
    fpBatchLoader = 0;
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    std::string aStr = "Handled an error in ~WESDHandler";
 | 
						|
    logging::Message::Args errMsgArgs;
 | 
						|
    errMsgArgs.add(aStr);
 | 
						|
    fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
 | 
						|
    cout << aStr << endl;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::reset()
 | 
						|
{
 | 
						|
  fTableLock = 0;
 | 
						|
  fTableOId = 0;
 | 
						|
  fFixedBinaryRecLen = 0;
 | 
						|
  // fpRespThread = 0;
 | 
						|
  // fFileReadThread(*this);
 | 
						|
  fForcedFailure = false;
 | 
						|
  fAllCpiStarted = false;
 | 
						|
  fFirstDataSent = false;
 | 
						|
  fFirstPmToSend = 0;
 | 
						|
  fSelectOtherPm = false;
 | 
						|
  fContinue = true;
 | 
						|
  // fWeSplClients();
 | 
						|
  // fpBatchLoader = 0;
 | 
						|
  fImportRslt.reset();
 | 
						|
  fLog.closeLog();
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
    {
 | 
						|
      if (fWeSplClients[aCnt] != 0)
 | 
						|
      {
 | 
						|
        delete fWeSplClients[aCnt];
 | 
						|
        fWeSplClients[aCnt] = 0;
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    delete fpRespThread;
 | 
						|
    fpRespThread = 0;
 | 
						|
    delete fpBatchLoader;
 | 
						|
    fpBatchLoader = 0;
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    std::string aStr = "Handled an error in ~WESDHandler";
 | 
						|
    logging::Message::Args errMsgArgs;
 | 
						|
    errMsgArgs.add(aStr);
 | 
						|
    fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
 | 
						|
    cout << aStr << endl;
 | 
						|
  }
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// BP 10/24/2011 14:22
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::send2Pm(ByteStream& Bs, unsigned int PmId)
 | 
						|
{
 | 
						|
  // mutex::scoped_lock aLock(fSendMutex);
 | 
						|
 | 
						|
  if (PmId == 0)  // send it to everyone
 | 
						|
  {
 | 
						|
    for (int aIdx = 1; aIdx <= getPmCount(); ++aIdx)
 | 
						|
    {
 | 
						|
      if (fWeSplClients[aIdx] != 0)
 | 
						|
      {
 | 
						|
        boost::mutex::scoped_lock aLock(fWeSplClients[aIdx]->fWriteMutex);
 | 
						|
        fWeSplClients[aIdx]->write(Bs);
 | 
						|
        aLock.unlock();
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    boost::mutex::scoped_lock aLock(fWeSplClients[PmId]->fWriteMutex);
 | 
						|
    fWeSplClients[PmId]->write(Bs);
 | 
						|
    aLock.unlock();
 | 
						|
  }
 | 
						|
 | 
						|
  // aLock.unlock();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
//
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::send2Pm(messageqcpp::SBS& Sbs, unsigned int PmId)
 | 
						|
{
 | 
						|
  // mutex::scoped_lock aLock(fSendMutex);
 | 
						|
 | 
						|
  if (PmId == 0)  // send it to everyone
 | 
						|
  {
 | 
						|
    for (int aIdx = 1; aIdx <= getPmCount(); ++aIdx)
 | 
						|
    {
 | 
						|
      if (fWeSplClients[aIdx] != 0)
 | 
						|
      {
 | 
						|
        boost::mutex::scoped_lock aLock(fWeSplClients[aIdx]->fSentQMutex);
 | 
						|
        fWeSplClients[aIdx]->add2SendQueue(Sbs);
 | 
						|
        aLock.unlock();
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    boost::mutex::scoped_lock aLock(fWeSplClients[PmId]->fSentQMutex);
 | 
						|
    fWeSplClients[PmId]->add2SendQueue(Sbs);
 | 
						|
    aLock.unlock();
 | 
						|
  }
 | 
						|
 | 
						|
  // aLock.unlock();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
//
 | 
						|
 | 
						|
void WESDHandler::sendEODMsg()
 | 
						|
{
 | 
						|
  // BUG 5035 Sending multiple EOD so that to avoid 'silly window syndrome'
 | 
						|
  for (int idx = 0; idx < 3; idx++)
 | 
						|
  {
 | 
						|
    messageqcpp::SBS aSbs(new messageqcpp::ByteStream);
 | 
						|
    *aSbs << (ByteStream::byte)WE_CLT_SRV_EOD;
 | 
						|
    send2Pm(aSbs);
 | 
						|
  }
 | 
						|
 | 
						|
  {
 | 
						|
    std::stringstream aStrStr;
 | 
						|
    aStrStr << "Send EOD message to All PMs";
 | 
						|
    logging::Message::Args errMsgArgs;
 | 
						|
    errMsgArgs.add(aStrStr.str());
 | 
						|
    fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
//  check for the messages from all WE servers
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::checkForRespMsgs()
 | 
						|
{
 | 
						|
  ByteStream aBs;
 | 
						|
  ByteStream::byte aPmId;
 | 
						|
  ByteStream::byte aMsgId;
 | 
						|
  // boost::shared_ptr<messageqcpp::ByteStream> aSbs;
 | 
						|
  messageqcpp::SBS aSbs;
 | 
						|
 | 
						|
  while (isContinue())
 | 
						|
  {
 | 
						|
    boost::mutex::scoped_lock aLock(fRespMutex);
 | 
						|
 | 
						|
    // NOTE - if isContinue is not checked thread will hang on shutdown
 | 
						|
    // 		by locking again on fRespList.empty()
 | 
						|
    while ((fRespList.empty()) && (isContinue()))
 | 
						|
      fRespCond.wait(aLock);
 | 
						|
 | 
						|
    // if(!isContinue()) {	aLock.unlock();	break;	} testing for rare hanging
 | 
						|
 | 
						|
    // cout <<"wait signaled checkForRespMsgs" << endl;
 | 
						|
    while (!fRespList.empty())
 | 
						|
    {
 | 
						|
      // mutex::scoped_lock aLock (fRespMutex);
 | 
						|
      aSbs = fRespList.front();
 | 
						|
      fRespList.pop_front();
 | 
						|
      // aLock.unlock();
 | 
						|
      *aSbs >> aMsgId;
 | 
						|
      *aSbs >> aPmId;
 | 
						|
 | 
						|
      // Debugging
 | 
						|
      // cout << "aMsgid = " << static_cast<short>(aMsgId) << endl;
 | 
						|
      switch (aMsgId)
 | 
						|
      {
 | 
						|
        case WE_CLT_SRV_KEEPALIVE: onKeepAliveMessage(static_cast<int>(aPmId)); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_ACK: onAckResponse(static_cast<int>(aPmId)); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_DATARQST: onDataRqstResponse(static_cast<int>(aPmId)); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_EOD: onEodResponse(static_cast<int>(aPmId)); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_STARTCPI: onStartCpiResponse(static_cast<int>(aPmId)); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_CPIPASS: onCpimportPass(static_cast<int>(aPmId)); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_CPIFAIL: onCpimportFail(static_cast<int>(aPmId)); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_BRMRPT: onBrmReport(static_cast<int>(aPmId), aSbs); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_ROLLBACK: onRollbackResult(static_cast<int>(aPmId), aSbs); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_CLEANUP: onCleanupResult(static_cast<int>(aPmId), aSbs); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_DBRCNT: onDBRootCount(static_cast<int>(aPmId), aSbs); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_ERRLOG: onErrorFile(static_cast<int>(aPmId), aSbs); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_BADLOG: onBadFile(static_cast<int>(aPmId), aSbs); break;
 | 
						|
 | 
						|
        case WE_CLT_SRV_IMPFILEERROR: onImpFileError(static_cast<int>(aPmId)); break;
 | 
						|
 | 
						|
        default: break;
 | 
						|
      }  // switch
 | 
						|
 | 
						|
      aSbs.reset();
 | 
						|
    }  // while not empty()
 | 
						|
 | 
						|
    aLock.unlock();
 | 
						|
    // yield here so that other threads get slice
 | 
						|
  }  // while
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::add2RespQueue(const messageqcpp::SBS& Sbs)
 | 
						|
{
 | 
						|
  boost::mutex::scoped_lock aLock(fRespMutex);
 | 
						|
  fRespList.push_back(Sbs);
 | 
						|
  aLock.unlock();
 | 
						|
  // cout <<"Notifing from add2RespQueue" << endl;
 | 
						|
  fRespCond.notify_one();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::setup()
 | 
						|
{
 | 
						|
  std::stringstream aPid;
 | 
						|
  bool bRollback;
 | 
						|
  bool bForce;
 | 
						|
  aPid << getpid();
 | 
						|
  std::string aTimeStamp = getTime2Str();
 | 
						|
  std::string aLogName;
 | 
						|
  std::string aErrLogName;
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.isJobLogOnly())
 | 
						|
    aLogName = std::string(MCSLOGDIR) + "/cpimport/" + "cpimport_Job_" + fRef.fCmdArgs.getJobId() + ".log";
 | 
						|
  else
 | 
						|
    aLogName = std::string(MCSLOGDIR) + "/cpimport/" + "cpimport_" + aTimeStamp + "_" + aPid.str() + ".log";
 | 
						|
 | 
						|
  if (getDebugLvl() > 1)
 | 
						|
    cout << "LogName : " << aLogName << endl;
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.isJobLogOnly())
 | 
						|
    aErrLogName = std::string(MCSLOGDIR) + "/cpimport/" + "cpimport_Job_" + fRef.fCmdArgs.getJobId() + ".err";
 | 
						|
  else
 | 
						|
    aErrLogName =
 | 
						|
        std::string(MCSLOGDIR) + "/cpimport/" + "cpimport_" + aTimeStamp + "_" + aPid.str() + ".err";
 | 
						|
 | 
						|
  if (getDebugLvl() > 1)
 | 
						|
    cout << "ErrLogName : " << aErrLogName << endl;
 | 
						|
 | 
						|
  // consoleFlag false will only output only MSGLOG_LVL1 to console
 | 
						|
  // and MSGLOG_LVL2 to log file without writing to console.
 | 
						|
  // fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), false);
 | 
						|
  fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), getConsoleLog());
 | 
						|
 | 
						|
  // In mode 0 and Mode 1, we need to construct the input file list and check availability
 | 
						|
  if (0 == fRef.fCmdArgs.getMode() || 1 == fRef.fCmdArgs.getMode())
 | 
						|
    setInputFileList(fRef.getLocFile());
 | 
						|
 | 
						|
  fImportRslt.startTimer();
 | 
						|
 | 
						|
  if (fPmCount == 0)  // Should have already set in cpimport invoke check
 | 
						|
  {
 | 
						|
    throw(runtime_error("Configuration Error. PM Count = 0"));
 | 
						|
  }
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getPmVecSize() == 0)  // No Pms listed in Cmd line
 | 
						|
  {
 | 
						|
    // BUG 4668 - Added this code to find the PM's realtime from OAM
 | 
						|
    // fOam.getSystemConfig("pm", fModuleTypeConfig); //commented out since-
 | 
						|
    // - we are calling that function in check4CpiInvokeMode()
 | 
						|
    oam::DeviceNetworkList::iterator pt = fModuleTypeConfig.ModuleNetworkList.begin();
 | 
						|
 | 
						|
    for (; pt != fModuleTypeConfig.ModuleNetworkList.end(); pt++)
 | 
						|
    {
 | 
						|
      int moduleID =
 | 
						|
          atoi((*pt).DeviceName.substr(oam::MAX_MODULE_TYPE_SIZE, oam::MAX_MODULE_ID_SIZE).c_str());
 | 
						|
 | 
						|
      if (getDebugLvl() > 1)
 | 
						|
        cout << "Adding PmId - " << moduleID << endl;
 | 
						|
 | 
						|
      fRef.fCmdArgs.add2PmVec(moduleID);
 | 
						|
    }
 | 
						|
 | 
						|
    /*
 | 
						|
     for (int PmId = 1; ((PmId <= fPmCount) && (PmId < MAX_PMS)); ++PmId)
 | 
						|
     {
 | 
						|
     if(getDebugLvl()>1) cout<<"Adding PmId - "<<PmId <<endl;
 | 
						|
     fRef.fCmdArgs.add2PmVec(PmId);
 | 
						|
     }
 | 
						|
     */
 | 
						|
  }
 | 
						|
 | 
						|
  // getModuleStatus will take too long. Also to test in development
 | 
						|
  /*    if (getenv("SKIP_OAM_INIT") == NULL)
 | 
						|
      {
 | 
						|
          vector<unsigned int>& aVec = fRef.fCmdArgs.getPmVec();
 | 
						|
 | 
						|
          for (unsigned int PmId = 1; (PmId <= static_cast<unsigned int>(fPmCount)); ++PmId)
 | 
						|
          {
 | 
						|
              int opState = oam::ACTIVE;
 | 
						|
              bool aDegraded = false;
 | 
						|
              ostringstream aOss;
 | 
						|
              aOss << "pm" << PmId;
 | 
						|
              std::string aModName = aOss.str();
 | 
						|
 | 
						|
              if (getDebugLvl())
 | 
						|
                  cout << "getModuleStatus() ModName = " << aModName << endl;
 | 
						|
 | 
						|
              try
 | 
						|
              {
 | 
						|
                  fOam.getModuleStatus(aModName, opState, aDegraded);
 | 
						|
 | 
						|
                  if (getDebugLvl())
 | 
						|
                      cout << "ModName = " << aModName << " opState = " << opState << endl;
 | 
						|
              }
 | 
						|
              catch (std::exception& ex)
 | 
						|
              {}
 | 
						|
 | 
						|
              if (opState != oam::ACTIVE ) // BUG 4668
 | 
						|
              {
 | 
						|
                  aVec.erase(std::remove(aVec.begin(), aVec.end(), PmId), aVec.end());
 | 
						|
              }
 | 
						|
          }
 | 
						|
      }
 | 
						|
  */
 | 
						|
 | 
						|
  int rtn = fDbrm.getSystemReady();
 | 
						|
 | 
						|
  if (rtn < 1)
 | 
						|
  {
 | 
						|
    ostringstream oss;
 | 
						|
    oss << "System is not ready (" << rtn << ").  Verify that Columnstore is up and ready ";
 | 
						|
    // fLog.logMsg( oss.str(), MSGLVL_ERROR );
 | 
						|
    setContinue(false);
 | 
						|
    throw runtime_error(oss.str());
 | 
						|
  }
 | 
						|
  else if (BRM::ERR_OK != fDbrm.isReadWrite())
 | 
						|
  {
 | 
						|
    ostringstream oss;
 | 
						|
    oss << "Error: System in ReadOnly state.";
 | 
						|
    // fLog.logMsg(oss.str(), MSGLVL_ERROR);
 | 
						|
    setContinue(false);
 | 
						|
    throw runtime_error(oss.str());
 | 
						|
  }
 | 
						|
  else if (fDbrm.getSystemShutdownPending(bRollback, bForce) > 0)
 | 
						|
  {
 | 
						|
    ostringstream oss;
 | 
						|
    oss << "System is being shutdown. Can't start a new import";
 | 
						|
    setContinue(false);
 | 
						|
    throw runtime_error(oss.str());
 | 
						|
  }
 | 
						|
  else if (fDbrm.getSystemSuspendPending(bRollback) > 0 || fDbrm.getSystemSuspended())
 | 
						|
  {
 | 
						|
    ostringstream oss;
 | 
						|
    oss << "System is in write disabled state. Can't start a new import";
 | 
						|
    setContinue(false);
 | 
						|
    throw runtime_error(oss.str());
 | 
						|
  }
 | 
						|
 | 
						|
  if ((fRef.fCmdArgs.getMode() == 1) || (fRef.fCmdArgs.getMode() == 2))
 | 
						|
  {
 | 
						|
    fTableOId = 0;
 | 
						|
    fFixedBinaryRecLen = 0;
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
      int32_t tblOid = getTableOID(fRef.fCmdArgs.getSchemaName(), fRef.fCmdArgs.getTableName());
 | 
						|
      fTableOId = tblOid;
 | 
						|
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Table OID = " << fTableOId << endl;
 | 
						|
 | 
						|
      if (fRef.fCmdArgs.getImportDataMode() != IMPORT_DATA_TEXT)
 | 
						|
      {
 | 
						|
        fFixedBinaryRecLen = calcTableRecLen(fRef.fCmdArgs.getSchemaName(), fRef.fCmdArgs.getTableName());
 | 
						|
      }
 | 
						|
    }
 | 
						|
    catch (std::exception& ex)
 | 
						|
    {
 | 
						|
      std::string aDetails = fRef.fCmdArgs.getSchemaName() + "." + fRef.fCmdArgs.getTableName() + " ERROR : ";
 | 
						|
      std::string aStr = aDetails + ex.what();
 | 
						|
      logging::Message::Args errMsgArgs;
 | 
						|
      // BUG 4152
 | 
						|
      errMsgArgs.add(fRef.fCmdArgs.getSchemaName());
 | 
						|
      errMsgArgs.add(fRef.fCmdArgs.getTableName());
 | 
						|
      errMsgArgs.add(ex.what());
 | 
						|
      fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0097);
 | 
						|
      fLog.logMsg(aStr, MSGLVL_ERROR);
 | 
						|
      setContinue(false);
 | 
						|
 | 
						|
      if (fTableOId == 0)  // error getting table OID
 | 
						|
        throw runtime_error("Please make sure both schema and table exists!!");
 | 
						|
      else  // error getting fixed binary record length
 | 
						|
        throw runtime_error(ex.what());
 | 
						|
    }
 | 
						|
 | 
						|
    int aWaitIntvl = 10;  // In seconds
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
      string aWaitPeriod = config::Config::makeConfig()->getConfig("SystemConfig", "WaitPeriod");
 | 
						|
 | 
						|
      if (!aWaitPeriod.empty())
 | 
						|
        aWaitIntvl = atoi(aWaitPeriod.c_str());
 | 
						|
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "aWaitPeriod = " << aWaitPeriod << endl;
 | 
						|
    }
 | 
						|
    catch (std::exception&)
 | 
						|
    {
 | 
						|
      aWaitIntvl = 10;
 | 
						|
    }
 | 
						|
 | 
						|
    std::vector<unsigned int> aPmVec = fRef.fCmdArgs.getPmVec();
 | 
						|
    WETableLockGrabber aTLG(*this);
 | 
						|
    string errMsg;
 | 
						|
 | 
						|
    for (int aIdx = 0; aIdx < (aWaitIntvl * 10); aIdx++)
 | 
						|
    {
 | 
						|
      try
 | 
						|
      {
 | 
						|
        fTableLock = aTLG.grabTableLock(aPmVec, fTableOId);
 | 
						|
 | 
						|
        if (getDebugLvl() > 1)
 | 
						|
          cout << "Table Lock = " << fTableLock << endl;
 | 
						|
      }
 | 
						|
      catch (std::exception& ex)
 | 
						|
      {
 | 
						|
        errMsg = ex.what();
 | 
						|
      }
 | 
						|
 | 
						|
      if (fTableLock != 0)
 | 
						|
        break;
 | 
						|
 | 
						|
      usleep(100000);
 | 
						|
    }
 | 
						|
 | 
						|
    if (fTableLock == 0)
 | 
						|
    {
 | 
						|
      ostringstream oss;
 | 
						|
      oss << "Failed to acquire Table Lock of ";
 | 
						|
      oss << fRef.fCmdArgs.getSchemaName() << ".";
 | 
						|
      oss << fRef.fCmdArgs.getTableName() << "; " << errMsg;
 | 
						|
      // fLog.logMsg( oss.str(), MSGLVL_ERROR );
 | 
						|
      setContinue(false);
 | 
						|
      throw runtime_error(oss.str());
 | 
						|
    }
 | 
						|
 | 
						|
    if (0 != fTableOId)
 | 
						|
    {
 | 
						|
      try
 | 
						|
      {
 | 
						|
        if (getDebugLvl() > 1)
 | 
						|
        {
 | 
						|
          for (unsigned int idx = 0; idx < aPmVec.size(); idx++)
 | 
						|
            cout << "PmId = " << aPmVec[idx] << std::endl;
 | 
						|
        }
 | 
						|
 | 
						|
        fpBatchLoader = new BatchLoader(fTableOId, 0, aPmVec);
 | 
						|
        // int aRet=fpBatchLoader->selectFirstPM(fFirstPmToSend, fSelectOtherPm);
 | 
						|
        // if (aRet != 0) throw runtime_error("BatchLoader error.. exiting");
 | 
						|
      }
 | 
						|
      catch (std::exception& ex)
 | 
						|
      {
 | 
						|
        releaseTableLocks();
 | 
						|
        ostringstream oss;
 | 
						|
        oss << ex.what() << " ... import exiting";
 | 
						|
        // fLog.logMsg( oss.str(), MSGLVL_ERROR );
 | 
						|
        setContinue(false);
 | 
						|
        throw runtime_error(oss.str());
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  for (int PmId = 1; ((PmId <= fPmCount) && (PmId < MAX_PMS)); ++PmId)
 | 
						|
  {
 | 
						|
    try
 | 
						|
    {
 | 
						|
      if (fRef.getPmStatus(PmId))
 | 
						|
      {
 | 
						|
        fWeSplClients.at(PmId) = new WESplClient(*this, PmId);
 | 
						|
 | 
						|
        if (fWeSplClients[PmId] != NULL)
 | 
						|
        {
 | 
						|
          fWeSplClients[PmId]->setup();
 | 
						|
 | 
						|
          if (2 == fRef.fCmdArgs.getMode())
 | 
						|
            fWeSplClients[PmId]->setRdSecTo(fPmCount);  // Set Rd T/O to 1 sec
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          std::string aStr;
 | 
						|
          aStr = std::string("Encountered NULL WESplClient : ") + std::to_string(PmId);
 | 
						|
          cout << aStr << endl;
 | 
						|
          fLog.logMsg(aStr, MSGLVL_ERROR);
 | 
						|
          throw WESdHandlerException(aStr);
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    catch (const std::exception& ex)
 | 
						|
    {
 | 
						|
      std::string aStr = ex.what();
 | 
						|
      logging::Message::Args errMsgArgs;
 | 
						|
      errMsgArgs.add(aStr);
 | 
						|
      fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
 | 
						|
      releaseTableLocks();  // BUG 4295 - release table lock as connection fails
 | 
						|
      // cout << aStr << endl;
 | 
						|
      fLog.logMsg(aStr, MSGLVL_ERROR);
 | 
						|
      throw runtime_error("Error in connection setup.");
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  // to initiate adding data to the SendQueue, that many BS
 | 
						|
  // BUG 5031 - Initial Data request count set to 100. This will be the max
 | 
						|
  // Q size on the WES side. Here after a batch send for every data rqst
 | 
						|
  for (int aIdx = 0; aIdx < MAX_WES_QSIZE; aIdx++)
 | 
						|
  {
 | 
						|
    for (int PmId = 1; PmId <= fPmCount; PmId++)
 | 
						|
    {
 | 
						|
      if (fWeSplClients[PmId] != 0)
 | 
						|
        fWeSplClients[PmId]->incDataRqstCount();
 | 
						|
 | 
						|
      // fDataFeedList.addPm2List(PmId);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  fpRespThread = new boost::thread(WERespReadThread(*this));
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    // start the File Read thread
 | 
						|
    if ((fRef.fCmdArgs.getMode() == 0) || (fRef.fCmdArgs.getMode() == 1))
 | 
						|
    {
 | 
						|
      if (fRef.getLocFile() == "STDIN")
 | 
						|
      {
 | 
						|
        ostringstream oss;
 | 
						|
        oss << "Reading input from STDIN to import into table ";
 | 
						|
        oss << fRef.fCmdArgs.getSchemaName() << ".";
 | 
						|
        oss << fRef.fCmdArgs.getTableName() << "...";
 | 
						|
 | 
						|
        if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
          fLog.logMsg(oss.str(), MSGLVL_INFO1);
 | 
						|
      }
 | 
						|
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "BatchQuantity=" << fRef.fCmdArgs.getBatchQuantity() << endl;
 | 
						|
 | 
						|
      fFileReadThread.setBatchQty(fRef.fCmdArgs.getBatchQuantity());
 | 
						|
      fFileReadThread.setup(fRef.getLocFile());
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    releaseTableLocks();  // BUG 4295
 | 
						|
    throw runtime_error(ex.what());
 | 
						|
  }
 | 
						|
 | 
						|
  // Output "Running distributed import (Mode{x}) on [all/<pmlist>] PMs"
 | 
						|
  {
 | 
						|
    ostringstream oss;
 | 
						|
    oss << "Running distributed import (mode ";
 | 
						|
    oss << fRef.fCmdArgs.getMode() << ") on ";
 | 
						|
 | 
						|
    if (fRef.fCmdArgs.getPmVecSize() == static_cast<size_t>(fPmCount))
 | 
						|
      oss << "all PMs...";
 | 
						|
    else
 | 
						|
    {
 | 
						|
      oss << " PM ";
 | 
						|
      std::vector<unsigned int> aPmVec = fRef.fCmdArgs.getPmVec();
 | 
						|
      unsigned int aIdx = 0;
 | 
						|
 | 
						|
      while (aIdx < aPmVec.size())
 | 
						|
      {
 | 
						|
        oss << aPmVec[aIdx];
 | 
						|
        aIdx++;
 | 
						|
 | 
						|
        if (aIdx != aPmVec.size())
 | 
						|
          oss << ",";
 | 
						|
      }
 | 
						|
 | 
						|
      oss << " ...";
 | 
						|
    }
 | 
						|
 | 
						|
    if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
      fLog.logMsg(oss.str(), MSGLVL_INFO1);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//-----------------------------------------------------------------------------
 | 
						|
bool WESDHandler::updateCPAndHWMInBRM()
 | 
						|
{
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Inside updateCPAndHWMInBRM()" << endl;
 | 
						|
 | 
						|
  WEBrmUpdater aBrmUpdater(*this);
 | 
						|
  bool aRslt = aBrmUpdater.updateCasualPartitionAndHighWaterMarkInBRM();
 | 
						|
  return aRslt;
 | 
						|
}
 | 
						|
 | 
						|
//-----------------------------------------------------------------------------
 | 
						|
void WESDHandler::cancelOutstandingCpimports()
 | 
						|
{
 | 
						|
  std::string aStr = "Canceling outstanding cpimports";
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << aStr << endl;
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
    fLog.logMsg(aStr, MSGLVL_INFO1);
 | 
						|
 | 
						|
  fFileReadThread.shutdown();
 | 
						|
  bool aSetFail = false;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      if ((!fWeSplClients[aCnt]->isCpiFailed()) && (!fWeSplClients[aCnt]->isCpiPassed()))
 | 
						|
      {
 | 
						|
        if (getDebugLvl())
 | 
						|
          cout << "Canceling Cpimport in " << aCnt << endl;
 | 
						|
 | 
						|
        // clear the sendQ
 | 
						|
        fWeSplClients[aCnt]->clearSendQueue();
 | 
						|
        messageqcpp::ByteStream aBs;
 | 
						|
        aBs << (ByteStream::byte)WE_CLT_SRV_EOD;
 | 
						|
 | 
						|
        if (fWeSplClients[aCnt]->isConnected())
 | 
						|
        {
 | 
						|
          boost::mutex::scoped_lock aLock(fWeSplClients[aCnt]->fWriteMutex);
 | 
						|
          fWeSplClients[aCnt]->write(aBs);
 | 
						|
          aLock.unlock();
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          fWeSplClients[aCnt]->setCpiFailed(true);  // Setting as FAILED
 | 
						|
        }
 | 
						|
      }
 | 
						|
      // if it is passed already set it as canceled so that we can rollback
 | 
						|
      else if (fWeSplClients[aCnt]->isCpiPassed())
 | 
						|
      {
 | 
						|
        fWeSplClients[aCnt]->setCpiPassed(false);
 | 
						|
        fWeSplClients[aCnt]->setCpiFailed(true);  // Setting as FAILED
 | 
						|
        aSetFail = true;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  // setting Manual Failed caused a Rollback. Warrented if it the last PM
 | 
						|
  if (aSetFail)
 | 
						|
  {
 | 
						|
    if (checkAllCpiFailStatus())
 | 
						|
      doRollback();
 | 
						|
  }
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Canceled all outstanding cpimports!!" << endl;
 | 
						|
}
 | 
						|
 | 
						|
//-----------------------------------------------------------------------------
 | 
						|
 | 
						|
bool WESDHandler::checkForRollbackAndCleanup()
 | 
						|
{
 | 
						|
  bool aRetStatus = true;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      // If Anyone of the client is
 | 
						|
      if ((!fWeSplClients[aCnt]->isCpiFailed()) &&  // NOT Failed
 | 
						|
          (!fWeSplClients[aCnt]->isCpiPassed()) &&  // NOT Passed
 | 
						|
          (fWeSplClients[aCnt]->isConnected()))     // NOT Disconnected
 | 
						|
      {
 | 
						|
        aRetStatus = false;  // then its not time to Rollback/Cleanup
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return aRetStatus;
 | 
						|
}
 | 
						|
 | 
						|
//-----------------------------------------------------------------------------
 | 
						|
 | 
						|
bool WESDHandler::checkForCpiFailStatus()
 | 
						|
{
 | 
						|
  bool aRetStatus = false;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      // If Anyone of the client is
 | 
						|
      if (fWeSplClients[aCnt]->isCpiFailed())  // Failed
 | 
						|
      {
 | 
						|
        aRetStatus = true;  // encounterd a Failure
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  // if no CPI failed but a Forced Failure simulated, return Status as true
 | 
						|
  if ((!aRetStatus) && (fForcedFailure))
 | 
						|
    aRetStatus = true;
 | 
						|
 | 
						|
  return aRetStatus;
 | 
						|
}
 | 
						|
 | 
						|
//-----------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::checkForConnections()
 | 
						|
{
 | 
						|
  time_t aNow = time(0);
 | 
						|
 | 
						|
  for (int PmId = 1; PmId <= fPmCount; ++PmId)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[PmId] != 0)
 | 
						|
    {
 | 
						|
      if (aNow - fWeSplClients[PmId]->getLastInTime() > 180)
 | 
						|
      {
 | 
						|
        std::string aStr;
 | 
						|
        aStr = std::string("Heartbeats missed - Non Responsive PM") + std::to_string(PmId);
 | 
						|
        fLog.logMsg(aStr, MSGLVL_ERROR);
 | 
						|
        fWeSplClients[PmId]->onDisconnect();
 | 
						|
        exit(1);  // Otherwise; have to wait till write() comes out
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//-----------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::sendHeartbeats()
 | 
						|
{
 | 
						|
  messageqcpp::SBS aSbs(new messageqcpp::ByteStream);
 | 
						|
  *aSbs << (ByteStream::byte)WE_CLT_SRV_KEEPALIVE;
 | 
						|
  send2Pm(aSbs);
 | 
						|
}
 | 
						|
 | 
						|
//-----------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::shutdown()
 | 
						|
{
 | 
						|
  if (fDisconnectFailure)
 | 
						|
  {
 | 
						|
    onDisconnectFailure();
 | 
						|
  }
 | 
						|
 | 
						|
  if (fRef.fSigHup)
 | 
						|
  {
 | 
						|
    onHandlingSigHup();
 | 
						|
  }
 | 
						|
 | 
						|
  if (fRef.fSignaled)
 | 
						|
  {
 | 
						|
    onHandlingSignal();
 | 
						|
  }
 | 
						|
 | 
						|
  fDataFeedList.clearPmList();
 | 
						|
 | 
						|
  fFileReadThread.shutdown();
 | 
						|
 | 
						|
  if (fFileReadThread.getFpThread())
 | 
						|
    fFileReadThread.getFpThread()->join();
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "cleaning up Client threads " << endl;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      fWeSplClients[aCnt]->setContinue(false);
 | 
						|
      fWeSplClients[aCnt]->setConnected(false);
 | 
						|
 | 
						|
      if (fWeSplClients[aCnt]->getFpThread() != NULL)
 | 
						|
        fWeSplClients[aCnt]->getFpThread()->join();
 | 
						|
 | 
						|
      if (getDebugLvl())
 | 
						|
        fWeSplClients[aCnt]->printStats();
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  boost::mutex::scoped_lock aLock(fRespMutex);
 | 
						|
  this->setContinue(false);
 | 
						|
  usleep(100000);  // so that response thread get updated.
 | 
						|
  fRespCond.notify_all();
 | 
						|
  aLock.unlock();
 | 
						|
 | 
						|
  if (fpRespThread)
 | 
						|
    fpRespThread->join();
 | 
						|
 | 
						|
  fLog.logMsg("Shutdown of all child threads Finished!!", MSGLVL_INFO2);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::onStartCpiResponse(int PmId)
 | 
						|
{
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "On Start CPI response arrived " << PmId << endl;
 | 
						|
 | 
						|
  messageqcpp::ByteStream aBs;
 | 
						|
  aBs << (ByteStream::byte)WE_CLT_SRV_STARTCPI;
 | 
						|
  send2Pm(aBs, PmId);
 | 
						|
  fWeSplClients[PmId]->setCpiStarted(true);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::onDataRqstResponse(int PmId)
 | 
						|
{
 | 
						|
  // may be we should add this pmid to the queue
 | 
						|
  // cout << "Received an DataRqst from "<< PmId << endl; - for debug
 | 
						|
  // BUG 5031 - Don't allow to accumulate RqstCnt uncontrollably
 | 
						|
  // since that will end up sending too many messages to WES.
 | 
						|
  // - Walt don't want this checking since we are going FIXED Q size
 | 
						|
  int aCnt = fWeSplClients[PmId]->getDataRqstCount();
 | 
						|
 | 
						|
  if (aCnt < MAX_WES_QSIZE)  // 100
 | 
						|
  {
 | 
						|
    if (getDebugLvl() > 2)
 | 
						|
      cout << "DataReqst [" << PmId << "] = " << fWeSplClients[PmId]->getDataRqstCount() << endl;
 | 
						|
 | 
						|
    fWeSplClients[PmId]->incDataRqstCount();
 | 
						|
  }
 | 
						|
 | 
						|
  // fDataFeedList.addPm2List(PmId);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::onAckResponse(int PmId)
 | 
						|
{
 | 
						|
  // may be we should add this pmid to the queue
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Received an ACK from " << PmId << endl;
 | 
						|
 | 
						|
  // fDataFeedList.addPm2List(PmId);
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::onNakResponse(int PmId)
 | 
						|
{
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Received a NAK from " << PmId << endl;
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Eod response means we are do not send anymore data to this PM
 | 
						|
// Increase the read timeout for this PM thread so that it won't
 | 
						|
// consume too much CPU
 | 
						|
void WESDHandler::onEodResponse(int PmId)
 | 
						|
{
 | 
						|
  if (getDebugLvl() > 2)
 | 
						|
    cout << "Received a EOD from " << PmId << endl;
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getMode() == 0)
 | 
						|
  {
 | 
						|
    // This is when one PM fail on Mode 0
 | 
						|
    if (checkForCpiFailStatus())  // someone else failed,
 | 
						|
    {
 | 
						|
      // so set this as failed for rollback.
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Setting CPI failed on " << PmId << endl;
 | 
						|
 | 
						|
      fWeSplClients[PmId]->setCpiPassed(false);
 | 
						|
      fWeSplClients[PmId]->setCpiFailed(true);
 | 
						|
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Calling onSigInterrupt from onEodResponse() " << endl;
 | 
						|
 | 
						|
      if (checkAllCpiFailStatus())
 | 
						|
        fRef.onSigInterrupt(1);
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Calling onCpimportPass() from onEodResponse() " << endl;
 | 
						|
 | 
						|
      onCpimportPass(PmId);  // set dummy Cpimport Pass
 | 
						|
 | 
						|
      if (checkAllCpiPassStatus())  // Mode 0 won't don't have BRM report
 | 
						|
      {
 | 
						|
        fImportRslt.stopTimer();
 | 
						|
        ostringstream oss1;
 | 
						|
        // Bulk load completed, total run time :    2.98625 seconds
 | 
						|
        oss1 << "Load file distribution completed, total run time : ";
 | 
						|
        oss1 << fImportRslt.getTotalRunTime() << " seconds" << endl;
 | 
						|
 | 
						|
        if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
          fLog.logMsg(oss1.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
        fRef.onSigInterrupt(0);  // 1 for the sake of it
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else if (fRef.fCmdArgs.getMode() == 1)
 | 
						|
  {
 | 
						|
    // Only time when we get a EOD on Mode 1 is when there is failure
 | 
						|
    // in one of the PMs. Stop sending data and Send EOD to all PMS.
 | 
						|
    // Make sure fileread thread is not working anymore.
 | 
						|
    if (fFileReadThread.isContinue())  // check already stopped running
 | 
						|
    {
 | 
						|
      sendEODMsg();
 | 
						|
      fFileReadThread.shutdown();
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
//  BUG 4244
 | 
						|
//	ERROR because
 | 
						|
//	either an File already exist or the directory doesn't exists
 | 
						|
 | 
						|
void WESDHandler::onImpFileError(int PmId)
 | 
						|
{
 | 
						|
  if (fRef.fCmdArgs.getMode() != 0)
 | 
						|
  {
 | 
						|
    cout << "ERROR : we should not be here. Mode Non-Zero" << endl;
 | 
						|
  }
 | 
						|
 | 
						|
  std::stringstream aStrStr;
 | 
						|
  aStrStr << "Target file Error from PM" << PmId << " - File already exists or path doesn't exist";
 | 
						|
  char aDefCon[16], aRedCol[16];
 | 
						|
  snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
 | 
						|
  snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m");
 | 
						|
  cout << aRedCol << aStrStr.str() << aDefCon << endl;
 | 
						|
  fLog.logMsg(aStrStr.str(), MSGLVL_ERROR);
 | 
						|
 | 
						|
  if (!fWeSplClients[PmId]->isCpiFailed())
 | 
						|
    fWeSplClients[PmId]->setCpiFailed(true);
 | 
						|
  else
 | 
						|
    return;  // this failure already reported so get out
 | 
						|
 | 
						|
  // If IMPFILE open Failed, then stop sending data to other PMs too
 | 
						|
  if (checkAllCpiFailStatus())
 | 
						|
  {
 | 
						|
    // If all are failed we will get out; no rollback for mode 0
 | 
						|
    fRef.onSigInterrupt(1);  // 1 for the sake of it
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    // Stop sending data to all PMs. Also send EOF to all PMs
 | 
						|
    cancelOutstandingCpimports();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::onPmErrorResponse(int PmId)
 | 
						|
{
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Received a NAK from " << PmId << endl;
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::onKeepAliveMessage(int PmId)
 | 
						|
{
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Received a Keep Alive from " << PmId << endl;
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::onCpimportPass(int PmId)
 | 
						|
{
 | 
						|
  std::stringstream aStrStr;
 | 
						|
  aStrStr << "Received a Cpimport Pass from PM" << PmId;
 | 
						|
  logging::Message::Args errMsgArgs;
 | 
						|
  // BUG 4152
 | 
						|
  errMsgArgs.add(PmId);
 | 
						|
  fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0098);
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
  fLog.logMsg(aStrStr.str(), MSGLVL_INFO2);
 | 
						|
 | 
						|
  fWeSplClients[PmId]->setCpiPassed(true);
 | 
						|
 | 
						|
  // Every CPI passed, BRM report will be send to us b4 this Msg.
 | 
						|
  if (checkForCpiFailStatus())  // someone else failed,
 | 
						|
  {
 | 
						|
    // so set this as failed for rollback.
 | 
						|
    fWeSplClients[PmId]->setCpiPassed(false);
 | 
						|
    fWeSplClients[PmId]->setCpiFailed(true);
 | 
						|
 | 
						|
    if (checkAllCpiFailStatus())
 | 
						|
      doRollback();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::onCpimportFail(int PmId, bool SigHandle)
 | 
						|
{
 | 
						|
  std::stringstream aStrStr;
 | 
						|
  char aDefCon[16], aRedCol[16];
 | 
						|
  snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
 | 
						|
  snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m");
 | 
						|
  aStrStr << aRedCol << "Received a Cpimport Failure from PM" << PmId << aDefCon;
 | 
						|
  logging::Message::Args errMsgArgs;
 | 
						|
  errMsgArgs.add(aStrStr.str());
 | 
						|
 | 
						|
  if (0 != PmId)
 | 
						|
  {
 | 
						|
    fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
 | 
						|
    // cout << aRedCol << aStrStr.str() << aDefCon << endl;
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_INFO2);
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_ERROR);
 | 
						|
    std::stringstream aStrStr2;
 | 
						|
    aStrStr2 << "Please verify error log files in PM" << PmId;
 | 
						|
 | 
						|
    if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
      fLog.logMsg(aStrStr2.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
    if (!fWeSplClients[PmId]->isCpiFailed())
 | 
						|
      fWeSplClients[PmId]->setCpiFailed(true);
 | 
						|
    else
 | 
						|
      return;  // this failure already reported so get out
 | 
						|
  }
 | 
						|
 | 
						|
  if ((SigHandle) && (0 == PmId))
 | 
						|
  {
 | 
						|
    fForcedFailure = true;
 | 
						|
  }
 | 
						|
 | 
						|
  //	if (!fWeSplClients[PmId]->isCpiFailed())
 | 
						|
  //		fWeSplClients[PmId]->setCpiFailed(true);
 | 
						|
  //	else
 | 
						|
  //		return; // this failure already reported so get out
 | 
						|
 | 
						|
  // If Any CPI Failed, then stop other CPIMPORTS too
 | 
						|
  // cancelOutstandingCpimports() - called below
 | 
						|
  // TODO - later do a total rollback and release locks.
 | 
						|
  if (checkAllCpiFailStatus())
 | 
						|
  {
 | 
						|
    doRollback();
 | 
						|
    // fRef.onSigInterrupt(1);		// 1 for the sake of it
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    // Stop sending data to all PMs. Also send EOF to all PMs
 | 
						|
    // so that all cpimports will finish bulk upload
 | 
						|
    cancelOutstandingCpimports();
 | 
						|
  }
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::onBrmReport(int PmId, messageqcpp::SBS& Sbs)
 | 
						|
{
 | 
						|
  std::stringstream aStrStr;
 | 
						|
  aStrStr << "Received a BRM-Report from " << PmId;
 | 
						|
  fLog.logMsg(aStrStr.str(), MSGLVL_INFO2);
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
  fWeSplClients[PmId]->setBrmRptRcvd(true);
 | 
						|
 | 
						|
  std::string aStr;
 | 
						|
  int64_t aTotRows = 0;
 | 
						|
  int64_t aInsRows = 0;
 | 
						|
  int aColNum = 0;
 | 
						|
  CalpontSystemCatalog::ColDataType aColType = CalpontSystemCatalog::INT;
 | 
						|
  int aOorVal = 0;
 | 
						|
  std::string aBadFileName;
 | 
						|
  std::string aErrFileName;
 | 
						|
  std::string aColName;
 | 
						|
 | 
						|
  if (getDebugLvl() > 2)
 | 
						|
    cout << "BRM Report length : " << (*Sbs).length() << endl;
 | 
						|
 | 
						|
  while ((*Sbs).length() > 0)
 | 
						|
  {
 | 
						|
    (*Sbs) >> aStr;
 | 
						|
 | 
						|
    if (getDebugLvl() > 2)
 | 
						|
      cout << "BRM Report value : " << aStr << endl;
 | 
						|
 | 
						|
    bool aRet = WEBrmUpdater::prepareRowsInsertedInfo(aStr, aTotRows, aInsRows);
 | 
						|
 | 
						|
    if (aRet)
 | 
						|
    {
 | 
						|
      setRowsUploadInfo(PmId, aTotRows, aInsRows);
 | 
						|
      fImportRslt.updateRowsProcessed(aTotRows);
 | 
						|
      fImportRslt.updateRowsInserted(aInsRows);
 | 
						|
    }
 | 
						|
 | 
						|
    aRet = WEBrmUpdater::prepareColumnOutOfRangeInfo(aStr, aColNum, aColType, aColName, aOorVal);
 | 
						|
 | 
						|
    if (aRet)
 | 
						|
    {
 | 
						|
      add2ColOutOfRangeInfo(PmId, aColNum, aColType, aColName, aOorVal);
 | 
						|
      fImportRslt.updateColOutOfRangeInfo(aColNum, aColType, aColName, aOorVal);
 | 
						|
    }
 | 
						|
 | 
						|
    aRet = WEBrmUpdater::prepareBadDataFileInfo(aStr, aBadFileName);
 | 
						|
 | 
						|
    if (aRet)
 | 
						|
    {
 | 
						|
      setBadFileName(PmId, aBadFileName);
 | 
						|
 | 
						|
      // BUG 4324  - Mode 2 bad/err files left in PM(s).
 | 
						|
      if (1 == fRef.fCmdArgs.getMode())
 | 
						|
        getBadLog(PmId, aBadFileName);
 | 
						|
      else if (2 == fRef.fCmdArgs.getMode())
 | 
						|
      {
 | 
						|
        std::stringstream aOss;
 | 
						|
        aOss << "Bad File : " << aBadFileName << " @ PM" << PmId;
 | 
						|
 | 
						|
        if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
          fLog.logMsg(aOss.str(), MSGLVL_INFO1);
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    aRet = WEBrmUpdater::prepareErrorFileInfo(aStr, aErrFileName);
 | 
						|
 | 
						|
    if (aRet)
 | 
						|
    {
 | 
						|
      setErrorFileName(PmId, aErrFileName);
 | 
						|
 | 
						|
      // BUG 4324  - Mode 2 bad/err files left in PM(s).
 | 
						|
      if (1 == fRef.fCmdArgs.getMode())
 | 
						|
        getErrorLog(PmId, aErrFileName);
 | 
						|
      else if (2 == fRef.fCmdArgs.getMode())
 | 
						|
      {
 | 
						|
        std::stringstream aOss;
 | 
						|
        aOss << "Err File : " << aErrFileName << " @ PM" << PmId;
 | 
						|
 | 
						|
        if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
          fLog.logMsg(aOss.str(), MSGLVL_INFO1);
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    aRet = check4CriticalErrMsgs(aStr);  // check 4 Critical msg from .bin
 | 
						|
 | 
						|
    if (aRet)
 | 
						|
    {
 | 
						|
      std::stringstream aOss;
 | 
						|
      aOss << "PM" << PmId << " : " << aStr;
 | 
						|
      logging::Message::Args errMsgArgs;
 | 
						|
      errMsgArgs.add(aOss.str());
 | 
						|
 | 
						|
      if (!fRef.fCmdArgs.getConsoleOutput())
 | 
						|
      {
 | 
						|
        ostringstream oss;
 | 
						|
        oss << startup::StartUp::tmpDir() << fTableOId << ".txt";
 | 
						|
        ofstream dmlFile(oss.str().c_str(), std::ofstream::app);
 | 
						|
 | 
						|
        if (dmlFile.is_open())
 | 
						|
        {
 | 
						|
          dmlFile << aOss.str();
 | 
						|
          dmlFile << endl;
 | 
						|
        }
 | 
						|
 | 
						|
        dmlFile.close();
 | 
						|
      }
 | 
						|
 | 
						|
      fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
 | 
						|
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << aOss.str() << endl;
 | 
						|
 | 
						|
      fLog.logMsg(aOss.str(), MSGLVL_ERROR);
 | 
						|
    }
 | 
						|
    else  // do not add Crit Msgs to BRMRpt vector
 | 
						|
      fBrmRptVec.push_back(aStr);
 | 
						|
  }
 | 
						|
 | 
						|
  // Even when CPI fail, we get BRMRpt to get the Err/Bad file.
 | 
						|
  if (checkForCpiFailStatus())
 | 
						|
    return;  // if a PM failed, don't update BRM.
 | 
						|
 | 
						|
  // cout << "Checking clients for BRM Reports" << endl;
 | 
						|
  // TODO we should update BRM with the report we got.
 | 
						|
  if (check4AllBrmReports())
 | 
						|
  {
 | 
						|
    bool aRslt = updateCPAndHWMInBRM();
 | 
						|
 | 
						|
    if (aRslt)
 | 
						|
    {
 | 
						|
      std::stringstream aStrStr;
 | 
						|
      aStrStr << "BRM updated successfully ";
 | 
						|
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
      fLog.logMsg(aStrStr.str(), MSGLVL_INFO2);
 | 
						|
 | 
						|
      if (fTableLock != 0)
 | 
						|
      {
 | 
						|
        WETableLockGrabber aTLG(*this);
 | 
						|
        bool aRet = aTLG.changeTableLockState(fTableLock);
 | 
						|
 | 
						|
        if (aRet)
 | 
						|
        {
 | 
						|
          if (getDebugLvl())
 | 
						|
            cout << "\tSuccessfully changed TableLock State" << endl;
 | 
						|
 | 
						|
          doCleanup(true);
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          std::stringstream aStrStr;
 | 
						|
          aStrStr << "Failed to change TableLock state to cleanup";
 | 
						|
          fLog.logMsg(aStrStr.str(), MSGLVL_ERROR);
 | 
						|
 | 
						|
          if (getDebugLvl())
 | 
						|
            cout << aStrStr.str() << endl;
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      std::stringstream aStrStr;
 | 
						|
      aStrStr << "\tBRM update Failed : Need to Manually release the table locks";
 | 
						|
      logging::Message::Args errMsgArgs;
 | 
						|
      errMsgArgs.add(aStrStr.str());
 | 
						|
      fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
 | 
						|
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
      fLog.logMsg(aStrStr.str(), MSGLVL_ERROR);
 | 
						|
      fRef.onSigInterrupt(1);  // BUG 4701 //failure in BRM update & process, ie. 1
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << "Still Brm Reports to come in!!" << endl;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
bool WESDHandler::check4CriticalErrMsgs(std::string& Entry)
 | 
						|
{
 | 
						|
  bool aFound = false;
 | 
						|
 | 
						|
  if ((!Entry.empty()) && (Entry.at(0) == 'M'))
 | 
						|
  {
 | 
						|
    aFound = true;
 | 
						|
    // start from after "MERR: "
 | 
						|
    std::string aTmp(Entry.begin() + 6, Entry.end());
 | 
						|
    Entry = aTmp;
 | 
						|
  }
 | 
						|
 | 
						|
  return aFound;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::onErrorFile(int PmId, messageqcpp::SBS& Sbs)
 | 
						|
{
 | 
						|
  std::stringstream aStrStr;
 | 
						|
  aStrStr << "Received ErrReport from " << PmId;
 | 
						|
  logging::Message::Args errMsgArgs;
 | 
						|
  // BUG 4152
 | 
						|
  errMsgArgs.add(PmId);
 | 
						|
  fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0099);
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
  fLog.logMsg(aStrStr.str(), MSGLVL_INFO2);
 | 
						|
 | 
						|
  // TODO - Open the ERROR log file and append to it
 | 
						|
  ofstream aErrFile;
 | 
						|
  std::string aErrFileName;
 | 
						|
  std::string aTmpFileName;
 | 
						|
  std::string aData;
 | 
						|
  (*Sbs) >> aTmpFileName;
 | 
						|
 | 
						|
  // BUG 4324  - Mode 1 bad/err files stored in datafile loc or CWD
 | 
						|
  size_t aPos = aTmpFileName.rfind('/');
 | 
						|
 | 
						|
  if (aPos != std::string::npos)
 | 
						|
  {
 | 
						|
    std::string aFile = aTmpFileName.substr(aPos + 1);  //+1 to pass '/'
 | 
						|
 | 
						|
    std::string aInFile = fRef.fCmdArgs.getErrorDir();
 | 
						|
 | 
						|
    if (aInFile != "/dev/stdin")
 | 
						|
    {
 | 
						|
      size_t aPos2 = aInFile.rfind('/');
 | 
						|
 | 
						|
      if (aPos2 != std::string::npos)
 | 
						|
      {
 | 
						|
        std::string aStr = aInFile.substr(0, aPos2 + 1);
 | 
						|
        // std::cout << "Point 1 " << aStr << std::endl;
 | 
						|
        std::stringstream aStrStr1;
 | 
						|
        aStrStr1 << aStr << aFile;
 | 
						|
        aTmpFileName = aStrStr1.str();
 | 
						|
        // std::cout << "Point 2 " << aTmpFileName << std::endl;
 | 
						|
      }
 | 
						|
      else
 | 
						|
        aTmpFileName = aFile;
 | 
						|
    }
 | 
						|
    else
 | 
						|
      aTmpFileName = aFile;
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      std::cout << "Prep ErrFile " << aTmpFileName << std::endl;
 | 
						|
  }
 | 
						|
 | 
						|
  aStrStr.str(std::string());
 | 
						|
  aStrStr << aTmpFileName << "_" << PmId;
 | 
						|
  aErrFileName = aStrStr.str();  // PmId+"_"+aTmpFileName;
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Error File Name: " << aErrFileName << endl;
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Error Data: " << endl;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    aErrFile.open(aErrFileName.c_str());
 | 
						|
 | 
						|
    while ((*Sbs).length() > 0)
 | 
						|
    {
 | 
						|
      (*Sbs) >> aData;
 | 
						|
 | 
						|
      if (getDebugLvl() > 1)
 | 
						|
        cout << aData << endl;
 | 
						|
 | 
						|
      aErrFile << aData;
 | 
						|
      aErrFile << endl;
 | 
						|
    }
 | 
						|
 | 
						|
    aErrFile.close();
 | 
						|
    setErrorFileName(PmId, aErrFileName);
 | 
						|
    aStrStr.str(std::string());
 | 
						|
    aStrStr << "Row numbers with error reasons are listed in file : " << aErrFileName;
 | 
						|
 | 
						|
    if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
      fLog.logMsg(aStrStr.str(), MSGLVL_INFO1);
 | 
						|
  }
 | 
						|
  catch (std::exception&)
 | 
						|
  {
 | 
						|
    cout << "Error in opening the ERROR file!!" << aErrFileName << endl;
 | 
						|
    cout << "Error in opening the ERROR file!!" << aTmpFileName << endl;
 | 
						|
    cout << "Check for ErrorFile " << aTmpFileName << "in Pm " << PmId << endl;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Process a bulk load *.bad file containing the rejected rows from a PM.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void WESDHandler::onBadFile(int PmId, messageqcpp::SBS& Sbs)
 | 
						|
{
 | 
						|
  std::stringstream aStrStr;
 | 
						|
  aStrStr << "Received BadData Report from " << PmId;
 | 
						|
  logging::Message::Args errMsgArgs;
 | 
						|
  // BUG 4152
 | 
						|
  errMsgArgs.add(PmId);
 | 
						|
  fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0100);
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
  fLog.logMsg(aStrStr.str(), MSGLVL_INFO2);
 | 
						|
 | 
						|
  // TODO - Open the ERROR log file and append to it
 | 
						|
  ofstream aBadFile;
 | 
						|
  std::string aBadFileName;
 | 
						|
  std::string aTmpFileName;
 | 
						|
  std::string aData;
 | 
						|
  (*Sbs) >> aTmpFileName;
 | 
						|
 | 
						|
  // BUG 4324  - Mode 1 bad/err files stored in datafile loc or CWD
 | 
						|
  size_t aPos = aTmpFileName.rfind('/');
 | 
						|
 | 
						|
  if (aPos != std::string::npos)
 | 
						|
  {
 | 
						|
    std::string aFile = aTmpFileName.substr(aPos + 1);  //+1 to pass '/'
 | 
						|
 | 
						|
    std::string aInFile = fRef.fCmdArgs.getErrorDir();
 | 
						|
 | 
						|
    if (aInFile != "/dev/stdin")
 | 
						|
    {
 | 
						|
      size_t aPos2 = aInFile.rfind('/');
 | 
						|
 | 
						|
      if (aPos2 != std::string::npos)
 | 
						|
      {
 | 
						|
        std::string aStr = aInFile.substr(0, aPos2 + 1);
 | 
						|
        // std::cout << "Point 1 " << aStr << std::endl;
 | 
						|
        std::stringstream aStrStr1;
 | 
						|
        aStrStr1 << aStr << aFile;
 | 
						|
        aTmpFileName = aStrStr1.str();
 | 
						|
        // std::cout << "Point 2 " << aTmpFileName << std::endl;
 | 
						|
      }
 | 
						|
      else
 | 
						|
        aTmpFileName = aFile;
 | 
						|
    }
 | 
						|
    else
 | 
						|
      aTmpFileName = aFile;
 | 
						|
 | 
						|
    if (getDebugLvl() > 1)
 | 
						|
      std::cout << "Prep BadFile " << aTmpFileName << std::endl;
 | 
						|
  }
 | 
						|
 | 
						|
  aStrStr.str(std::string());
 | 
						|
  aStrStr << aTmpFileName << "_" << PmId;
 | 
						|
  aBadFileName = aStrStr.str();  // PmId+"_"+aTmpFileName;
 | 
						|
 | 
						|
  if (getDebugLvl() > 1)
 | 
						|
    cout << "Bad File Name: " << aBadFileName << endl;
 | 
						|
 | 
						|
  if ((getDebugLvl() > 1) && (fRef.fCmdArgs.getImportDataMode() == IMPORT_DATA_TEXT))
 | 
						|
    cout << "Bad Data: " << endl;
 | 
						|
 | 
						|
  std::string task;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    task = "opening";
 | 
						|
    aBadFile.open(aBadFileName.c_str());
 | 
						|
 | 
						|
    task = "copying rejected rows to";
 | 
						|
 | 
						|
    while ((*Sbs).length() > 0)
 | 
						|
    {
 | 
						|
      (*Sbs) >> aData;
 | 
						|
 | 
						|
      if ((getDebugLvl() > 1) && (fRef.fCmdArgs.getImportDataMode() == IMPORT_DATA_TEXT))
 | 
						|
      {
 | 
						|
        cout << aData;
 | 
						|
      }
 | 
						|
 | 
						|
      aBadFile.write(aData.c_str(), aData.length());
 | 
						|
    }
 | 
						|
 | 
						|
    aBadFile.close();
 | 
						|
    setBadFileName(PmId, aBadFileName);
 | 
						|
 | 
						|
    aStrStr.str(std::string());
 | 
						|
    aStrStr << "Exact error rows are listed in file : " << aBadFileName;
 | 
						|
 | 
						|
    if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
      fLog.logMsg(aStrStr.str(), MSGLVL_INFO1);
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    cout << "Error in " << task << " the bad file " << aBadFileName << "; " << ex.what() << endl;
 | 
						|
    cout << "Check for Bad File " << aTmpFileName << " on Pm " << PmId << endl;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::getErrorLog(int PmId, const std::string& ErrFileName)
 | 
						|
{
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Requesting Error Log" << endl;
 | 
						|
 | 
						|
  // TODO code appropriately for the message
 | 
						|
  messageqcpp::ByteStream aBs;
 | 
						|
  aBs << (ByteStream::byte)WE_CLT_SRV_ERRLOG;
 | 
						|
  aBs << ErrFileName;
 | 
						|
  send2Pm(aBs, PmId);
 | 
						|
  // fWeSplClients[PmId]->setErrLogRqst(true);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::getBadLog(int PmId, const std::string& BadFileName)
 | 
						|
{
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Requesting Bad Log" << endl;
 | 
						|
 | 
						|
  // TODO code appropriately for the message
 | 
						|
  messageqcpp::ByteStream aBs;
 | 
						|
  aBs << (ByteStream::byte)WE_CLT_SRV_BADLOG;
 | 
						|
  aBs << BadFileName;
 | 
						|
  send2Pm(aBs, PmId);
 | 
						|
  // fWeSplClients[PmId]->setBadLogRqst(true);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::onRollbackResult(int PmId, messageqcpp::SBS& Sbs)
 | 
						|
{
 | 
						|
  ByteStream::byte aRslt = 0;
 | 
						|
 | 
						|
  (*Sbs) >> aRslt;
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Rollback rslt arrived PmId = " << PmId << " Rslt = " << (int)aRslt << endl;
 | 
						|
 | 
						|
  if (aRslt)
 | 
						|
    fWeSplClients[PmId]->setRollbackRslt(1);
 | 
						|
  else
 | 
						|
  {
 | 
						|
    fWeSplClients[PmId]->setRollbackRslt(-1);
 | 
						|
    std::stringstream aStrStr;
 | 
						|
    aStrStr << "Rollback Failed on PM : " << PmId;
 | 
						|
    logging::Message::Args errMsgArgs;
 | 
						|
    // BUG 4152
 | 
						|
    errMsgArgs.add(PmId);
 | 
						|
    fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0101);
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_ERROR);
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << aStrStr.str() << endl;
 | 
						|
  }
 | 
						|
 | 
						|
  int aStatus = check4RollbackRslts();
 | 
						|
 | 
						|
  if (aStatus == -1)
 | 
						|
  {
 | 
						|
    std::stringstream aStrStr;
 | 
						|
    aStrStr << "Rollback Failed on one or more PMs";
 | 
						|
    logging::Message::Args errMsgArgs;
 | 
						|
    errMsgArgs.add(aStrStr.str());
 | 
						|
    fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_ERROR);
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
    // fRef.onSigInterrupt(1);		// 1 for the sake of it
 | 
						|
    if (check4AllRollbackStatus())
 | 
						|
    {
 | 
						|
      fRef.onSigInterrupt(1);  // process altogether is a failure
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else if (aStatus == 1)
 | 
						|
  {
 | 
						|
    std::stringstream aStrStr;
 | 
						|
    aStrStr << "Rollback succeed on all PMs";
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_INFO2);
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
    // false flag sent to doCleanup says to not delete HDFS temp db files,
 | 
						|
    // because the bulk rollback will have already deleted them.  We still
 | 
						|
    // call doCleanup for other file cleanup (like deleting meta file).
 | 
						|
    doCleanup(false);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::onCleanupResult(int PmId, messageqcpp::SBS& Sbs)
 | 
						|
{
 | 
						|
  ByteStream::byte aRslt = 0;
 | 
						|
  (*Sbs) >> aRslt;
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Cleanup rslt arrived PmId = " << PmId << " Rslt = " << (int)aRslt << endl;
 | 
						|
 | 
						|
  if (aRslt)
 | 
						|
    fWeSplClients[PmId]->setCleanupRslt(1);
 | 
						|
  else
 | 
						|
  {
 | 
						|
    fWeSplClients[PmId]->setCleanupRslt(-1);
 | 
						|
    std::stringstream aStrStr;
 | 
						|
    aStrStr << "ERROR: Cleanup Failed on PM : " << PmId;
 | 
						|
    logging::Message::Args errMsgArgs;
 | 
						|
    // BUG 4152
 | 
						|
    errMsgArgs.add(PmId);
 | 
						|
    fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0102);
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_ERROR);
 | 
						|
  }
 | 
						|
 | 
						|
  int aStatus = check4CleanupRslts();
 | 
						|
 | 
						|
  if (aStatus == -1)
 | 
						|
  {
 | 
						|
    std::stringstream aStrStr;
 | 
						|
    aStrStr << "Cleanup Failed on one or more PMs";
 | 
						|
    logging::Message::Args errMsgArgs;
 | 
						|
    errMsgArgs.add(aStrStr.str());
 | 
						|
    fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_ERROR);
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
    // fRef.onSigInterrupt(1);		// 1 for the sake of it -
 | 
						|
    // We need to wait till all the results comes back.
 | 
						|
    if (check4AllCleanupStatus())
 | 
						|
    {
 | 
						|
      fRef.onSigInterrupt(1);  // failure in cleanup & process, ie. 1
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else if (aStatus == 1)
 | 
						|
  {
 | 
						|
    releaseTableLocks();
 | 
						|
 | 
						|
    std::stringstream aStrStr;
 | 
						|
    aStrStr << "Cleanup succeed on all PMs";
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_INFO2);
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
    if (checkAllCpiPassStatus())  // Cleanup and entire process success.
 | 
						|
    {
 | 
						|
      ostringstream oss;
 | 
						|
      // For table walt.abc: 1000 rows processed and 1000 rows inserted.
 | 
						|
      oss << "For table ";
 | 
						|
      oss << fRef.fCmdArgs.getSchemaName() << ".";
 | 
						|
      oss << fRef.fCmdArgs.getTableName() << ": ";
 | 
						|
      oss << fImportRslt.fRowsPro << " rows processed and ";
 | 
						|
      oss << fImportRslt.fRowsIns << " rows inserted.";
 | 
						|
 | 
						|
      if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
        fLog.logMsg(oss.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
      // BUG 4399 Print out WARN messages for out of range counts
 | 
						|
      WEColOorVec::iterator aIt = fImportRslt.fColOorVec.begin();
 | 
						|
      ofstream dmlFile;
 | 
						|
 | 
						|
      if (!fRef.fCmdArgs.getConsoleOutput())  // for DML to use file
 | 
						|
      {
 | 
						|
        ostringstream oss;
 | 
						|
        oss << startup::StartUp::tmpDir() << fTableOId << ".txt";
 | 
						|
        dmlFile.open(oss.str().c_str());
 | 
						|
      }
 | 
						|
 | 
						|
      while (aIt != fImportRslt.fColOorVec.end())
 | 
						|
      {
 | 
						|
        if ((*aIt).fNoOfOORs > 0)
 | 
						|
        {
 | 
						|
          ostringstream ossSatCnt;
 | 
						|
          ossSatCnt << "Column " << (*aIt).fColName << "; Number of ";
 | 
						|
 | 
						|
          switch ((*aIt).fColType)
 | 
						|
          {
 | 
						|
            case CalpontSystemCatalog::DATE: ossSatCnt << "invalid dates replaced with zero value: "; break;
 | 
						|
 | 
						|
            case CalpontSystemCatalog::DATETIME:
 | 
						|
              ossSatCnt << "invalid date/times replaced with zero value: ";
 | 
						|
              break;
 | 
						|
 | 
						|
            case CalpontSystemCatalog::TIMESTAMP:
 | 
						|
              ossSatCnt << "invalid timestamps replaced with zero value: ";
 | 
						|
              break;
 | 
						|
 | 
						|
            case CalpontSystemCatalog::TIME: ossSatCnt << "invalid times replaced with zero value: "; break;
 | 
						|
 | 
						|
            case CalpontSystemCatalog::CHAR: ossSatCnt << "character strings truncated: "; break;
 | 
						|
 | 
						|
            case CalpontSystemCatalog::VARCHAR: ossSatCnt << "varchar strings truncated: "; break;
 | 
						|
 | 
						|
            default: ossSatCnt << "rows inserted with saturated values: "; break;
 | 
						|
          }
 | 
						|
 | 
						|
          ossSatCnt << (*aIt).fNoOfOORs;
 | 
						|
          fLog.logMsg(ossSatCnt.str(), MSGLVL_WARNING);
 | 
						|
 | 
						|
          if (!fRef.fCmdArgs.getConsoleOutput())  // for DML to use
 | 
						|
          {
 | 
						|
            if (dmlFile.is_open())
 | 
						|
            {
 | 
						|
              dmlFile << (*aIt).fNoOfOORs;
 | 
						|
              dmlFile << endl;
 | 
						|
            }
 | 
						|
          }
 | 
						|
        }
 | 
						|
 | 
						|
        aIt++;
 | 
						|
      }
 | 
						|
 | 
						|
      dmlFile.close();
 | 
						|
      fImportRslt.stopTimer();
 | 
						|
      ostringstream oss1;
 | 
						|
      // Bulk load completed, total run time :    2.98625 seconds
 | 
						|
      oss1 << "Bulk load completed, total run time : ";
 | 
						|
      oss1 << fImportRslt.getTotalRunTime() << " seconds";
 | 
						|
 | 
						|
      if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
        fLog.logMsg(oss1.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
      fRef.onSigInterrupt(0);  // 0 for entire success
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      ostringstream oss;
 | 
						|
      oss << "Table " << fRef.fCmdArgs.getSchemaName() << ".";
 | 
						|
      oss << fRef.fCmdArgs.getTableName() << ": (OID-";
 | 
						|
      oss << this->getTableOID() << ") was NOT successfully loaded.";
 | 
						|
 | 
						|
      if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
        fLog.logMsg(oss.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
      fImportRslt.stopTimer();
 | 
						|
      ostringstream oss1;
 | 
						|
      // Bulk load completed, total run time :    2.98625 seconds
 | 
						|
      oss1 << "Bulk load completed, total run time : ";
 | 
						|
      oss1 << fImportRslt.getTotalRunTime() << " seconds";
 | 
						|
 | 
						|
      if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
        fLog.logMsg(oss1.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
      // Even though cleanup is success, entire process is failure
 | 
						|
      fRef.onSigInterrupt(1);  // therefore 1
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::onDBRootCount(int PmId, messageqcpp::SBS& Sbs)
 | 
						|
{
 | 
						|
  ByteStream::byte aDbrCnt = 0;
 | 
						|
  (*Sbs) >> aDbrCnt;
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "No of DBRoots in PM" << PmId << " = " << (int)aDbrCnt << endl;
 | 
						|
 | 
						|
  if (aDbrCnt > 0)
 | 
						|
  {
 | 
						|
    fWeSplClients[PmId]->setDbRootCnt(static_cast<unsigned int>(aDbrCnt));
 | 
						|
    fWeSplClients[PmId]->resetDbRootVar();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::doRollback()
 | 
						|
{
 | 
						|
  std::string aAppName = "cpimport";
 | 
						|
  messageqcpp::ByteStream aBs;
 | 
						|
  aBs << (ByteStream::byte)WE_CLT_SRV_ROLLBACK;
 | 
						|
  aBs << (ByteStream::octbyte)fTableLock;
 | 
						|
  aBs << (ByteStream::quadbyte)fTableOId;
 | 
						|
  aBs << fRef.fCmdArgs.getTableName();
 | 
						|
  aBs << aAppName;
 | 
						|
  boost::mutex::scoped_lock aLock(fSendMutex);
 | 
						|
  send2Pm(aBs);
 | 
						|
  aLock.unlock();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::doCleanup(bool deleteHdfsTempDbFiles)
 | 
						|
{
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "A cleanup is called!!" << endl;
 | 
						|
 | 
						|
  messageqcpp::ByteStream aBs;
 | 
						|
  aBs << (ByteStream::byte)WE_CLT_SRV_CLEANUP;
 | 
						|
  aBs << (ByteStream::quadbyte)fTableOId;
 | 
						|
  aBs << (ByteStream::byte)deleteHdfsTempDbFiles;
 | 
						|
  boost::mutex::scoped_lock aLock(fSendMutex);
 | 
						|
  send2Pm(aBs);
 | 
						|
  aLock.unlock();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
bool WESDHandler::releaseTableLocks()
 | 
						|
{
 | 
						|
  if (fTableLock != 0)
 | 
						|
  {
 | 
						|
    WETableLockGrabber aTLG(*this);
 | 
						|
 | 
						|
    // BUG 4398. Move the call to takeSnapshot() from cpimport.bin to here
 | 
						|
    fDbrm.takeSnapshot();
 | 
						|
 | 
						|
    bool aRet = aTLG.releaseTableLock(fTableLock);
 | 
						|
 | 
						|
    if (aRet)
 | 
						|
    {
 | 
						|
      std::stringstream aStrStr;
 | 
						|
      aStrStr << "Released Table Lock";
 | 
						|
      logging::Message::Args errMsgArgs;
 | 
						|
      errMsgArgs.add(aStrStr.str());
 | 
						|
      fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
 | 
						|
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << aStrStr.str() << endl;
 | 
						|
 | 
						|
      fLog.logMsg(aStrStr.str(), MSGLVL_INFO2);
 | 
						|
 | 
						|
      return true;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return false;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
int WESDHandler::check4RollbackRslts()
 | 
						|
{
 | 
						|
  int aStatus = 1;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      int aRslt = fWeSplClients[aCnt]->getRollbackRslt();
 | 
						|
 | 
						|
      if (aRslt == -1)
 | 
						|
      {
 | 
						|
        // cout << "Rollback Failed in PM - " << aCnt << endl;
 | 
						|
        aStatus = -1;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else if (aRslt == 0)  // not all results available yet
 | 
						|
      {
 | 
						|
        aStatus = 0;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return aStatus;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
int WESDHandler::check4CleanupRslts()
 | 
						|
{
 | 
						|
  int aStatus = 1;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      int aRslt = fWeSplClients[aCnt]->getCleanupRslt();
 | 
						|
 | 
						|
      if (aRslt == -1)
 | 
						|
      {
 | 
						|
        // cout << "Cleanup Failed in PM - " << aCnt << endl;
 | 
						|
        aStatus = -1;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else if (aRslt == 0)  // not all results available yet
 | 
						|
      {
 | 
						|
        aStatus = 0;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return aStatus;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
bool WESDHandler::check4AllRollbackStatus()
 | 
						|
{
 | 
						|
  bool aStatus = true;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      int aRslt = fWeSplClients[aCnt]->getRollbackRslt();
 | 
						|
 | 
						|
      if (aRslt == 0)  // not all results available yet; either -1/1
 | 
						|
      {
 | 
						|
        aStatus = false;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return aStatus;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
bool WESDHandler::check4AllCleanupStatus()
 | 
						|
{
 | 
						|
  bool aStatus = true;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      int aRslt = fWeSplClients[aCnt]->getCleanupRslt();
 | 
						|
 | 
						|
      if (aRslt == 0)  // not all results available yet; either -1/1
 | 
						|
      {
 | 
						|
        aStatus = false;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return aStatus;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
bool WESDHandler::checkAllCpiPassStatus()
 | 
						|
{
 | 
						|
  bool aStatus = true;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      if (!fWeSplClients[aCnt]->isCpiPassed())
 | 
						|
      {
 | 
						|
        if (getDebugLvl())
 | 
						|
          cout << "CPI Pass status still False in " << aCnt << endl;
 | 
						|
 | 
						|
        aStatus = false;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return aStatus;
 | 
						|
}
 | 
						|
//
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
bool WESDHandler::checkAllCpiFailStatus()
 | 
						|
{
 | 
						|
  bool aStatus = true;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      if (!fWeSplClients[aCnt]->isCpiFailed())
 | 
						|
      {
 | 
						|
        if (getDebugLvl())
 | 
						|
          cout << "CPI Fail status still False in " << aCnt << endl;
 | 
						|
 | 
						|
        aStatus = false;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return aStatus;
 | 
						|
}
 | 
						|
 | 
						|
//-----------------------------------------------------------------------------
 | 
						|
bool WESDHandler::check4AllBrmReports()
 | 
						|
{
 | 
						|
  for (int PmId = 1; PmId <= fPmCount; ++PmId)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[PmId] != NULL)
 | 
						|
    {
 | 
						|
      if (!fWeSplClients[PmId]->isBrmRptRcvd())
 | 
						|
      {
 | 
						|
        if (getDebugLvl())
 | 
						|
          cout << "BRMReport from " << PmId << " still not received" << endl;
 | 
						|
 | 
						|
        return false;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
int WESDHandler::getNextPm2Feed()
 | 
						|
{
 | 
						|
  // int aLdPm = leastDataSendPm();
 | 
						|
  int aLdPm = getNextDbrPm2Send();
 | 
						|
 | 
						|
  if (fWeSplClients[aLdPm] != 0)
 | 
						|
  {
 | 
						|
    // Balancing the total bytes sent and Q Size
 | 
						|
    int aSz = fWeSplClients[aLdPm]->getSendQSize();
 | 
						|
 | 
						|
    if (aSz > MAX_QSIZE)
 | 
						|
    {
 | 
						|
      // cout << "Queue Size = " << aSz << endl;
 | 
						|
      if (fpBatchLoader)
 | 
						|
        fpBatchLoader->reverseSequence();
 | 
						|
 | 
						|
      aLdPm = 0;  // Filled Q
 | 
						|
    }             // Check enough DataRqst to send Data
 | 
						|
 | 
						|
    if (getDebugLvl() > 2)
 | 
						|
      cout << "NextPm2Feed " << aLdPm << endl;
 | 
						|
 | 
						|
    // will work only specific to mode 0 (fpBatchLoader==0)
 | 
						|
    if ((aLdPm > 0) && (!fpBatchLoader))
 | 
						|
      fWeSplClients[aLdPm]->decDbRootVar();
 | 
						|
  }
 | 
						|
  else if (aLdPm != 0)
 | 
						|
  {
 | 
						|
    cout << "Next PMid Error : PmId = " << aLdPm << endl;
 | 
						|
    aLdPm = 0;
 | 
						|
  }
 | 
						|
 | 
						|
  return aLdPm;
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
int WESDHandler::getNextDbrPm2Send()
 | 
						|
{
 | 
						|
  unsigned int aDbrVar = 0;
 | 
						|
  unsigned int aPmId = 0;
 | 
						|
 | 
						|
  if ((!fAllCpiStarted) && (1 == fRef.fCmdArgs.getMode()))
 | 
						|
  {
 | 
						|
    check4AllCpiStarts();
 | 
						|
    return aPmId;  // Not all Cpi started.
 | 
						|
  }
 | 
						|
 | 
						|
  // NOTE : Implementing BatchLoader, which will be used to select the
 | 
						|
  //			FIRST PM to send data and subsequent PM's to send data
 | 
						|
  if (fpBatchLoader)  // for mode 1 and 2 only, since mode 0 don't have tableOID
 | 
						|
  {
 | 
						|
    if ((!fFirstDataSent) && (1 == fRef.fCmdArgs.getMode()))
 | 
						|
    {
 | 
						|
      try
 | 
						|
      {
 | 
						|
        aPmId = fFirstPmToSend;
 | 
						|
        fFirstDataSent = true;
 | 
						|
 | 
						|
        if (fSelectOtherPm)
 | 
						|
          aPmId = 0;  // Select the other method
 | 
						|
 | 
						|
        if (getDebugLvl())
 | 
						|
          cout << "1st PM for data = " << aPmId << endl;
 | 
						|
 | 
						|
        fpBatchLoader->prepareForSecondPM();
 | 
						|
      }
 | 
						|
      catch (std::exception& ex)
 | 
						|
      {
 | 
						|
        fLog.logMsg(ex.what(), MSGLVL_ERROR);
 | 
						|
        logging::Message::Args errMsgArgs;
 | 
						|
        errMsgArgs.add(ex.what());
 | 
						|
        fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000);
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      aPmId = fpBatchLoader->selectNextPM();
 | 
						|
    }
 | 
						|
 | 
						|
    if (getDebugLvl() > 2)
 | 
						|
      cout << "Next PM to get data = " << aPmId << endl;
 | 
						|
 | 
						|
    return aPmId;
 | 
						|
  }
 | 
						|
 | 
						|
  //--------- The part below is for mode 0, which don't have table relation
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      if (aPmId == 0)  // init
 | 
						|
      {
 | 
						|
        aDbrVar = fWeSplClients[aCnt]->getDbRootVar();
 | 
						|
        aPmId = aCnt;
 | 
						|
      }
 | 
						|
      else if (fWeSplClients[aCnt]->getDbRootVar() > aDbrVar)
 | 
						|
      {
 | 
						|
        aPmId = aCnt;
 | 
						|
        aDbrVar = fWeSplClients[aCnt]->getDbRootVar();
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (aDbrVar == 0)
 | 
						|
  {
 | 
						|
    for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
    {
 | 
						|
      if (fWeSplClients[aCnt] != 0)
 | 
						|
      {
 | 
						|
        fWeSplClients[aCnt]->resetDbRootVar();
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    aPmId = 0;
 | 
						|
  }
 | 
						|
 | 
						|
  if (getDebugLvl() > 2)
 | 
						|
    cout << "DbrPM2Send [" << aPmId << "] = " << aDbrVar << endl;
 | 
						|
 | 
						|
  return aPmId;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
int WESDHandler::leastDataSendPm()
 | 
						|
{
 | 
						|
  messageqcpp::BSSizeType aTx = 0;
 | 
						|
  int aPmId = 0;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      if (aPmId == 0)  // init
 | 
						|
      {
 | 
						|
        aTx = fWeSplClients[aCnt]->getBytesTx();
 | 
						|
        aPmId = aCnt;
 | 
						|
      }
 | 
						|
      else if (fWeSplClients[aCnt]->getBytesTx() < aTx)
 | 
						|
      {
 | 
						|
        aPmId = aCnt;
 | 
						|
        aTx = fWeSplClients[aCnt]->getBytesTx();
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return aPmId;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
int WESDHandler::getTableOID(std::string Schema, std::string Table)
 | 
						|
{
 | 
						|
  execplan::CalpontSystemCatalog::ROPair roPair;
 | 
						|
  CalpontSystemCatalog::TableName tableName(Schema, Table);
 | 
						|
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | 
						|
  roPair = systemCatalogPtr->tableRID(tableName);
 | 
						|
 | 
						|
  return roPair.objnum;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Get the expected import binary fixed record length for the specified table.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
unsigned int WESDHandler::calcTableRecLen(const std::string& schema, const std::string table)
 | 
						|
{
 | 
						|
  unsigned int recLen = 0;
 | 
						|
 | 
						|
  CalpontSystemCatalog::TableName tableName(schema, table);
 | 
						|
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog();
 | 
						|
  CalpontSystemCatalog::RIDList colRidList = systemCatalogPtr->columnRIDs(tableName, true);
 | 
						|
  CalpontSystemCatalog::RIDList::const_iterator rid_iterator = colRidList.begin();
 | 
						|
 | 
						|
  std::set<std::string> colListInJobFile;
 | 
						|
  fRef.fCmdArgs.getColumnList(colListInJobFile);
 | 
						|
  std::set<std::string>::const_iterator setIter;
 | 
						|
 | 
						|
  // Add up the column widths to get the total record length
 | 
						|
  while (rid_iterator != colRidList.end())
 | 
						|
  {
 | 
						|
    CalpontSystemCatalog::ROPair roPair = *rid_iterator;
 | 
						|
    CalpontSystemCatalog::OID oid = roPair.objnum;
 | 
						|
    CalpontSystemCatalog::ColType colType;
 | 
						|
 | 
						|
    colType = systemCatalogPtr->colType(oid);
 | 
						|
 | 
						|
    // If we have a list of column names taken from an XML job file,
 | 
						|
    // then we filter against that list
 | 
						|
    if (colListInJobFile.size() > 0)
 | 
						|
    {
 | 
						|
      CalpontSystemCatalog::TableColName colName;
 | 
						|
      colName = systemCatalogPtr->colName(oid);
 | 
						|
 | 
						|
      setIter = colListInJobFile.find(colName.column);
 | 
						|
 | 
						|
      if (setIter != colListInJobFile.end())
 | 
						|
      {
 | 
						|
        recLen += colType.colWidth;
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      recLen += colType.colWidth;
 | 
						|
    }
 | 
						|
 | 
						|
    ++rid_iterator;
 | 
						|
  }
 | 
						|
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "Binary record length for " << schema << '.' << table << " is: " << recLen << endl;
 | 
						|
 | 
						|
  return recLen;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::check4CpiInvokeMode()
 | 
						|
{
 | 
						|
  try
 | 
						|
  {
 | 
						|
    oam::oamModuleInfo_t aModInfo = fOam.getModuleInfo();
 | 
						|
    string aModuleType = boost::get<1>(aModInfo);
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << "ModuleType " << aModuleType << endl;
 | 
						|
 | 
						|
    int aInstallType = boost::get<5>(aModInfo);
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << "InstallType " << aInstallType << endl;
 | 
						|
 | 
						|
    fOam.getSystemConfig("pm", fModuleTypeConfig);
 | 
						|
    fPmCount = fModuleTypeConfig.ModuleCount;
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << "PM Count " << fPmCount << endl;
 | 
						|
 | 
						|
    // oam::INSTALL_COMBINE_* 2,3,4  ie UM+PM in 1 Machine
 | 
						|
    if ((aInstallType > 1) && (fPmCount == 1))
 | 
						|
    {
 | 
						|
      // Mode arg was NOT provided; set to to default Mode 3
 | 
						|
      if (fRef.fCmdArgs.getArgMode() == -1)
 | 
						|
      {
 | 
						|
        fRef.fCmdArgs.setMode(3);
 | 
						|
        fRef.fCmdArgs.setCpiInvoke();
 | 
						|
      }
 | 
						|
      else if (fRef.fCmdArgs.getArgMode() == 3)  // BUG 4210
 | 
						|
      {
 | 
						|
        fRef.fCmdArgs.setCpiInvoke();
 | 
						|
      }
 | 
						|
      else if (fRef.fCmdArgs.getArgMode() == 0)
 | 
						|
      {
 | 
						|
        throw runtime_error("Mode 0 allowed only in Multi-Nodes.");
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else if ((aInstallType == oam::INSTALL_NORMAL) && (aModuleType == "um"))
 | 
						|
    {
 | 
						|
      // BUG 4165
 | 
						|
      if (fRef.fCmdArgs.getMode() == 3)
 | 
						|
      {
 | 
						|
        throw runtime_error("Mode 3 imports can only be run on a PM.");
 | 
						|
      }
 | 
						|
      else if (fRef.fCmdArgs.getMode() == -1)  // default mode //BUG 4210
 | 
						|
      {
 | 
						|
        fRef.fCmdArgs.setMode(1);
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else if (aModuleType == "pm")  // BUG 4210
 | 
						|
    {
 | 
						|
      // BUG 4210
 | 
						|
      if ((fPmCount >= 1) && (fRef.fCmdArgs.getMode() == 3))
 | 
						|
      {
 | 
						|
        fRef.fCmdArgs.setCpiInvoke();
 | 
						|
      }
 | 
						|
      // Single node default without argument option m
 | 
						|
      else if ((fPmCount == 1) && (fRef.fCmdArgs.getArgMode() == -1))
 | 
						|
      {
 | 
						|
        fRef.fCmdArgs.setMode(3);
 | 
						|
        fRef.fCmdArgs.setCpiInvoke();
 | 
						|
      }  // Multi-node default without argument option m
 | 
						|
      else if ((fPmCount > 1) && (fRef.fCmdArgs.getArgMode() == -1))
 | 
						|
      {
 | 
						|
        fRef.fCmdArgs.setMode(1);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (runtime_error& exp)
 | 
						|
  {
 | 
						|
    throw runtime_error(exp.what());
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    std::string aStr = "oam.getModuleInfo/getSystemConfig error : WESDHandler check4CpiInvoke()";
 | 
						|
    throw runtime_error(aStr);
 | 
						|
  }
 | 
						|
 | 
						|
  check4PmArguments();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
bool WESDHandler::check4PmArguments()
 | 
						|
{
 | 
						|
  if (fRef.fCmdArgs.getPmVecSize() > 0)
 | 
						|
  {
 | 
						|
    std::vector<unsigned int> aPmVec = fRef.fCmdArgs.getPmVec();
 | 
						|
    int aSize = aPmVec.size();
 | 
						|
 | 
						|
    for (int aIdx = 0; aIdx < aSize; aIdx++)
 | 
						|
    {
 | 
						|
      if ((fPmCount < static_cast<int>(aPmVec[aIdx])) || (0 == aPmVec[aIdx]))
 | 
						|
      {
 | 
						|
        std::stringstream aStrStr;
 | 
						|
        aStrStr << "Invalid argument PMid " << aPmVec[aIdx] << endl;
 | 
						|
        throw runtime_error(aStrStr.str());
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
bool WESDHandler::check4AllCpiStarts()
 | 
						|
{
 | 
						|
  bool aStarted = true;
 | 
						|
 | 
						|
  for (int aCnt = 1; aCnt <= fPmCount; aCnt++)
 | 
						|
  {
 | 
						|
    if (fWeSplClients[aCnt] != 0)
 | 
						|
    {
 | 
						|
      if (!fWeSplClients[aCnt]->isCpiStarted())
 | 
						|
        aStarted = false;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (aStarted)
 | 
						|
    fAllCpiStarted = aStarted;
 | 
						|
 | 
						|
  return aStarted;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::exportJobFile(std::string& JobId, std::string& JobFileName)
 | 
						|
{
 | 
						|
  if (getDebugLvl())
 | 
						|
    cout << "JobFile Name is " << JobFileName << endl;
 | 
						|
 | 
						|
  ifstream aInFile;
 | 
						|
  aInFile.open(JobFileName.c_str());
 | 
						|
 | 
						|
  if ((aInFile.is_open()) && (!aInFile.eof()))
 | 
						|
  {
 | 
						|
    std::stringstream aSs;
 | 
						|
    aSs << fRef.fCmdArgs.getTmpFileDir();
 | 
						|
    aSs << "/Job_";
 | 
						|
    aSs << JobId;
 | 
						|
    aSs << ".xml";
 | 
						|
 | 
						|
    messageqcpp::ByteStream aBs;
 | 
						|
    aBs << (ByteStream::byte)WE_CLT_SRV_JOBID;
 | 
						|
    aBs << aSs.str();
 | 
						|
    send2Pm(aBs);
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << "exportJobFile::Send RmtFileName " << aSs.str() << endl;
 | 
						|
 | 
						|
    // Read everything to a String
 | 
						|
    std::string aData((std::istreambuf_iterator<char>(aInFile)), std::istreambuf_iterator<char>());
 | 
						|
 | 
						|
    if (getDebugLvl())
 | 
						|
      cout << "Sending XML FileData " << aData << endl;
 | 
						|
 | 
						|
    aBs.restart();
 | 
						|
    aBs << (ByteStream::byte)WE_CLT_SRV_JOBDATA;
 | 
						|
    aBs << aData;
 | 
						|
    send2Pm(aBs);
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    throw runtime_error("unable to open Job File");
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
bool WESDHandler::getConsoleLog() const
 | 
						|
{
 | 
						|
  return fRef.fCmdArgs.getConsoleLog();
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
char WESDHandler::getEnclChar() const
 | 
						|
{
 | 
						|
  return fRef.fCmdArgs.getEnclChar();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
char WESDHandler::getEscChar() const
 | 
						|
{
 | 
						|
  return fRef.fCmdArgs.getEscChar();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
int WESDHandler::getReadBufSize()
 | 
						|
{
 | 
						|
  return fRef.fCmdArgs.getReadBufSize();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
char WESDHandler::getDelimChar() const
 | 
						|
{
 | 
						|
  return fRef.fCmdArgs.getDelimChar();
 | 
						|
}
 | 
						|
 | 
						|
size_t WESDHandler::getSkipRows() const
 | 
						|
{
 | 
						|
  return fRef.fCmdArgs.getSkipRows();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
std::string WESDHandler::getTableName() const
 | 
						|
{
 | 
						|
  return fRef.fCmdArgs.getTableName();
 | 
						|
}
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
std::string WESDHandler::getSchemaName() const
 | 
						|
{
 | 
						|
  return fRef.fCmdArgs.getSchemaName();
 | 
						|
}
 | 
						|
 | 
						|
ImportDataMode WESDHandler::getImportDataMode() const
 | 
						|
{
 | 
						|
  return fRef.fCmdArgs.getImportDataMode();
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::sysLog(const logging::Message::Args& msgArgs, logging::LOG_TYPE logType,
 | 
						|
                         logging::Message::MessageID msgId)
 | 
						|
{
 | 
						|
  fRef.fpSysLog->logMsg(msgArgs, logType, msgId);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
std::string WESDHandler::getTime2Str() const
 | 
						|
{
 | 
						|
  char aBuff[64];
 | 
						|
  time_t aTime;
 | 
						|
  struct tm pTm;
 | 
						|
  time(&aTime);
 | 
						|
  localtime_r(&aTime, &pTm);
 | 
						|
 | 
						|
  //				  M   D   H   M   S
 | 
						|
  snprintf(aBuff, sizeof(aBuff), "%02d%02d%02d%02d%02d", pTm.tm_mon + 1, pTm.tm_mday, pTm.tm_hour, pTm.tm_min,
 | 
						|
           pTm.tm_sec);
 | 
						|
 | 
						|
  return aBuff;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::setInputFileList(std::string InFileName)
 | 
						|
{
 | 
						|
  fFileReadThread.chkForListOfFiles(InFileName);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::onHandlingSignal()
 | 
						|
{
 | 
						|
  std::stringstream aStrStr;
 | 
						|
  char aDefCon[16], aRedCol[16];
 | 
						|
  snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
 | 
						|
  snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m");
 | 
						|
  aStrStr << aRedCol << "Received signal to terminate the process." << aDefCon;
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
  logging::Message::Args errMsgArgs;
 | 
						|
  errMsgArgs.add(aStrStr.str());
 | 
						|
  fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
 | 
						|
 | 
						|
  std::stringstream aStrStr1;
 | 
						|
  aStrStr1 << "Handling signal ......";
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
    fLog.logMsg(aStrStr1.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
  fRef.fSignaled = false;
 | 
						|
  bool aTblLockReleased = false;
 | 
						|
  bool aRollbackSuccess = false;
 | 
						|
  onCpimportFail(0, true);
 | 
						|
  usleep(2000000 * fPmCount);
 | 
						|
 | 
						|
  // BUG 4649  - Some systems taking too long to finish the process.
 | 
						|
  //				So we have to wait some more time.
 | 
						|
  std::stringstream aStrStr2;
 | 
						|
  aStrStr2 << "Rolling back ..........";
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
    fLog.logMsg(aStrStr2.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
  for (int aIdx = 0; aIdx < 60; aIdx++)
 | 
						|
  {
 | 
						|
    int aStatus = check4RollbackRslts();
 | 
						|
 | 
						|
    if (1 == aStatus)
 | 
						|
    {
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Rollback Successful... " << endl;
 | 
						|
 | 
						|
      aRollbackSuccess = true;
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    else if (-1 == aStatus)
 | 
						|
    {
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Rollback Failed... " << endl;
 | 
						|
 | 
						|
      break;
 | 
						|
    }
 | 
						|
 | 
						|
    usleep(2000000 * fPmCount);
 | 
						|
  }
 | 
						|
 | 
						|
  // Bug 5774 - if rollback failed, leave the tablelock
 | 
						|
  if (!aRollbackSuccess)
 | 
						|
  {
 | 
						|
    std::stringstream aStrStr2a;
 | 
						|
    aStrStr2a << "Rollback Failed; Leaving Tablelock ... ";
 | 
						|
 | 
						|
    if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
      fLog.logMsg(aStrStr2a.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
    return;
 | 
						|
  }
 | 
						|
 | 
						|
  std::stringstream aStrStr3;
 | 
						|
  aStrStr3 << "Cleaning up ..........";
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
    fLog.logMsg(aStrStr3.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
  for (int aIdx = 0; aIdx < 60; aIdx++)
 | 
						|
  {
 | 
						|
    int aStatus = check4CleanupRslts();
 | 
						|
 | 
						|
    if (aStatus == 1)
 | 
						|
    {
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Cleanup Successful... " << endl;
 | 
						|
 | 
						|
      releaseTableLocks();
 | 
						|
      aTblLockReleased = true;
 | 
						|
      break;
 | 
						|
    }
 | 
						|
 | 
						|
    usleep(2000000 * fPmCount);
 | 
						|
  }
 | 
						|
 | 
						|
  if ((!aTblLockReleased) && (aRollbackSuccess))
 | 
						|
  {
 | 
						|
    releaseTableLocks();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::onHandlingSigHup()
 | 
						|
{
 | 
						|
  std::stringstream aStrStr;
 | 
						|
  char aDefCon[16], aRedCol[16];
 | 
						|
  snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
 | 
						|
  snprintf(aRedCol, sizeof(aRedCol), "\033[0;31m");
 | 
						|
  aStrStr << aRedCol << "Interrupt received .... Program exiting." << aDefCon;
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
    fLog.logMsg(aStrStr.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
  logging::Message::Args errMsgArgs;
 | 
						|
  errMsgArgs.add(aStrStr.str());
 | 
						|
  fRef.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_INFO, logging::M0000);
 | 
						|
 | 
						|
  fRef.fSigHup = false;
 | 
						|
  // onCpimportFail(0, true);  - cancelOutstandingCpimports will hang on send()
 | 
						|
  // cancelOutstandingCpimports();
 | 
						|
  exit(1);  // Hard exit on SIGHUP signal
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::onDisconnectFailure()
 | 
						|
{
 | 
						|
  string aStr("Trying to reconnect and rollback");
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
    fLog.logMsg(aStr, MSGLVL_INFO1);
 | 
						|
 | 
						|
  for (int aSec = 0; aSec < 15; aSec++)
 | 
						|
  {
 | 
						|
    bool aDisconnect = false;
 | 
						|
    usleep(1000000);
 | 
						|
 | 
						|
    for (int PmId = 1; PmId <= fPmCount; ++PmId)
 | 
						|
    {
 | 
						|
      if (fWeSplClients[PmId] != 0)
 | 
						|
      {
 | 
						|
        if (!fWeSplClients[PmId]->isConnected())
 | 
						|
        {
 | 
						|
          aDisconnect = true;
 | 
						|
 | 
						|
          try
 | 
						|
          {
 | 
						|
            fWeSplClients[PmId]->setup();
 | 
						|
          }
 | 
						|
          catch (runtime_error&)
 | 
						|
          {
 | 
						|
            cout << "Unable to connect to PM" << PmId << "; Trying again..." << endl;
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    if (!aDisconnect)
 | 
						|
      break;
 | 
						|
  }
 | 
						|
 | 
						|
  doRollback();
 | 
						|
 | 
						|
  bool aTblLockReleased = false;
 | 
						|
  bool aRollbackSuccess = true;
 | 
						|
  // BUG 4649  - Some systems taking too long to finish the process.
 | 
						|
  //				So we have to wait some more time.
 | 
						|
  std::stringstream aStrStr2;
 | 
						|
  aStrStr2 << "Rolling back ..........";
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
    fLog.logMsg(aStrStr2.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
  for (int aIdx = 0; aIdx < 10; aIdx++)
 | 
						|
  {
 | 
						|
    int aStatus = check4RollbackRslts();
 | 
						|
 | 
						|
    if (1 == aStatus)
 | 
						|
    {
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Rollback Successful... " << endl;
 | 
						|
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    else if (-1 == aStatus)
 | 
						|
    {
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Rollback Failed... " << endl;
 | 
						|
 | 
						|
      aRollbackSuccess = false;
 | 
						|
      break;
 | 
						|
    }
 | 
						|
 | 
						|
    usleep(2000000 * fPmCount);
 | 
						|
  }
 | 
						|
 | 
						|
  std::stringstream aStrStr3;
 | 
						|
  aStrStr3 << "Cleaning up ..........";
 | 
						|
 | 
						|
  if (fRef.fCmdArgs.getConsoleOutput())
 | 
						|
    fLog.logMsg(aStrStr3.str(), MSGLVL_INFO1);
 | 
						|
 | 
						|
  for (int aIdx = 0; aIdx < 10; aIdx++)
 | 
						|
  {
 | 
						|
    int aStatus = check4CleanupRslts();
 | 
						|
 | 
						|
    if (aStatus == 1)
 | 
						|
    {
 | 
						|
      if (getDebugLvl())
 | 
						|
        cout << "Cleanup Successful... " << endl;
 | 
						|
 | 
						|
      if (aRollbackSuccess)
 | 
						|
      {
 | 
						|
        releaseTableLocks();
 | 
						|
        aTblLockReleased = true;
 | 
						|
      }
 | 
						|
 | 
						|
      break;
 | 
						|
    }
 | 
						|
 | 
						|
    usleep(2000000 * fPmCount);
 | 
						|
  }
 | 
						|
 | 
						|
  if ((!aTblLockReleased) && (aRollbackSuccess))
 | 
						|
  {
 | 
						|
    releaseTableLocks();
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
void WESDHandler::setDisconnectFailure(bool Flag)
 | 
						|
{
 | 
						|
  if (fFileReadThread.isContinue())  // check already stopped running
 | 
						|
  {
 | 
						|
    sendEODMsg();
 | 
						|
    fFileReadThread.shutdown();
 | 
						|
  }
 | 
						|
 | 
						|
  fDisconnectFailure = Flag;
 | 
						|
  fRef.onSigInterrupt(1);  // process altogether is a failure
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
 | 
						|
} /* namespace WriteEngine */
 |