You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1524 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1524 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
/*
 | 
						|
 * $Id: we_redistributeworkerthread.cpp 4646 2013-05-23 20:58:08Z xlou $
 | 
						|
 */
 | 
						|
 | 
						|
#include <iostream>
 | 
						|
#include <set>
 | 
						|
#include <vector>
 | 
						|
#include <cassert>
 | 
						|
#include <stdexcept>
 | 
						|
#include <sstream>
 | 
						|
#include <string>
 | 
						|
#include <unistd.h>
 | 
						|
using namespace std;
 | 
						|
 | 
						|
#include "boost/scoped_ptr.hpp"
 | 
						|
#include "boost/scoped_array.hpp"
 | 
						|
#include "boost/thread/mutex.hpp"
 | 
						|
#include "boost/filesystem.hpp"
 | 
						|
using namespace boost;
 | 
						|
 | 
						|
#include "installdir.h"
 | 
						|
 | 
						|
#include "configcpp.h"
 | 
						|
using namespace config;
 | 
						|
 | 
						|
#include "liboamcpp.h"
 | 
						|
#include "oamcache.h"
 | 
						|
using namespace oam;
 | 
						|
 | 
						|
#include "dbrm.h"
 | 
						|
using namespace BRM;
 | 
						|
 | 
						|
#include "messagequeue.h"
 | 
						|
#include "bytestream.h"
 | 
						|
using namespace messageqcpp;
 | 
						|
 | 
						|
#include "calpontsystemcatalog.h"
 | 
						|
using namespace execplan;
 | 
						|
 | 
						|
#include "exceptclasses.h"
 | 
						|
using namespace logging;
 | 
						|
 | 
						|
#include "IDBFileSystem.h"
 | 
						|
#include "IDBPolicy.h"
 | 
						|
using namespace idbdatafile;
 | 
						|
 | 
						|
#include "we_fileop.h"
 | 
						|
#include "we_messages.h"
 | 
						|
#include "we_convertor.h"
 | 
						|
#include "we_redistributedef.h"
 | 
						|
#include "we_redistributecontrol.h"
 | 
						|
#include "we_redistributeworkerthread.h"
 | 
						|
 | 
						|
namespace redistribute
 | 
						|
{
 | 
						|
// static variables
 | 
						|
boost::mutex RedistributeWorkerThread::fActionMutex;
 | 
						|
volatile bool RedistributeWorkerThread::fStopAction = false;
 | 
						|
volatile bool RedistributeWorkerThread::fCommitted = false;
 | 
						|
string RedistributeWorkerThread::fWesInUse;
 | 
						|
 | 
						|
// Creates a worker bound to the received message `bs` and the socket `ios`.
// No table lock is held and no file is open yet; the error code starts as
// RED_EC_OK.
RedistributeWorkerThread::RedistributeWorkerThread(ByteStream& bs, IOSocket& ios)
 : fBs(bs)
 , fIOSocket(ios)
 , fTableLockId(0)
 , fErrorCode(RED_EC_OK)
 , fNewFilePtr(NULL)
 , fOldFilePtr(NULL)
{
  // Buffer of CHUNK_SIZE bytes owned by fWriteBuffer.
  char* writeBuffer = new char[CHUNK_SIZE];
  fWriteBuffer.reset(writeBuffer);
}
 | 
						|
 | 
						|
// Releases whatever the worker still owns: any open new/old segment file and,
// if one is still held, the table lock.  Runs under fActionMutex.
RedistributeWorkerThread::~RedistributeWorkerThread()
{
  boost::mutex::scoped_lock lock(fActionMutex);

  if (fNewFilePtr)
    closeFile(fNewFilePtr);

  if (fOldFilePtr)
    closeFile(fOldFilePtr);

  // make sure releasing the table lock.
  if (fTableLockId > 0)
  {
    // An exception leaving a destructor terminates the process, so a failure
    // while releasing the lock (or while logging it) must be contained here.
    try
    {
      fDbrm->releaseTableLock(fTableLockId);

      // use the interface, line# replaced with lock id.
      logMessage("Releasing table lock in destructor. ", fTableLockId);
    }
    catch (...)
    {
      // Nothing more can be done safely at this point; logging again could
      // itself throw.
    }
  }
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::operator()()
 | 
						|
{
 | 
						|
  memcpy(&fMsgHeader, fBs.buf(), sizeof(RedistributeMsgHeader));
 | 
						|
  fBs.advance(sizeof(RedistributeMsgHeader));
 | 
						|
 | 
						|
  if (fMsgHeader.messageId == RED_ACTN_REQUEST)
 | 
						|
    handleRequest();
 | 
						|
  else if (fMsgHeader.messageId == RED_ACTN_STOP)
 | 
						|
    handleStop();
 | 
						|
  else if (fMsgHeader.messageId == RED_DATA_INIT)
 | 
						|
    handleData();
 | 
						|
  else
 | 
						|
    handleUnknowJobMsg();
 | 
						|
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::handleRequest()
 | 
						|
{
 | 
						|
  try
 | 
						|
  {
 | 
						|
    // clear stop flag if ever set.
 | 
						|
    {
 | 
						|
      boost::mutex::scoped_lock lock(fActionMutex);
 | 
						|
      fStopAction = false;
 | 
						|
      fCommitted = false;
 | 
						|
    }
 | 
						|
 | 
						|
    if (setup() == 0)
 | 
						|
    {
 | 
						|
      if (fBs.length() >= sizeof(RedistributePlanEntry))
 | 
						|
      {
 | 
						|
        memcpy(&fPlanEntry, fBs.buf(), sizeof(RedistributePlanEntry));
 | 
						|
        fBs.advance(sizeof(RedistributePlanEntry));
 | 
						|
        OamCache::dbRootPMMap_t dbrootToPM = fOamCache->getDBRootToPMMap();
 | 
						|
        fMyId.first = fPlanEntry.source;
 | 
						|
        fMyId.second = (*dbrootToPM)[fMyId.first];
 | 
						|
        fPeerId.first = fPlanEntry.destination;
 | 
						|
        fPeerId.second = (*dbrootToPM)[fPeerId.first];
 | 
						|
 | 
						|
        if (grabTableLock() == 0)
 | 
						|
        {
 | 
						|
          // workaround extentmap slow update
 | 
						|
          sleep(1);
 | 
						|
 | 
						|
          // build segment & entry list after grabbing the table lock.
 | 
						|
          if (buildEntryList() == 0)
 | 
						|
          {
 | 
						|
            if (sendData() == 0)
 | 
						|
            {
 | 
						|
              // do bulk update
 | 
						|
              updateDbrm();
 | 
						|
            }
 | 
						|
          }
 | 
						|
 | 
						|
          // conversation to peer after got table lock
 | 
						|
          // confirm commit or abort
 | 
						|
          confirmToPeer();
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (const std::exception&)
 | 
						|
  {
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
  }
 | 
						|
 | 
						|
  sendResponse(RED_ACTN_REQUEST);
 | 
						|
 | 
						|
  boost::mutex::scoped_lock lock(fActionMutex);
 | 
						|
  fWesInUse.clear();
 | 
						|
  fMsgQueueClient.reset();
 | 
						|
 | 
						|
  fStopAction = false;
 | 
						|
  fCommitted = false;
 | 
						|
}
 | 
						|
 | 
						|
int RedistributeWorkerThread::setup()
 | 
						|
{
 | 
						|
  int ret = 0;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    fConfig = Config::makeConfig();
 | 
						|
    fOamCache = oam::OamCache::makeOamCache();
 | 
						|
    fDbrm = RedistributeControl::instance()->fDbrm;
 | 
						|
 | 
						|
    // for segment file # workaround
 | 
						|
    // string tmp = fConfig->getConfig("ExtentMap", "FilesPerColumnPartition");
 | 
						|
    // int filesPerPartition = fConfig->fromText(tmp);
 | 
						|
    // if (filesPerPartition == 0)
 | 
						|
    //	filesPerPartition = DEFAULT_FILES_PER_COLUMN_PARTITION;
 | 
						|
    // int dbrootNum = fOamCache->getDBRootNums().size();
 | 
						|
    // if (dbrootNum == 0)
 | 
						|
    // {
 | 
						|
    //	fErrorMsg = "OamCache->getDBRootNums() failed.";
 | 
						|
    //	logMessage(fErrorMsg, __LINE__);
 | 
						|
    //	return 1;
 | 
						|
    // }
 | 
						|
    // if ((filesPerPartition % dbrootNum) != 0)
 | 
						|
    // {
 | 
						|
    //	fErrorMsg = "ExtentMap::FilesPerColumnPartition is not a multiple of db root number.";
 | 
						|
    //	logMessage(fErrorMsg, __LINE__);
 | 
						|
    //	return 1;
 | 
						|
    // }
 | 
						|
    // fSegPerRoot = filesPerPartition / dbrootNum;
 | 
						|
  }
 | 
						|
  catch (const std::exception&)
 | 
						|
  {
 | 
						|
    ret = 2;
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    ret = 2;
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int RedistributeWorkerThread::grabTableLock()
 | 
						|
{
 | 
						|
  fTableLockId = 0;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    vector<uint32_t> pms;
 | 
						|
    pms.push_back(fMyId.second);
 | 
						|
 | 
						|
    if (fMyId.second != fPeerId.second)
 | 
						|
      pms.push_back(fPeerId.second);
 | 
						|
 | 
						|
    struct timespec ts;
 | 
						|
    ts.tv_sec = 0;
 | 
						|
    ts.tv_nsec = 100 * 1000000;
 | 
						|
 | 
						|
    while (fTableLockId == 0 && !fStopAction)
 | 
						|
    {
 | 
						|
      // make sure it's not stopped.
 | 
						|
      if (fStopAction)
 | 
						|
        return RED_EC_USER_STOP;
 | 
						|
 | 
						|
        // always wait long enough for ddl/dml/cpimport to get table lock
 | 
						|
        // for now, triple the ddl/dml/cpimport retry interval: 3 * 100ms
 | 
						|
      struct timespec tmp = ts;
 | 
						|
 | 
						|
      while (nanosleep(&tmp, &ts) < 0)
 | 
						|
        ;
 | 
						|
 | 
						|
      tmp = ts;
 | 
						|
 | 
						|
      try
 | 
						|
      {
 | 
						|
        uint32_t processID = ::getpid();
 | 
						|
        int32_t txnId = 0;
 | 
						|
        int32_t sessionId = 0;
 | 
						|
        string processName = "WriteEngineServer";
 | 
						|
        fTableLockId = fDbrm->getTableLock(pms, fPlanEntry.table, &processName, &processID, &sessionId,
 | 
						|
                                           &txnId, BRM::LOADING);
 | 
						|
      }
 | 
						|
      catch (const std::exception& ex)
 | 
						|
      {
 | 
						|
        fErrorCode = RED_EC_IDB_HARD_FAIL;
 | 
						|
        logMessage(string("getTableLock exception") + ex.what(), __LINE__);
 | 
						|
      }
 | 
						|
      catch (...)
 | 
						|
      {
 | 
						|
        fErrorCode = RED_EC_IDB_HARD_FAIL;
 | 
						|
        logMessage("getTableLock exception", __LINE__);
 | 
						|
 | 
						|
        // no need to throw
 | 
						|
        // throw IDBExcept(ERR_HARD_FAILURE);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (const std::exception& ex)
 | 
						|
  {
 | 
						|
    // use the interface, line# replaced with lock id.
 | 
						|
    logMessage(string(ex.what()) + " when try to get table lock: ", fTableLockId);
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    // use the interface, line# replaced with lock id.
 | 
						|
    logMessage("Unknown exception when try to get table lock: ", fTableLockId);
 | 
						|
  }
 | 
						|
 | 
						|
  // use the interface, line# replaced with lock id.
 | 
						|
  logMessage("Got table lock: ", fTableLockId);
 | 
						|
 | 
						|
  return ((fTableLockId > 0) ? 0 : -1);
 | 
						|
}
 | 
						|
 | 
						|
int RedistributeWorkerThread::buildEntryList()
 | 
						|
{
 | 
						|
  int ret = 0;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    // get all column oids
 | 
						|
    boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(0);
 | 
						|
    const CalpontSystemCatalog::TableName table = csc->tableName(fPlanEntry.table);
 | 
						|
    CalpontSystemCatalog::RIDList cols = csc->columnRIDs(table, true);
 | 
						|
    CalpontSystemCatalog::OID tableAuxColOid = csc->tableAUXColumnOID(table);
 | 
						|
 | 
						|
    for (CalpontSystemCatalog::RIDList::iterator i = cols.begin(); i != cols.end(); i++)
 | 
						|
      fOids.push_back(i->objnum);
 | 
						|
 | 
						|
    CalpontSystemCatalog::DictOIDList dicts = csc->dictOIDs(table);
 | 
						|
 | 
						|
    for (CalpontSystemCatalog::DictOIDList::iterator i = dicts.begin(); i != dicts.end(); i++)
 | 
						|
      fOids.push_back(i->dictOID);
 | 
						|
 | 
						|
    if (tableAuxColOid > 3000)
 | 
						|
    {
 | 
						|
      fOids.push_back(tableAuxColOid);
 | 
						|
    }
 | 
						|
 | 
						|
    bool firstOid = true;  // for adding segments, all columns have the same lay out.
 | 
						|
    uint16_t source = fPlanEntry.source;
 | 
						|
    uint16_t target = fPlanEntry.destination;
 | 
						|
    uint16_t partition = fPlanEntry.partition;
 | 
						|
    uint32_t minWidth = 8;  // column width greater than 8 will be dictionary.
 | 
						|
 | 
						|
    for (vector<int64_t>::iterator i = fOids.begin(); i != fOids.end(); i++)
 | 
						|
    {
 | 
						|
      vector<EMEntry> entries;
 | 
						|
      int rc = fDbrm->getExtents(*i, entries, false, false, true);
 | 
						|
 | 
						|
      if (rc != 0 || entries.size() == 0)
 | 
						|
      {
 | 
						|
        ostringstream oss;
 | 
						|
        oss << "Error in DBRM getExtents; oid:" << *i << "; returnCode: " << rc;
 | 
						|
        throw runtime_error(oss.str());
 | 
						|
      }
 | 
						|
 | 
						|
      // same oid has the same column width
 | 
						|
      uint32_t colWid = entries.front().colWid;
 | 
						|
      vector<EMEntry>::iterator targetHwmEntry = entries.end();  // for HWM_0 workaround
 | 
						|
 | 
						|
      if (colWid > 0 && colWid < minWidth)
 | 
						|
        minWidth = colWid;
 | 
						|
 | 
						|
      for (vector<EMEntry>::iterator j = entries.begin(); j != entries.end(); j++)
 | 
						|
      {
 | 
						|
        if (j->dbRoot == source && j->partitionNum == partition)
 | 
						|
        {
 | 
						|
          fUpdateRtEntries.push_back(BulkUpdateDBRootArg(j->range.start, target));
 | 
						|
 | 
						|
          if (firstOid)
 | 
						|
            fSegments.insert(j->segmentNum);
 | 
						|
        }
 | 
						|
 | 
						|
#if 0
 | 
						|
                else if (j->dbRoot == target && j->partitionNum == partition)
 | 
						|
                {
 | 
						|
                    // the partition already exists on the target dbroot
 | 
						|
                    fErrorCode = RED_EC_PART_EXIST_ON_TARGET;
 | 
						|
                    ostringstream oss;
 | 
						|
                    oss << "oid:" << *i << ", partition:" << partition << " exists, source:"
 | 
						|
                        << source << ", destination:" << target;
 | 
						|
                    fErrorMsg = oss.str();
 | 
						|
                    logMessage(fErrorMsg, __LINE__);
 | 
						|
                    return fErrorCode;
 | 
						|
                }
 | 
						|
 | 
						|
#endif
 | 
						|
 | 
						|
        // workaround for HWM_0 of highest extents of the oid on target dbroot.
 | 
						|
        if (j->dbRoot == target)
 | 
						|
        {
 | 
						|
          if (targetHwmEntry == entries.end())
 | 
						|
          {
 | 
						|
            targetHwmEntry = j;
 | 
						|
          }
 | 
						|
          else
 | 
						|
          {
 | 
						|
            if (j->partitionNum > targetHwmEntry->partitionNum)
 | 
						|
            {
 | 
						|
              targetHwmEntry = j;
 | 
						|
            }
 | 
						|
            else if (j->partitionNum == targetHwmEntry->partitionNum &&
 | 
						|
                     j->blockOffset > targetHwmEntry->blockOffset)
 | 
						|
            {
 | 
						|
              targetHwmEntry = j;
 | 
						|
            }
 | 
						|
            else if (j->partitionNum == targetHwmEntry->partitionNum &&
 | 
						|
                     j->blockOffset == targetHwmEntry->blockOffset &&
 | 
						|
                     j->segmentNum > targetHwmEntry->segmentNum)
 | 
						|
            {
 | 
						|
              targetHwmEntry = j;
 | 
						|
            }
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }  // for em entries
 | 
						|
 | 
						|
      // HWM_0 workaround
 | 
						|
      // HWM 0 has two possibilities:
 | 
						|
      //     1. segment file has one extent, and the first block is not full yet.
 | 
						|
      //     2. segment file has more than one extents, the HWM of the extents other than
 | 
						|
      //        the last extent is set to 0, that is only last extent has none-zero HWM.
 | 
						|
      // In tuple-bps::makeJob, there is a check to handle last extent has 0 hwm:
 | 
						|
      //  (scannedExtents[i].HWM == 0 && (int) i < lastExtent[scannedExtents[i].dbRoot-1])
 | 
						|
      //          lbidsToScan = scannedExtents[i].range.size * 1024;
 | 
						|
      // Based on this check, the number of block to scan is caculated.
 | 
						|
      // After redistributing the partitions, the original case 1 extent on destination
 | 
						|
      // may not be the highest extent in the dbroot, and result in a full extent scan.
 | 
						|
      // This scan will fail because there is no enough blocks if this is an abbreviated
 | 
						|
      // extent or not enough chunks if the column is compressed.
 | 
						|
      // The workaround is to bump up the HWM to 1 if moved in partitions are greater.
 | 
						|
      if (targetHwmEntry != entries.end() &&  // exclude no extent case
 | 
						|
          targetHwmEntry->colWid > 0 &&       // exclude dictionary
 | 
						|
          targetHwmEntry->HWM == 0 && targetHwmEntry->partitionNum < partition)
 | 
						|
      {
 | 
						|
        BulkSetHWMArg arg;
 | 
						|
        arg.oid = *i;
 | 
						|
        arg.partNum = targetHwmEntry->partitionNum;
 | 
						|
        arg.segNum = targetHwmEntry->segmentNum;
 | 
						|
        arg.hwm = targetHwmEntry->colWid;  // will correct later based on minWidth
 | 
						|
 | 
						|
        fUpdateHwmEntries.push_back(arg);
 | 
						|
      }
 | 
						|
    }  // for oids
 | 
						|
 | 
						|
    // HWM_0 workaround
 | 
						|
    // Caculate the min(column width), the HWM(bump up to) for each column.
 | 
						|
    if (fUpdateHwmEntries.size() > 0)
 | 
						|
    {
 | 
						|
      // update the HWM based in column width, not include dictionary extents
 | 
						|
      for (vector<BRM::BulkSetHWMArg>::iterator j = fUpdateHwmEntries.begin(); j != fUpdateHwmEntries.end();
 | 
						|
           j++)
 | 
						|
      {
 | 
						|
        if (j->hwm <= 8)
 | 
						|
          j->hwm /= minWidth;
 | 
						|
        else
 | 
						|
          j->hwm = 1;  // not needed, but in case
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (const std::exception& ex)
 | 
						|
  {
 | 
						|
    fErrorCode = RED_EC_EXTENT_ERROR;
 | 
						|
    fErrorMsg = ex.what();
 | 
						|
    logMessage(fErrorMsg, __LINE__);
 | 
						|
    ret = fErrorCode;
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    fErrorCode = RED_EC_EXTENT_ERROR;
 | 
						|
    fErrorMsg = "get extent error.";
 | 
						|
    logMessage(fErrorMsg, __LINE__);
 | 
						|
    ret = fErrorCode;
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int RedistributeWorkerThread::sendData()
 | 
						|
{
 | 
						|
  WriteEngine::FileOp fileOp;  // just to get filename, not for file operations
 | 
						|
  bool remotePM = (fMyId.second != fPeerId.second);
 | 
						|
  uint32_t dbroot = fPlanEntry.source;
 | 
						|
  uint32_t partition = fPlanEntry.partition;
 | 
						|
  int16_t source = fPlanEntry.source;
 | 
						|
  int16_t dest = fPlanEntry.destination;
 | 
						|
 | 
						|
  IDBDataFile::Types fileType = (IDBPolicy::useHdfs()    ? IDBDataFile::HDFS
 | 
						|
                                 : IDBPolicy::useCloud() ? IDBDataFile::CLOUD
 | 
						|
                                                         : IDBDataFile::UNBUFFERED);
 | 
						|
 | 
						|
  IDBFileSystem& fs = IDBFileSystem::getFs(fileType);
 | 
						|
 | 
						|
  if ((remotePM) && (fileType != IDBDataFile::HDFS))
 | 
						|
  {
 | 
						|
    if (connectToWes(fPeerId.second) != 0)
 | 
						|
    {
 | 
						|
      fErrorCode = RED_EC_CONNECT_FAIL;
 | 
						|
      ostringstream oss;
 | 
						|
      oss << "Failed to connect to PM" << fPeerId.second << " from PM" << fMyId.second;
 | 
						|
      fErrorMsg = oss.str();
 | 
						|
      logMessage(fErrorMsg, __LINE__);
 | 
						|
      return fErrorCode;
 | 
						|
    }
 | 
						|
 | 
						|
    // start to send each segment file
 | 
						|
    uint32_t seq = 0;
 | 
						|
    ByteStream bs;
 | 
						|
 | 
						|
    // start conversion with peer, hand shaking.
 | 
						|
    RedistributeMsgHeader header(dest, source, seq++, RED_DATA_INIT);
 | 
						|
    bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
 | 
						|
    bs.append((const ByteStream::byte*)&header, sizeof(header));
 | 
						|
    fMsgQueueClient->write(bs);
 | 
						|
 | 
						|
    SBS sbs = fMsgQueueClient->read();
 | 
						|
 | 
						|
    if (!checkDataTransferAck(sbs, 0))
 | 
						|
      return fErrorCode;
 | 
						|
 | 
						|
    for (vector<int64_t>::iterator i = fOids.begin(); i != fOids.end(); i++)
 | 
						|
    {
 | 
						|
      for (set<int16_t>::iterator j = fSegments.begin(); j != fSegments.end(); ++j)
 | 
						|
      {
 | 
						|
        char fileName[WriteEngine::FILE_NAME_SIZE];
 | 
						|
        int rc = fileOp.oid2FileName(*i, fileName, false, dbroot, partition, *j);
 | 
						|
 | 
						|
        if (rc == WriteEngine::NO_ERROR)
 | 
						|
        {
 | 
						|
          ostringstream oss;
 | 
						|
          oss << "<=redistributing: " << fileName << ", oid=" << *i << ", db=" << source
 | 
						|
              << ", part=" << partition << ", seg=" << *j << " to db=" << dest;
 | 
						|
          logMessage(oss.str(), __LINE__);
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          fErrorCode = RED_EC_OID_TO_FILENAME;
 | 
						|
          ostringstream oss;
 | 
						|
          oss << "Failed to get file name: oid=" << *i << ", dbroot=" << dbroot << ", partition=" << partition
 | 
						|
              << ", segment=" << *j;
 | 
						|
          fErrorMsg = oss.str();
 | 
						|
          logMessage(fErrorMsg, __LINE__);
 | 
						|
          return fErrorCode;
 | 
						|
        }
 | 
						|
 | 
						|
        if (fOldFilePtr != NULL)
 | 
						|
          closeFile(fOldFilePtr);
 | 
						|
 | 
						|
        errno = 0;
 | 
						|
        FILE* fOldFilePtr = fopen(fileName, "rb");
 | 
						|
 | 
						|
        if (fOldFilePtr != NULL)
 | 
						|
        {
 | 
						|
          ostringstream oss;
 | 
						|
          oss << "open " << fileName << ", oid=" << *i << ", dbroot=" << dbroot << ", partition=" << partition
 | 
						|
              << ", segment=" << *j << ". " << fOldFilePtr;
 | 
						|
          logMessage(oss.str(), __LINE__);
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          int e = errno;
 | 
						|
          fErrorCode = RED_EC_OPEN_FILE_FAIL;
 | 
						|
          ostringstream oss;
 | 
						|
          oss << "Failed to open " << fileName << ", oid=" << *i << ", dbroot=" << dbroot
 | 
						|
              << ", partition=" << partition << ", segment=" << *j << ". " << strerror(e) << " (" << e << ")";
 | 
						|
          fErrorMsg = oss.str();
 | 
						|
          logMessage(fErrorMsg, __LINE__);
 | 
						|
          return fErrorCode;
 | 
						|
        }
 | 
						|
 | 
						|
        // add to set for remove after commit
 | 
						|
        addToDirSet(fileName, true);
 | 
						|
 | 
						|
        char chunk[CHUNK_SIZE];
 | 
						|
        errno = 0;
 | 
						|
        fseek(fOldFilePtr, 0, SEEK_END);     // go to end of file
 | 
						|
        long fileSize = ftell(fOldFilePtr);  // get current file size
 | 
						|
 | 
						|
        if (fileSize < 0)
 | 
						|
        {
 | 
						|
          int e = errno;
 | 
						|
          ostringstream oss;
 | 
						|
          oss << "Fail to tell file size: " << strerror(e) << " (" << e << ")";
 | 
						|
          fErrorMsg = oss.str();
 | 
						|
          fErrorCode = RED_EC_FSEEK_FAIL;
 | 
						|
          logMessage(fErrorMsg, __LINE__);
 | 
						|
          return fErrorCode;
 | 
						|
        }
 | 
						|
 | 
						|
        // send start message to have the file of fileSize created at target dbroot.
 | 
						|
        bs.restart();
 | 
						|
        RedistributeMsgHeader header(dest, source, seq++, RED_DATA_START);
 | 
						|
        RedistributeDataControl dataControl(*i, dest, partition, *j, fileSize);
 | 
						|
        bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
 | 
						|
        bs.append((const ByteStream::byte*)&header, sizeof(header));
 | 
						|
        bs.append((const ByteStream::byte*)&dataControl, sizeof(dataControl));
 | 
						|
        fMsgQueueClient->write(bs);
 | 
						|
 | 
						|
        sbs = fMsgQueueClient->read();
 | 
						|
 | 
						|
        if (!checkDataTransferAck(sbs, fileSize))
 | 
						|
          return fErrorCode;
 | 
						|
 | 
						|
        // now send the file chunk by chunk.
 | 
						|
        rewind(fOldFilePtr);
 | 
						|
        int64_t bytesLeft = fileSize;
 | 
						|
        size_t bytesSend = CHUNK_SIZE;
 | 
						|
        header.messageId = RED_DATA_CONT;
 | 
						|
 | 
						|
        while (bytesLeft > 0)
 | 
						|
        {
 | 
						|
          if (fStopAction)
 | 
						|
          {
 | 
						|
            closeFile(fOldFilePtr);
 | 
						|
            fOldFilePtr = NULL;
 | 
						|
            return RED_EC_USER_STOP;
 | 
						|
          }
 | 
						|
 | 
						|
          if (bytesLeft < (long)CHUNK_SIZE)
 | 
						|
            bytesSend = bytesLeft;
 | 
						|
 | 
						|
          errno = 0;
 | 
						|
          size_t n = fread(chunk, 1, bytesSend, fOldFilePtr);
 | 
						|
 | 
						|
          if (n != bytesSend)
 | 
						|
          {
 | 
						|
            int e = errno;
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "Fail to read: " << strerror(e) << " (" << e << ")";
 | 
						|
            fErrorMsg = oss.str();
 | 
						|
            fErrorCode = RED_EC_FREAD_FAIL;
 | 
						|
            logMessage(fErrorMsg, __LINE__);
 | 
						|
            return fErrorCode;
 | 
						|
          }
 | 
						|
 | 
						|
          header.sequenceNum = seq++;
 | 
						|
          bs.restart();
 | 
						|
          bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
 | 
						|
          bs.append((const ByteStream::byte*)&header, sizeof(header));
 | 
						|
          bs << (size_t)bytesSend;
 | 
						|
          bs.append((const ByteStream::byte*)chunk, bytesSend);
 | 
						|
          fMsgQueueClient->write(bs);
 | 
						|
 | 
						|
          sbs = fMsgQueueClient->read();
 | 
						|
 | 
						|
          if (!checkDataTransferAck(sbs, bytesSend))
 | 
						|
            return fErrorCode;
 | 
						|
 | 
						|
          bytesLeft -= bytesSend;
 | 
						|
        }
 | 
						|
 | 
						|
        closeFile(fOldFilePtr);
 | 
						|
        fOldFilePtr = NULL;
 | 
						|
 | 
						|
        header.messageId = RED_DATA_FINISH;
 | 
						|
        header.sequenceNum = seq++;
 | 
						|
        bs.restart();
 | 
						|
        bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
 | 
						|
        bs.append((const ByteStream::byte*)&header, sizeof(header));
 | 
						|
        bs << (uint64_t)fileSize;
 | 
						|
        fMsgQueueClient->write(bs);
 | 
						|
 | 
						|
        sbs = fMsgQueueClient->read();
 | 
						|
 | 
						|
        if (!checkDataTransferAck(sbs, fileSize))
 | 
						|
          return fErrorCode;
 | 
						|
 | 
						|
      }  // segments
 | 
						|
    }    // for oids
 | 
						|
  }      // remote peer non-hdfs
 | 
						|
  else   // local or HDFS file copy
 | 
						|
  {
 | 
						|
    std::map<int, std::string> rootToPathMap;
 | 
						|
 | 
						|
    // use cp, in case failed in middle.  May consider to use rename if possible.
 | 
						|
    for (vector<int64_t>::iterator i = fOids.begin(); i != fOids.end(); i++)
 | 
						|
    {
 | 
						|
      for (set<int16_t>::iterator j = fSegments.begin(); j != fSegments.end(); ++j)
 | 
						|
      {
 | 
						|
        if (fStopAction)
 | 
						|
          return RED_EC_USER_STOP;
 | 
						|
 | 
						|
        if (fileType == IDBDataFile::HDFS)  // HDFS file copy
 | 
						|
        {
 | 
						|
          string sourceName;
 | 
						|
          int rc = buildFullHdfsPath(rootToPathMap,  // map of root to path
 | 
						|
                                     *i,             // OID
 | 
						|
                                     source,         // dbroot
 | 
						|
                                     partition,      // partition
 | 
						|
                                     *j,             // segment
 | 
						|
                                     sourceName);    // full path name
 | 
						|
 | 
						|
          if (rc != 0)
 | 
						|
          {
 | 
						|
            fErrorCode = RED_EC_OID_TO_FILENAME;
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "Failed to get src file name: oid=" << *i << ", dbroot=" << source
 | 
						|
                << ", partition=" << partition << ", segment=" << *j;
 | 
						|
            fErrorMsg = oss.str();
 | 
						|
            logMessage(fErrorMsg, __LINE__);
 | 
						|
            return fErrorCode;
 | 
						|
          }
 | 
						|
 | 
						|
          string destName;
 | 
						|
          rc = buildFullHdfsPath(rootToPathMap,  // map of root to path
 | 
						|
                                 *i,             // OID
 | 
						|
                                 dest,           // dbroot
 | 
						|
                                 partition,      // partition
 | 
						|
                                 *j,             // segment
 | 
						|
                                 destName);      // full path name
 | 
						|
 | 
						|
          if (rc != 0)
 | 
						|
          {
 | 
						|
            fErrorCode = RED_EC_OID_TO_FILENAME;
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "Failed to get dest file name: oid=" << *i << ", dbroot=" << dest
 | 
						|
                << ", partition=" << partition << ", segment=" << *j;
 | 
						|
            fErrorMsg = oss.str();
 | 
						|
            logMessage(fErrorMsg, __LINE__);
 | 
						|
            return fErrorCode;
 | 
						|
          }
 | 
						|
 | 
						|
          ostringstream oss;
 | 
						|
          oss << "<=redistributing(hdfs): " << sourceName << ", oid=" << *i << ", db=" << source
 | 
						|
              << ", part=" << partition << ", seg=" << *j << " to db=" << dest;
 | 
						|
          logMessage(oss.str(), __LINE__);
 | 
						|
 | 
						|
          // add to set for remove after commit/abort
 | 
						|
          addToDirSet(sourceName.c_str(), true);
 | 
						|
          addToDirSet(destName.c_str(), false);
 | 
						|
 | 
						|
          int ret = fs.copyFile(sourceName.c_str(), destName.c_str());
 | 
						|
 | 
						|
          if (ret != 0)
 | 
						|
          {
 | 
						|
            fErrorCode = RED_EC_COPY_FILE_FAIL;
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "Failed to copy " << sourceName << " to " << destName << "; error is: " << strerror(errno);
 | 
						|
            fErrorMsg = oss.str();
 | 
						|
            logMessage(fErrorMsg, __LINE__);
 | 
						|
            return fErrorCode;
 | 
						|
          }
 | 
						|
        }
 | 
						|
        else  // local file copy
 | 
						|
        {
 | 
						|
          char sourceName[WriteEngine::FILE_NAME_SIZE];
 | 
						|
          int rc = fileOp.oid2FileName(*i, sourceName, false, source, partition, *j);
 | 
						|
 | 
						|
          if (rc != WriteEngine::NO_ERROR)
 | 
						|
          {
 | 
						|
            fErrorCode = RED_EC_OID_TO_FILENAME;
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "Failed to get file name: oid=" << *i << ", dbroot=" << source
 | 
						|
                << ", partition=" << partition << ", segment=" << *j;
 | 
						|
            fErrorMsg = oss.str();
 | 
						|
            logMessage(fErrorMsg, __LINE__);
 | 
						|
            return fErrorCode;
 | 
						|
          }
 | 
						|
 | 
						|
          char destName[WriteEngine::FILE_NAME_SIZE];
 | 
						|
          rc = fileOp.oid2FileName(*i, destName, true, dest, partition, *j);
 | 
						|
 | 
						|
          if (rc != WriteEngine::NO_ERROR)
 | 
						|
          {
 | 
						|
            fErrorCode = RED_EC_OID_TO_FILENAME;
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "Failed to get file name: oid=" << *i << ", dbroot=" << dest << ", partition=" << partition
 | 
						|
                << ", segment=" << *j;
 | 
						|
            fErrorMsg = oss.str();
 | 
						|
            logMessage(fErrorMsg, __LINE__);
 | 
						|
            return fErrorCode;
 | 
						|
          }
 | 
						|
 | 
						|
          ostringstream oss;
 | 
						|
          oss << "<=redistributing(copy): " << sourceName << ", oid=" << *i << ", db=" << source
 | 
						|
              << ", part=" << partition << ", seg=" << *j << " to db=" << dest;
 | 
						|
          logMessage(oss.str(), __LINE__);
 | 
						|
 | 
						|
          // add to set for remove after commit/abort
 | 
						|
          addToDirSet(sourceName, true);
 | 
						|
          addToDirSet(destName, false);
 | 
						|
 | 
						|
          // Using boost::copy_file() instead of IDBFileSystem::copy-
 | 
						|
          // File() so we can capture/report any boost exception error
 | 
						|
          // msg that IDBFileSystem::copyFile() currently swallows.
 | 
						|
          try
 | 
						|
          {
 | 
						|
            boost::filesystem::copy_file(sourceName, destName);
 | 
						|
          }
 | 
						|
 | 
						|
#if BOOST_VERSION >= 105200
 | 
						|
          catch (boost::filesystem::filesystem_error& e)
 | 
						|
#else
 | 
						|
          catch (boost::filesystem::basic_filesystem_error<filesystem::path>& e)
 | 
						|
#endif
 | 
						|
          {
 | 
						|
            fErrorCode = RED_EC_COPY_FILE_FAIL;
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "Failed to copy " << sourceName << " to " << destName << "; error is: " << e.what();
 | 
						|
            fErrorMsg = oss.str();
 | 
						|
            logMessage(fErrorMsg, __LINE__);
 | 
						|
            return fErrorCode;
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }  // segment
 | 
						|
    }    // oid
 | 
						|
  }      // !remote
 | 
						|
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Construct a full path name based on the given oid, root, partition, and seg.
 | 
						|
// The rootToPathMap is the map of dbroot to dbrootPath that we are using.  We
 | 
						|
// are using this function instead of the usual FileOp::oid2FileName() function,
 | 
						|
// because that function only works with "local" DBRoots.  In the case of
 | 
						|
// an HDFS copy, we will be copying files from/to DBRoots that are not on the
 | 
						|
// local PM.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
int RedistributeWorkerThread::buildFullHdfsPath(std::map<int, std::string>& rootToPathMap, int64_t colOid,
 | 
						|
                                                int16_t dbRoot, uint32_t partition, int16_t segment,
 | 
						|
                                                std::string& fullFileName)
 | 
						|
{
 | 
						|
  std::map<int, std::string>::const_iterator iter = rootToPathMap.find(dbRoot);
 | 
						|
 | 
						|
  if (iter == rootToPathMap.end())
 | 
						|
  {
 | 
						|
    ostringstream oss;
 | 
						|
    oss << "DBRoot" << dbRoot;
 | 
						|
    std::string dbRootPath = fConfig->getConfig("SystemConfig", oss.str());
 | 
						|
 | 
						|
    if (dbRootPath.empty())
 | 
						|
    {
 | 
						|
      return 1;
 | 
						|
    }
 | 
						|
 | 
						|
    rootToPathMap[dbRoot] = dbRootPath;
 | 
						|
    iter = rootToPathMap.find(dbRoot);
 | 
						|
  }
 | 
						|
 | 
						|
  char tempFileName[WriteEngine::FILE_NAME_SIZE];
 | 
						|
  char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE];
 | 
						|
 | 
						|
  int rc = WriteEngine::Convertor::oid2FileName(colOid, tempFileName, dbDir, partition, segment);
 | 
						|
 | 
						|
  if (rc != WriteEngine::NO_ERROR)
 | 
						|
  {
 | 
						|
    return 2;
 | 
						|
  }
 | 
						|
 | 
						|
  ostringstream fullFileNameOss;
 | 
						|
  fullFileNameOss << iter->second << '/' << tempFileName;
 | 
						|
  fullFileName = fullFileNameOss.str();
 | 
						|
 | 
						|
  return 0;
 | 
						|
}
 | 
						|
 | 
						|
int RedistributeWorkerThread::connectToWes(int pmId)
 | 
						|
{
 | 
						|
  int ret = 0;
 | 
						|
  ostringstream oss;
 | 
						|
  oss << "pm" << pmId << "_WriteEngineServer";
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    fMsgQueueClient.reset(new MessageQueueClient(oss.str(), fConfig));
 | 
						|
  }
 | 
						|
  catch (const std::exception& ex)
 | 
						|
  {
 | 
						|
    fErrorMsg = "Caught exception when connecting to " + oss.str() + " -- " + ex.what();
 | 
						|
    ret = 1;
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    fErrorMsg = "Caught exception when connecting to " + oss.str() + " -- unknown";
 | 
						|
    ret = 2;
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int RedistributeWorkerThread::updateDbrm()
 | 
						|
{
 | 
						|
  int rc1 = BRM::ERR_OK;
 | 
						|
  int rc2 = BRM::ERR_OK;
 | 
						|
  boost::mutex::scoped_lock lock(fActionMutex);
 | 
						|
 | 
						|
  // cannot stop after extent map is updated.
 | 
						|
  if (!fStopAction)
 | 
						|
  {
 | 
						|
    if (fUpdateHwmEntries.size() > 0)
 | 
						|
      rc1 = fDbrm->bulkSetHWM(fUpdateHwmEntries, 0);
 | 
						|
 | 
						|
    if (rc1 == BRM::ERR_OK)
 | 
						|
    {
 | 
						|
      int rc2 = fDbrm->bulkUpdateDBRoot(fUpdateRtEntries);
 | 
						|
 | 
						|
      if (rc2 == 0)
 | 
						|
        fCommitted = true;
 | 
						|
      else
 | 
						|
        fErrorCode = RED_EC_UPDATE_DBRM_FAIL;
 | 
						|
    }
 | 
						|
 | 
						|
    // logging for debug
 | 
						|
    {
 | 
						|
      if (fUpdateHwmEntries.size() > 0)
 | 
						|
      {
 | 
						|
        ostringstream oss;
 | 
						|
        oss << "HWM_0 workaround, updateHWM(oid,part,seg,hwm)";
 | 
						|
        vector<BRM::BulkSetHWMArg>::iterator i = fUpdateHwmEntries.begin();
 | 
						|
 | 
						|
        for (; i != fUpdateHwmEntries.end(); i++)
 | 
						|
        {
 | 
						|
          oss << ":(" << i->oid << "," << i->partNum << "," << i->segNum << "," << i->hwm << ")";
 | 
						|
        }
 | 
						|
 | 
						|
        oss << ((rc1 == BRM::ERR_OK) ? " success" : " failed");
 | 
						|
        logMessage(oss.str(), __LINE__);
 | 
						|
      }
 | 
						|
 | 
						|
      if (rc1 == BRM::ERR_OK)
 | 
						|
      {
 | 
						|
        ostringstream oss;
 | 
						|
        oss << "updateDBRoot(startLBID,dbRoot)";
 | 
						|
        vector<BRM::BulkUpdateDBRootArg>::iterator i = fUpdateRtEntries.begin();
 | 
						|
 | 
						|
        for (; i != fUpdateRtEntries.end(); i++)
 | 
						|
          oss << ":(" << i->startLBID << "," << i->dbRoot << ")";
 | 
						|
 | 
						|
        oss << ((rc2 == BRM::ERR_OK) ? " success" : " failed");
 | 
						|
        logMessage(oss.str(), __LINE__);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return ((rc1 == BRM::ERR_OK && rc2 == BRM::ERR_OK) ? 0 : -1);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
// Validate an ack message against the number of bytes that was requested.
//
// An empty message is reported as RED_EC_NETWORK_FAIL, a message shorter than
// one byte plus a RedistributeMsgHeader as RED_EC_WKR_MSG_SHORT, and an acked
// size different from 'size' as RED_EC_SIZE_NACK.  In each case fErrorMsg is
// set and logged.  The message pointer is released before returning.
//
// Returns true when fErrorCode is RED_EC_OK on exit.
//------------------------------------------------------------------------------
bool RedistributeWorkerThread::checkDataTransferAck(SBS& sbs, size_t size)
{
  const size_t minLength = sizeof(RedistributeMsgHeader) + 1;

  if (sbs->length() == 0)
  {
    fErrorMsg = "Zero byte read, Network error.";
    logMessage(fErrorMsg, __LINE__);
    fErrorCode = RED_EC_NETWORK_FAIL;
  }
  else if (sbs->length() < minLength)
  {
    ostringstream shortMsg;
    shortMsg << "Short message, length=" << sbs->length();
    fErrorMsg = shortMsg.str();
    logMessage(fErrorMsg, __LINE__);
    fErrorCode = RED_EC_WKR_MSG_SHORT;
  }
  else
  {
    // consume the leading message id byte, then step over the header;
    // the header content itself is not examined here.
    ByteStream::byte wesMsgId;
    *sbs >> wesMsgId;
    sbs->advance(sizeof(RedistributeMsgHeader));

    size_t ackedSize;
    *sbs >> ackedSize;

    if (ackedSize != size)
    {
      ostringstream mismatch;
      mismatch << "Acked size does not match request: " << ackedSize << "/" << size;
      fErrorMsg = mismatch.str();
      logMessage(fErrorMsg, __LINE__);
      fErrorCode = RED_EC_SIZE_NACK;
    }
  }

  sbs.reset();

  return (fErrorCode == RED_EC_OK);
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::confirmToPeer()
 | 
						|
{
 | 
						|
  if (fTableLockId > 0)
 | 
						|
  {
 | 
						|
    bool rc = false;
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
      rc = fDbrm->releaseTableLock(fTableLockId);
 | 
						|
 | 
						|
      // use the interface, line# replaced with lock id.
 | 
						|
      logMessage("Releasing table lock... ", fTableLockId);
 | 
						|
    }
 | 
						|
    catch (const std::exception& ex)
 | 
						|
    {
 | 
						|
      // too bad, the talbe lock is messed up.
 | 
						|
      fErrorMsg = ex.what();
 | 
						|
 | 
						|
      // use the interface, line# replaced with lock id.
 | 
						|
      logMessage("Release table exception: " + fErrorMsg, fTableLockId);
 | 
						|
    }
 | 
						|
    catch (...)
 | 
						|
    {
 | 
						|
      // use the interface, line# replaced with lock id.
 | 
						|
      logMessage("Release table lock unknown exception. ", fTableLockId);
 | 
						|
    }
 | 
						|
 | 
						|
    if (rc == true)
 | 
						|
    {
 | 
						|
      // use the interface, line# replaced with lock id.
 | 
						|
      logMessage("Release table lock return true. ", fTableLockId);
 | 
						|
      fTableLockId = 0;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      // let destructor try again.
 | 
						|
      // use the interface, line# replaced with lock id.
 | 
						|
      logMessage("Release table lock return false. ", fTableLockId);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  IDBFileSystem& fs = (IDBPolicy::useHdfs()    ? IDBFileSystem::getFs(IDBDataFile::HDFS)
 | 
						|
                       : IDBPolicy::useCloud() ? IDBFileSystem::getFs(IDBDataFile::CLOUD)
 | 
						|
                                               : IDBFileSystem::getFs(IDBDataFile::BUFFERED));
 | 
						|
 | 
						|
  uint32_t confirmCode = RED_DATA_COMMIT;
 | 
						|
 | 
						|
  if (fErrorCode != RED_EC_OK || fStopAction == true)  // fCommitted must be false
 | 
						|
    confirmCode = RED_DATA_ABORT;
 | 
						|
 | 
						|
  if (fMyId.second != fPeerId.second)
 | 
						|
  {
 | 
						|
    if (fMsgQueueClient.get() != NULL)
 | 
						|
    {
 | 
						|
      ByteStream bs;
 | 
						|
      RedistributeMsgHeader header(fPeerId.first, fMyId.first, -1, confirmCode);
 | 
						|
      bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
 | 
						|
      bs.append((const ByteStream::byte*)&header, sizeof(header));
 | 
						|
      fMsgQueueClient->write(bs);
 | 
						|
 | 
						|
      // not going to retry for now, ignore the ack and close the connection.
 | 
						|
      fMsgQueueClient->read();
 | 
						|
      fMsgQueueClient.reset();
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else if (confirmCode != RED_DATA_COMMIT)
 | 
						|
  {
 | 
						|
    for (set<string>::iterator i = fNewDirSet.begin(); i != fNewDirSet.end(); i++)
 | 
						|
    {
 | 
						|
      fs.remove(i->c_str());  // ignoring return code
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  // new files committed, remove old ones.
 | 
						|
  if (confirmCode == RED_DATA_COMMIT)
 | 
						|
  {
 | 
						|
    for (set<string>::iterator i = fOldDirSet.begin(); i != fOldDirSet.end(); i++)
 | 
						|
    {
 | 
						|
      fs.remove(i->c_str());  // ignoring return code
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  fNewDirSet.clear();
 | 
						|
  fOldDirSet.clear();
 | 
						|
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::addToDirSet(const char* fileName, bool isSource)
 | 
						|
{
 | 
						|
  string path(fileName);
 | 
						|
  size_t found = path.find_last_of("/\\");
 | 
						|
  path = path.substr(0, found);
 | 
						|
 | 
						|
  if (isSource)
 | 
						|
    fOldDirSet.insert(path);
 | 
						|
  else
 | 
						|
    fNewDirSet.insert(path);
 | 
						|
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::handleStop()
 | 
						|
{
 | 
						|
  boost::mutex::scoped_lock lock(fActionMutex);
 | 
						|
 | 
						|
  // cannot stop after extent map is updated.
 | 
						|
  if (!fCommitted)
 | 
						|
    fStopAction = true;
 | 
						|
 | 
						|
  lock.unlock();
 | 
						|
 | 
						|
  logMessage("User stop", __LINE__);
 | 
						|
  sendResponse(RED_ACTN_STOP);
 | 
						|
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::sendResponse(uint32_t type)
 | 
						|
{
 | 
						|
  uint32_t tmp = fMsgHeader.destination;
 | 
						|
  fMsgHeader.destination = fMsgHeader.source;
 | 
						|
  fMsgHeader.source = tmp;
 | 
						|
  fMsgHeader.messageId = RED_ACTN_RESP;
 | 
						|
 | 
						|
  fBs.restart();
 | 
						|
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
 | 
						|
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
 | 
						|
 | 
						|
  if (type == RED_ACTN_REQUEST)
 | 
						|
  {
 | 
						|
    if (fErrorCode == RED_EC_OK && fStopAction == false)
 | 
						|
      fPlanEntry.status = RED_TRANS_SUCCESS;
 | 
						|
    else if (fErrorCode == RED_EC_PART_EXIST_ON_TARGET)
 | 
						|
      fPlanEntry.status = RED_TRANS_SKIPPED;
 | 
						|
    else if (fErrorCode != RED_EC_OK)
 | 
						|
      fPlanEntry.status = RED_TRANS_FAILED;
 | 
						|
 | 
						|
    // else -- stopped, may try again if support resume
 | 
						|
 | 
						|
    fBs.append((const ByteStream::byte*)&fPlanEntry, sizeof(fPlanEntry));
 | 
						|
  }
 | 
						|
 | 
						|
  fIOSocket.write(fBs);
 | 
						|
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::handleData()
 | 
						|
{
 | 
						|
  bool done = false;
 | 
						|
  bool noExcept = true;
 | 
						|
  SBS sbs;
 | 
						|
  size_t size = 0;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    do
 | 
						|
    {
 | 
						|
      switch (fMsgHeader.messageId)
 | 
						|
      {
 | 
						|
        case RED_DATA_INIT: handleDataInit(); break;
 | 
						|
 | 
						|
        case RED_DATA_START: handleDataStart(sbs, size); break;
 | 
						|
 | 
						|
        case RED_DATA_CONT: handleDataCont(sbs, size); break;
 | 
						|
 | 
						|
        case RED_DATA_FINISH: handleDataFinish(sbs, size); break;
 | 
						|
 | 
						|
        case RED_DATA_COMMIT:
 | 
						|
          handleDataCommit(sbs, size);
 | 
						|
          done = true;
 | 
						|
          break;
 | 
						|
 | 
						|
        case RED_DATA_ABORT:
 | 
						|
          handleDataAbort(sbs, size);
 | 
						|
          done = true;
 | 
						|
          break;
 | 
						|
 | 
						|
        default:
 | 
						|
          handleUnknowDataMsg();
 | 
						|
          done = true;
 | 
						|
          break;
 | 
						|
      }
 | 
						|
 | 
						|
      if (!done)
 | 
						|
      {
 | 
						|
        // get next message
 | 
						|
        sbs = fIOSocket.read();
 | 
						|
        ByteStream::byte wesMsgId;
 | 
						|
        *sbs >> wesMsgId;
 | 
						|
        memcpy(&fMsgHeader, sbs->buf(), sizeof(RedistributeMsgHeader));
 | 
						|
        sbs->advance(sizeof(RedistributeMsgHeader));
 | 
						|
      }
 | 
						|
    } while (!done);  // will break after commit/abort or catch an exception
 | 
						|
  }
 | 
						|
  catch (const std::exception& ex)
 | 
						|
  {
 | 
						|
    noExcept = false;
 | 
						|
    logMessage(ex.what(), __LINE__);
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    noExcept = false;
 | 
						|
  }
 | 
						|
 | 
						|
  if (noExcept == false)
 | 
						|
  {
 | 
						|
    // send NACK to peer
 | 
						|
    fBs.restart();
 | 
						|
    fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
 | 
						|
    fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
 | 
						|
    fBs << ((size_t)-1);
 | 
						|
    fIOSocket.write(fBs);
 | 
						|
  }
 | 
						|
 | 
						|
  fBs.reset();
 | 
						|
  fIOSocket.close();
 | 
						|
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::handleDataInit()
 | 
						|
{
 | 
						|
  uint32_t tmp = fMsgHeader.destination;
 | 
						|
  fMsgHeader.destination = fMsgHeader.source;
 | 
						|
  fMsgHeader.source = tmp;
 | 
						|
  fMsgHeader.messageId = RED_DATA_ACK;
 | 
						|
  size_t size = 0;
 | 
						|
 | 
						|
  fBs.restart();
 | 
						|
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
 | 
						|
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
 | 
						|
  fBs << size;
 | 
						|
 | 
						|
  // finish the hand shaking
 | 
						|
  fIOSocket.write(fBs);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
// Handle RED_DATA_START: read the RedistributeDataControl record from the
// message, open the target segment file for writing, and pre-allocate it by
// writing PRE_ALLOC_SIZE blocks up to the announced size.
//
// The announced size is acked back to the peer, or size_t -1 when anything
// failed.  On return 'size' is 0, ready to count the bytes that arrive in the
// following messages, and the message pointer has been released.
//------------------------------------------------------------------------------
void RedistributeWorkerThread::handleDataStart(SBS& sbs, size_t& size)
{
  char fileName[WriteEngine::FILE_NAME_SIZE];

  try
  {
    // extract the control data for the segment file
    RedistributeDataControl dc;

    if (sbs->length() < sizeof(RedistributeDataControl))
    {
      ostringstream oss;
      oss << "Short message, length=" << sbs->length();
      fErrorMsg = oss.str();
      fErrorCode = RED_EC_WKR_MSG_SHORT;
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    memcpy(&dc, sbs->buf(), sizeof(RedistributeDataControl));
    sbs->advance(sizeof(RedistributeDataControl));
    size = dc.size;

    // resolve the name of the file to create
    WriteEngine::FileOp fileOp;  // just to get filename, not for file operations
    const int rc = fileOp.oid2FileName(dc.oid, fileName, true, dc.dbroot, dc.partition, dc.segment);

    if (rc != WriteEngine::NO_ERROR)
    {
      fErrorCode = RED_EC_OID_TO_FILENAME;
      ostringstream oss;
      oss << "Failed to get file name: oid=" << dc.oid << ", dbroot=" << dc.dbroot
          << ", partition=" << dc.partition << ", segment=" << dc.segment;
      fErrorMsg = oss.str();
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    {
      ostringstream oss;
      oss << "=>redistributing: " << fileName << ", oid=" << dc.oid << ", db=" << dc.dbroot
          << ", part=" << dc.partition << ", seg=" << dc.segment << " from db=" << fMsgHeader.source;
      logMessage(oss.str(), __LINE__);
    }

    // a file left open by an earlier start is closed before the new one is opened
    if (fNewFilePtr != NULL)
      closeFile(fNewFilePtr);

    errno = 0;
    fNewFilePtr = fopen(fileName, "wb");

    if (fNewFilePtr == NULL)
    {
      int e = errno;
      fErrorCode = RED_EC_OPEN_FILE_FAIL;
      ostringstream oss;
      oss << "Failed to open " << fileName << ", oid=" << dc.oid << ", dbroot=" << dc.dbroot
          << ", partition=" << dc.partition << ", segment=" << dc.segment << ". " << strerror(e) << " (" << e
          << ")";
      fErrorMsg = oss.str();
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    {
      ostringstream oss;
      oss << "open " << fileName << ", oid=" << dc.oid << ", dbroot=" << dc.dbroot
          << ", partition=" << dc.partition << ", segment=" << dc.segment << ". " << fNewFilePtr;
      logMessage(oss.str(), __LINE__);
    }

    // set output buffering
    errno = 0;

    if (setvbuf(fNewFilePtr, fWriteBuffer.get(), _IOFBF, CHUNK_SIZE) != 0)
    {
      int e = errno;
      ostringstream oss;
      oss << "Failed to set i/o buffer: " << strerror(e) << " (" << e << ")";
      fErrorMsg = oss.str();
      logMessage(fErrorMsg, __LINE__);

      // not throwing an exception now.
    }

    // add to set for remove after abort
    addToDirSet(fileName, false);

    // An fseek would show the right size, but would not actually allocate the
    // continuous block, so whole blocks are written until the file size is reached.
    char buf[PRE_ALLOC_SIZE] = {1};

    for (size_t blocksLeft = size / PRE_ALLOC_SIZE; blocksLeft > 0; --blocksLeft)
    {
      errno = 0;

      if (fwrite(buf, PRE_ALLOC_SIZE, 1, fNewFilePtr) != 1)
      {
        int e = errno;
        ostringstream oss;
        oss << "Fail to preallocate file: " << strerror(e) << " (" << e << ")";
        fErrorMsg = oss.str();
        fErrorCode = RED_EC_FWRITE_FAIL;
        logMessage(fErrorMsg, __LINE__);
        throw runtime_error(fErrorMsg);
      }
    }

    // move back to the beginning to write real data
    fflush(fNewFilePtr);
    rewind(fNewFilePtr);
  }
  catch (const std::exception& ex)
  {
    // NACK
    size = static_cast<size_t>(-1);
    logMessage(ex.what(), __LINE__);
  }
  catch (...)
  {
    // NACK
    size = static_cast<size_t>(-1);
  }

  // ack file size
  fMsgHeader.messageId = RED_DATA_ACK;
  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
  fBs << size;
  fIOSocket.write(fBs);

  // reset to count the data received
  size = 0;
  sbs.reset();
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
// Handle RED_DATA_CONT: write one chunk of received data to the open file.
//
// The message starts with the number of data bytes, which must equal the
// length of the rest of the message.  On success the byte count is added to
// 'size' and acked back to the peer.  On any failure size_t -1 is acked (NACK)
// and 'size' is left unchanged.
//------------------------------------------------------------------------------
void RedistributeWorkerThread::handleDataCont(SBS& sbs, size_t& size)
{
  size_t ack = 0;

  try
  {
    size_t bytesRcvd = 0;
    *sbs >> bytesRcvd;

    if (bytesRcvd != sbs->length())
    {
      ostringstream oss;
      oss << "Incorrect data length: " << sbs->length() << ", expecting " << bytesRcvd;
      fErrorMsg = oss.str();
      fErrorCode = RED_EC_BS_TOO_SHORT;
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    // fNewFilePtr is NULL when no file was opened (for example after a failed
    // start); passing NULL to fwrite() must be avoided.
    if (fNewFilePtr == NULL)
    {
      fErrorMsg = "Fail to write file: no open file";
      fErrorCode = RED_EC_FWRITE_FAIL;
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    errno = 0;
    size_t n = fwrite(sbs->buf(), 1, bytesRcvd, fNewFilePtr);

    if (n != bytesRcvd)
    {
      int e = errno;
      ostringstream oss;
      oss << "Fail to write file: " << strerror(e) << " (" << e << ")";
      fErrorMsg = oss.str();
      fErrorCode = RED_EC_FWRITE_FAIL;
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    ack = bytesRcvd;
    size += ack;
  }
  catch (...)
  {
    // NACK -- the value sent back is 'ack', so that is what has to carry the
    // -1 for every kind of exception (std::exception included).
    ack = static_cast<size_t>(-1);
  }

  // ack received data
  sbs.reset();
  fMsgHeader.messageId = RED_DATA_ACK;
  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
  fBs << ack;
  fIOSocket.write(fBs);
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
// Handle RED_DATA_FINISH: close the received file and compare the number of
// bytes counted locally ('size') with the file size reported by the peer.
//
// The local size is acked back when both agree; otherwise fErrorCode is set to
// RED_EC_FILE_SIZE_NOT_MATCH.  On any failure size_t -1 is acked (NACK).
//------------------------------------------------------------------------------
void RedistributeWorkerThread::handleDataFinish(SBS& sbs, size_t& size)
{
  size_t ack = 0;

  // close open file
  closeFile(fNewFilePtr);
  fNewFilePtr = NULL;

  try
  {
    size_t fileSize = 0;
    *sbs >> fileSize;

    if (fileSize != size)
    {
      ostringstream oss;
      oss << "File size not match: local=" << size << ", remote=" << fileSize;
      fErrorMsg = oss.str();
      fErrorCode = RED_EC_FILE_SIZE_NOT_MATCH;
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    ack = size;
  }
  catch (...)
  {
    // NACK -- the value sent back is 'ack', so that is what has to carry the
    // -1 for every kind of exception (std::exception included).
    ack = static_cast<size_t>(-1);
  }

  // ack received data
  sbs.reset();
  fMsgHeader.messageId = RED_DATA_ACK;
  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
  fBs << ack;
  fIOSocket.write(fBs);
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
// Handle RED_DATA_COMMIT: release the message and reply with a RED_DATA_ACK
// carrying a size of 0.  The size argument is not used.
//------------------------------------------------------------------------------
void RedistributeWorkerThread::handleDataCommit(SBS& sbs, size_t& /*size*/)
{
  sbs.reset();

  fMsgHeader.messageId = RED_DATA_ACK;

  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));

  size_t zeroAck = 0;
  fBs << zeroAck;

  fIOSocket.write(fBs);
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
// Handle RED_DATA_ABORT: close the file being received, remove every entry
// recorded in fNewDirSet from the file system, and reply with a RED_DATA_ACK
// carrying a size of 0.  The size argument is not used.
//------------------------------------------------------------------------------
void RedistributeWorkerThread::handleDataAbort(SBS& sbs, size_t& /*size*/)
{
  // close open file (closeFile() ignores NULL), and forget the pointer the same
  // way handleDataFinish() does, so that a later check of fNewFilePtr cannot
  // lead to a second fclose() on the same stream.
  closeFile(fNewFilePtr);
  fNewFilePtr = NULL;

  IDBFileSystem& fs = (IDBPolicy::useHdfs()    ? IDBFileSystem::getFs(IDBDataFile::HDFS)
                       : IDBPolicy::useCloud() ? IDBFileSystem::getFs(IDBDataFile::CLOUD)
                                               : IDBFileSystem::getFs(IDBDataFile::BUFFERED));

  // remove local files
  for (set<string>::iterator i = fNewDirSet.begin(); i != fNewDirSet.end(); ++i)
  {
    fs.remove(i->c_str());  // ignoring return code
  }

  // send ack
  sbs.reset();
  size_t ack = 0;
  fMsgHeader.messageId = RED_DATA_ACK;
  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
  fBs << ack;
  fIOSocket.write(fBs);
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::handleUnknowDataMsg()
 | 
						|
{
 | 
						|
  ostringstream oss;
 | 
						|
  oss << "Unknown data message: " << fMsgHeader.messageId;
 | 
						|
  fErrorMsg = oss.str();
 | 
						|
  fErrorCode = RED_EC_UNKNOWN_DATA_MSG;
 | 
						|
  logMessage(fErrorMsg, __LINE__);
 | 
						|
  throw runtime_error(fErrorMsg);
 | 
						|
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::handleUnknowJobMsg()
 | 
						|
{
 | 
						|
  ostringstream oss;
 | 
						|
  oss << "Unknown job message: " << fMsgHeader.messageId;
 | 
						|
  fErrorMsg = oss.str();
 | 
						|
  fErrorCode = RED_EC_UNKNOWN_JOB_MSG;
 | 
						|
  logMessage(fErrorMsg, __LINE__);
 | 
						|
 | 
						|
  // protocol error, ignore and close connection.
 | 
						|
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::closeFile(FILE* f)
 | 
						|
{
 | 
						|
  if (f == NULL)
 | 
						|
    return;
 | 
						|
 | 
						|
  ostringstream oss;
 | 
						|
  oss << "close file* " << f << " ";
 | 
						|
 | 
						|
  errno = 0;
 | 
						|
  int rc = fclose(f);
 | 
						|
 | 
						|
  if (rc != 0)
 | 
						|
    oss << "error: " << strerror(errno) << " (" << errno << ")";
 | 
						|
  else
 | 
						|
    oss << "OK";
 | 
						|
 | 
						|
  logMessage(oss.str(), __LINE__);
 | 
						|
}
 | 
						|
 | 
						|
void RedistributeWorkerThread::logMessage(const string& msg, int line)
 | 
						|
{
 | 
						|
  ostringstream oss;
 | 
						|
  oss << msg << " @workerThread:" << line;
 | 
						|
  RedistributeControl::instance()->logMessage(oss.str());
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace redistribute
 | 
						|
 |