You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-29 08:21:15 +03:00
This patch introduces centralized logic of selecting what dbroot is accessible in PrimProc on what node. The logic is in OamCache for time being and can be moved later.
1523 lines
44 KiB
C++
1523 lines
44 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*
|
|
* $Id: we_redistributeworkerthread.cpp 4646 2013-05-23 20:58:08Z xlou $
|
|
*/
|
|
|
|
#include <iostream>
|
|
#include <set>
|
|
#include <vector>
|
|
#include <cassert>
|
|
#include <stdexcept>
|
|
#include <sstream>
|
|
#include <string>
|
|
#include <unistd.h>
|
|
using namespace std;
|
|
|
|
#include "boost/scoped_ptr.hpp"
|
|
#include "boost/scoped_array.hpp"
|
|
#include "boost/thread/mutex.hpp"
|
|
#include "boost/filesystem.hpp"
|
|
using namespace boost;
|
|
|
|
#include "installdir.h"
|
|
|
|
#include "configcpp.h"
|
|
using namespace config;
|
|
|
|
#include "liboamcpp.h"
|
|
#include "oamcache.h"
|
|
using namespace oam;
|
|
|
|
#include "dbrm.h"
|
|
using namespace BRM;
|
|
|
|
#include "messagequeue.h"
|
|
#include "bytestream.h"
|
|
using namespace messageqcpp;
|
|
|
|
#include "calpontsystemcatalog.h"
|
|
using namespace execplan;
|
|
|
|
#include "exceptclasses.h"
|
|
using namespace logging;
|
|
|
|
#include "IDBFileSystem.h"
|
|
#include "IDBPolicy.h"
|
|
using namespace idbdatafile;
|
|
|
|
#include "we_fileop.h"
|
|
#include "we_messages.h"
|
|
#include "we_convertor.h"
|
|
#include "we_redistributedef.h"
|
|
#include "we_redistributecontrol.h"
|
|
#include "we_redistributeworkerthread.h"
|
|
|
|
namespace redistribute
|
|
{
|
|
// static variables
|
|
boost::mutex RedistributeWorkerThread::fActionMutex;
|
|
volatile bool RedistributeWorkerThread::fStopAction = false;
|
|
volatile bool RedistributeWorkerThread::fCommitted = false;
|
|
string RedistributeWorkerThread::fWesInUse;
|
|
|
|
RedistributeWorkerThread::RedistributeWorkerThread(ByteStream& bs, IOSocket& ios)
|
|
: fBs(bs), fIOSocket(ios), fTableLockId(0), fErrorCode(RED_EC_OK), fNewFilePtr(NULL), fOldFilePtr(NULL)
|
|
{
|
|
fWriteBuffer.reset(new char[CHUNK_SIZE]);
|
|
}
|
|
|
|
RedistributeWorkerThread::~RedistributeWorkerThread()
|
|
{
|
|
boost::mutex::scoped_lock lock(fActionMutex);
|
|
|
|
if (fNewFilePtr)
|
|
closeFile(fNewFilePtr);
|
|
|
|
if (fOldFilePtr)
|
|
closeFile(fOldFilePtr);
|
|
|
|
// make sure releasing the table lock.
|
|
if (fTableLockId > 0)
|
|
{
|
|
fDbrm->releaseTableLock(fTableLockId);
|
|
|
|
// use the interface, line# replaced with lock id.
|
|
logMessage("Releasing table lock in destructor. ", fTableLockId);
|
|
}
|
|
}
|
|
|
|
void RedistributeWorkerThread::operator()()
|
|
{
|
|
memcpy(&fMsgHeader, fBs.buf(), sizeof(RedistributeMsgHeader));
|
|
fBs.advance(sizeof(RedistributeMsgHeader));
|
|
|
|
if (fMsgHeader.messageId == RED_ACTN_REQUEST)
|
|
handleRequest();
|
|
else if (fMsgHeader.messageId == RED_ACTN_STOP)
|
|
handleStop();
|
|
else if (fMsgHeader.messageId == RED_DATA_INIT)
|
|
handleData();
|
|
else
|
|
handleUnknowJobMsg();
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleRequest()
|
|
{
|
|
try
|
|
{
|
|
// clear stop flag if ever set.
|
|
{
|
|
boost::mutex::scoped_lock lock(fActionMutex);
|
|
fStopAction = false;
|
|
fCommitted = false;
|
|
}
|
|
|
|
if (setup() == 0)
|
|
{
|
|
if (fBs.length() >= sizeof(RedistributePlanEntry))
|
|
{
|
|
memcpy(&fPlanEntry, fBs.buf(), sizeof(RedistributePlanEntry));
|
|
fBs.advance(sizeof(RedistributePlanEntry));
|
|
fMyId.first = fPlanEntry.source;
|
|
fMyId.second = fOamCache->getOwnerPM(fMyId.first);
|
|
fPeerId.first = fPlanEntry.destination;
|
|
fPeerId.second = fOamCache->getOwnerPM(fPeerId.first);
|
|
|
|
if (grabTableLock() == 0)
|
|
{
|
|
// workaround extentmap slow update
|
|
sleep(1);
|
|
|
|
// build segment & entry list after grabbing the table lock.
|
|
if (buildEntryList() == 0)
|
|
{
|
|
if (sendData() == 0)
|
|
{
|
|
// do bulk update
|
|
updateDbrm();
|
|
}
|
|
}
|
|
|
|
// conversation to peer after got table lock
|
|
// confirm commit or abort
|
|
confirmToPeer();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
catch (const std::exception&)
|
|
{
|
|
}
|
|
catch (...)
|
|
{
|
|
}
|
|
|
|
sendResponse(RED_ACTN_REQUEST);
|
|
|
|
boost::mutex::scoped_lock lock(fActionMutex);
|
|
fWesInUse.clear();
|
|
fMsgQueueClient.reset();
|
|
|
|
fStopAction = false;
|
|
fCommitted = false;
|
|
}
|
|
|
|
int RedistributeWorkerThread::setup()
|
|
{
|
|
int ret = 0;
|
|
|
|
try
|
|
{
|
|
fConfig = Config::makeConfig();
|
|
fOamCache = oam::OamCache::makeOamCache();
|
|
fDbrm = RedistributeControl::instance()->fDbrm;
|
|
|
|
// for segment file # workaround
|
|
// string tmp = fConfig->getConfig("ExtentMap", "FilesPerColumnPartition");
|
|
// int filesPerPartition = fConfig->fromText(tmp);
|
|
// if (filesPerPartition == 0)
|
|
// filesPerPartition = DEFAULT_FILES_PER_COLUMN_PARTITION;
|
|
// int dbrootNum = fOamCache->getDBRootNums().size();
|
|
// if (dbrootNum == 0)
|
|
// {
|
|
// fErrorMsg = "OamCache->getDBRootNums() failed.";
|
|
// logMessage(fErrorMsg, __LINE__);
|
|
// return 1;
|
|
// }
|
|
// if ((filesPerPartition % dbrootNum) != 0)
|
|
// {
|
|
// fErrorMsg = "ExtentMap::FilesPerColumnPartition is not a multiple of db root number.";
|
|
// logMessage(fErrorMsg, __LINE__);
|
|
// return 1;
|
|
// }
|
|
// fSegPerRoot = filesPerPartition / dbrootNum;
|
|
}
|
|
catch (const std::exception&)
|
|
{
|
|
ret = 2;
|
|
}
|
|
catch (...)
|
|
{
|
|
ret = 2;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
int RedistributeWorkerThread::grabTableLock()
|
|
{
|
|
fTableLockId = 0;
|
|
|
|
try
|
|
{
|
|
vector<uint32_t> pms;
|
|
pms.push_back(fMyId.second);
|
|
|
|
if (fMyId.second != fPeerId.second)
|
|
pms.push_back(fPeerId.second);
|
|
|
|
struct timespec ts;
|
|
ts.tv_sec = 0;
|
|
ts.tv_nsec = 100 * 1000000;
|
|
|
|
while (fTableLockId == 0 && !fStopAction)
|
|
{
|
|
// make sure it's not stopped.
|
|
if (fStopAction)
|
|
return RED_EC_USER_STOP;
|
|
|
|
// always wait long enough for ddl/dml/cpimport to get table lock
|
|
// for now, triple the ddl/dml/cpimport retry interval: 3 * 100ms
|
|
struct timespec tmp = ts;
|
|
|
|
while (nanosleep(&tmp, &ts) < 0)
|
|
;
|
|
|
|
tmp = ts;
|
|
|
|
try
|
|
{
|
|
uint32_t processID = ::getpid();
|
|
int32_t txnId = 0;
|
|
int32_t sessionId = 0;
|
|
string processName = "WriteEngineServer";
|
|
fTableLockId = fDbrm->getTableLock(pms, fPlanEntry.table, &processName, &processID, &sessionId,
|
|
&txnId, BRM::LOADING);
|
|
}
|
|
catch (const std::exception& ex)
|
|
{
|
|
fErrorCode = RED_EC_IDB_HARD_FAIL;
|
|
logMessage(string("getTableLock exception") + ex.what(), __LINE__);
|
|
}
|
|
catch (...)
|
|
{
|
|
fErrorCode = RED_EC_IDB_HARD_FAIL;
|
|
logMessage("getTableLock exception", __LINE__);
|
|
|
|
// no need to throw
|
|
// throw IDBExcept(ERR_HARD_FAILURE);
|
|
}
|
|
}
|
|
}
|
|
catch (const std::exception& ex)
|
|
{
|
|
// use the interface, line# replaced with lock id.
|
|
logMessage(string(ex.what()) + " when try to get table lock: ", fTableLockId);
|
|
}
|
|
catch (...)
|
|
{
|
|
// use the interface, line# replaced with lock id.
|
|
logMessage("Unknown exception when try to get table lock: ", fTableLockId);
|
|
}
|
|
|
|
// use the interface, line# replaced with lock id.
|
|
logMessage("Got table lock: ", fTableLockId);
|
|
|
|
return ((fTableLockId > 0) ? 0 : -1);
|
|
}
|
|
|
|
int RedistributeWorkerThread::buildEntryList()
|
|
{
|
|
int ret = 0;
|
|
|
|
try
|
|
{
|
|
// get all column oids
|
|
boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(0);
|
|
const CalpontSystemCatalog::TableName table = csc->tableName(fPlanEntry.table);
|
|
CalpontSystemCatalog::RIDList cols = csc->columnRIDs(table, true);
|
|
CalpontSystemCatalog::OID tableAuxColOid = csc->tableAUXColumnOID(table);
|
|
|
|
for (CalpontSystemCatalog::RIDList::iterator i = cols.begin(); i != cols.end(); i++)
|
|
fOids.push_back(i->objnum);
|
|
|
|
CalpontSystemCatalog::DictOIDList dicts = csc->dictOIDs(table);
|
|
|
|
for (CalpontSystemCatalog::DictOIDList::iterator i = dicts.begin(); i != dicts.end(); i++)
|
|
fOids.push_back(i->dictOID);
|
|
|
|
if (tableAuxColOid > 3000)
|
|
{
|
|
fOids.push_back(tableAuxColOid);
|
|
}
|
|
|
|
bool firstOid = true; // for adding segments, all columns have the same lay out.
|
|
uint16_t source = fPlanEntry.source;
|
|
uint16_t target = fPlanEntry.destination;
|
|
uint16_t partition = fPlanEntry.partition;
|
|
uint32_t minWidth = 8; // column width greater than 8 will be dictionary.
|
|
|
|
for (vector<int64_t>::iterator i = fOids.begin(); i != fOids.end(); i++)
|
|
{
|
|
vector<EMEntry> entries;
|
|
int rc = fDbrm->getExtents(*i, entries, false, false, true);
|
|
|
|
if (rc != 0 || entries.size() == 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Error in DBRM getExtents; oid:" << *i << "; returnCode: " << rc;
|
|
throw runtime_error(oss.str());
|
|
}
|
|
|
|
// same oid has the same column width
|
|
uint32_t colWid = entries.front().colWid;
|
|
vector<EMEntry>::iterator targetHwmEntry = entries.end(); // for HWM_0 workaround
|
|
|
|
if (colWid > 0 && colWid < minWidth)
|
|
minWidth = colWid;
|
|
|
|
for (vector<EMEntry>::iterator j = entries.begin(); j != entries.end(); j++)
|
|
{
|
|
if (j->dbRoot == source && j->partitionNum == partition)
|
|
{
|
|
fUpdateRtEntries.push_back(BulkUpdateDBRootArg(j->range.start, target));
|
|
|
|
if (firstOid)
|
|
fSegments.insert(j->segmentNum);
|
|
}
|
|
|
|
#if 0
|
|
else if (j->dbRoot == target && j->partitionNum == partition)
|
|
{
|
|
// the partition already exists on the target dbroot
|
|
fErrorCode = RED_EC_PART_EXIST_ON_TARGET;
|
|
ostringstream oss;
|
|
oss << "oid:" << *i << ", partition:" << partition << " exists, source:"
|
|
<< source << ", destination:" << target;
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
#endif
|
|
|
|
// workaround for HWM_0 of highest extents of the oid on target dbroot.
|
|
if (j->dbRoot == target)
|
|
{
|
|
if (targetHwmEntry == entries.end())
|
|
{
|
|
targetHwmEntry = j;
|
|
}
|
|
else
|
|
{
|
|
if (j->partitionNum > targetHwmEntry->partitionNum)
|
|
{
|
|
targetHwmEntry = j;
|
|
}
|
|
else if (j->partitionNum == targetHwmEntry->partitionNum &&
|
|
j->blockOffset > targetHwmEntry->blockOffset)
|
|
{
|
|
targetHwmEntry = j;
|
|
}
|
|
else if (j->partitionNum == targetHwmEntry->partitionNum &&
|
|
j->blockOffset == targetHwmEntry->blockOffset &&
|
|
j->segmentNum > targetHwmEntry->segmentNum)
|
|
{
|
|
targetHwmEntry = j;
|
|
}
|
|
}
|
|
}
|
|
} // for em entries
|
|
|
|
// HWM_0 workaround
|
|
// HWM 0 has two possibilities:
|
|
// 1. segment file has one extent, and the first block is not full yet.
|
|
// 2. segment file has more than one extents, the HWM of the extents other than
|
|
// the last extent is set to 0, that is only last extent has none-zero HWM.
|
|
// In tuple-bps::makeJob, there is a check to handle last extent has 0 hwm:
|
|
// (scannedExtents[i].HWM == 0 && (int) i < lastExtent[scannedExtents[i].dbRoot-1])
|
|
// lbidsToScan = scannedExtents[i].range.size * 1024;
|
|
// Based on this check, the number of block to scan is caculated.
|
|
// After redistributing the partitions, the original case 1 extent on destination
|
|
// may not be the highest extent in the dbroot, and result in a full extent scan.
|
|
// This scan will fail because there is no enough blocks if this is an abbreviated
|
|
// extent or not enough chunks if the column is compressed.
|
|
// The workaround is to bump up the HWM to 1 if moved in partitions are greater.
|
|
if (targetHwmEntry != entries.end() && // exclude no extent case
|
|
targetHwmEntry->colWid > 0 && // exclude dictionary
|
|
targetHwmEntry->HWM == 0 && targetHwmEntry->partitionNum < partition)
|
|
{
|
|
BulkSetHWMArg arg;
|
|
arg.oid = *i;
|
|
arg.partNum = targetHwmEntry->partitionNum;
|
|
arg.segNum = targetHwmEntry->segmentNum;
|
|
arg.hwm = targetHwmEntry->colWid; // will correct later based on minWidth
|
|
|
|
fUpdateHwmEntries.push_back(arg);
|
|
}
|
|
} // for oids
|
|
|
|
// HWM_0 workaround
|
|
// Caculate the min(column width), the HWM(bump up to) for each column.
|
|
if (fUpdateHwmEntries.size() > 0)
|
|
{
|
|
// update the HWM based in column width, not include dictionary extents
|
|
for (vector<BRM::BulkSetHWMArg>::iterator j = fUpdateHwmEntries.begin(); j != fUpdateHwmEntries.end();
|
|
j++)
|
|
{
|
|
if (j->hwm <= 8)
|
|
j->hwm /= minWidth;
|
|
else
|
|
j->hwm = 1; // not needed, but in case
|
|
}
|
|
}
|
|
}
|
|
catch (const std::exception& ex)
|
|
{
|
|
fErrorCode = RED_EC_EXTENT_ERROR;
|
|
fErrorMsg = ex.what();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
ret = fErrorCode;
|
|
}
|
|
catch (...)
|
|
{
|
|
fErrorCode = RED_EC_EXTENT_ERROR;
|
|
fErrorMsg = "get extent error.";
|
|
logMessage(fErrorMsg, __LINE__);
|
|
ret = fErrorCode;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
int RedistributeWorkerThread::sendData()
|
|
{
|
|
WriteEngine::FileOp fileOp; // just to get filename, not for file operations
|
|
bool remotePM = (fMyId.second != fPeerId.second);
|
|
uint32_t dbroot = fPlanEntry.source;
|
|
uint32_t partition = fPlanEntry.partition;
|
|
int16_t source = fPlanEntry.source;
|
|
int16_t dest = fPlanEntry.destination;
|
|
|
|
IDBDataFile::Types fileType = (IDBPolicy::useHdfs() ? IDBDataFile::HDFS
|
|
: IDBPolicy::useCloud() ? IDBDataFile::CLOUD
|
|
: IDBDataFile::UNBUFFERED);
|
|
|
|
IDBFileSystem& fs = IDBFileSystem::getFs(fileType);
|
|
|
|
if ((remotePM) && (fileType != IDBDataFile::HDFS))
|
|
{
|
|
if (connectToWes(fPeerId.second) != 0)
|
|
{
|
|
fErrorCode = RED_EC_CONNECT_FAIL;
|
|
ostringstream oss;
|
|
oss << "Failed to connect to PM" << fPeerId.second << " from PM" << fMyId.second;
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
// start to send each segment file
|
|
uint32_t seq = 0;
|
|
ByteStream bs;
|
|
|
|
// start conversion with peer, hand shaking.
|
|
RedistributeMsgHeader header(dest, source, seq++, RED_DATA_INIT);
|
|
bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
|
|
bs.append((const ByteStream::byte*)&header, sizeof(header));
|
|
fMsgQueueClient->write(bs);
|
|
|
|
SBS sbs = fMsgQueueClient->read();
|
|
|
|
if (!checkDataTransferAck(sbs, 0))
|
|
return fErrorCode;
|
|
|
|
for (vector<int64_t>::iterator i = fOids.begin(); i != fOids.end(); i++)
|
|
{
|
|
for (set<int16_t>::iterator j = fSegments.begin(); j != fSegments.end(); ++j)
|
|
{
|
|
char fileName[WriteEngine::FILE_NAME_SIZE];
|
|
int rc = fileOp.oid2FileName(*i, fileName, false, dbroot, partition, *j);
|
|
|
|
if (rc == WriteEngine::NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "<=redistributing: " << fileName << ", oid=" << *i << ", db=" << source
|
|
<< ", part=" << partition << ", seg=" << *j << " to db=" << dest;
|
|
logMessage(oss.str(), __LINE__);
|
|
}
|
|
else
|
|
{
|
|
fErrorCode = RED_EC_OID_TO_FILENAME;
|
|
ostringstream oss;
|
|
oss << "Failed to get file name: oid=" << *i << ", dbroot=" << dbroot << ", partition=" << partition
|
|
<< ", segment=" << *j;
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
if (fOldFilePtr != NULL)
|
|
closeFile(fOldFilePtr);
|
|
|
|
errno = 0;
|
|
FILE* fOldFilePtr = fopen(fileName, "rb");
|
|
|
|
if (fOldFilePtr != NULL)
|
|
{
|
|
ostringstream oss;
|
|
oss << "open " << fileName << ", oid=" << *i << ", dbroot=" << dbroot << ", partition=" << partition
|
|
<< ", segment=" << *j << ". " << fOldFilePtr;
|
|
logMessage(oss.str(), __LINE__);
|
|
}
|
|
else
|
|
{
|
|
int e = errno;
|
|
fErrorCode = RED_EC_OPEN_FILE_FAIL;
|
|
ostringstream oss;
|
|
oss << "Failed to open " << fileName << ", oid=" << *i << ", dbroot=" << dbroot
|
|
<< ", partition=" << partition << ", segment=" << *j << ". " << strerror(e) << " (" << e << ")";
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
// add to set for remove after commit
|
|
addToDirSet(fileName, true);
|
|
|
|
char chunk[CHUNK_SIZE];
|
|
errno = 0;
|
|
fseek(fOldFilePtr, 0, SEEK_END); // go to end of file
|
|
long fileSize = ftell(fOldFilePtr); // get current file size
|
|
|
|
if (fileSize < 0)
|
|
{
|
|
int e = errno;
|
|
ostringstream oss;
|
|
oss << "Fail to tell file size: " << strerror(e) << " (" << e << ")";
|
|
fErrorMsg = oss.str();
|
|
fErrorCode = RED_EC_FSEEK_FAIL;
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
// send start message to have the file of fileSize created at target dbroot.
|
|
bs.restart();
|
|
RedistributeMsgHeader header(dest, source, seq++, RED_DATA_START);
|
|
RedistributeDataControl dataControl(*i, dest, partition, *j, fileSize);
|
|
bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
|
|
bs.append((const ByteStream::byte*)&header, sizeof(header));
|
|
bs.append((const ByteStream::byte*)&dataControl, sizeof(dataControl));
|
|
fMsgQueueClient->write(bs);
|
|
|
|
sbs = fMsgQueueClient->read();
|
|
|
|
if (!checkDataTransferAck(sbs, fileSize))
|
|
return fErrorCode;
|
|
|
|
// now send the file chunk by chunk.
|
|
rewind(fOldFilePtr);
|
|
int64_t bytesLeft = fileSize;
|
|
size_t bytesSend = CHUNK_SIZE;
|
|
header.messageId = RED_DATA_CONT;
|
|
|
|
while (bytesLeft > 0)
|
|
{
|
|
if (fStopAction)
|
|
{
|
|
closeFile(fOldFilePtr);
|
|
fOldFilePtr = NULL;
|
|
return RED_EC_USER_STOP;
|
|
}
|
|
|
|
if (bytesLeft < (long)CHUNK_SIZE)
|
|
bytesSend = bytesLeft;
|
|
|
|
errno = 0;
|
|
size_t n = fread(chunk, 1, bytesSend, fOldFilePtr);
|
|
|
|
if (n != bytesSend)
|
|
{
|
|
int e = errno;
|
|
ostringstream oss;
|
|
oss << "Fail to read: " << strerror(e) << " (" << e << ")";
|
|
fErrorMsg = oss.str();
|
|
fErrorCode = RED_EC_FREAD_FAIL;
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
header.sequenceNum = seq++;
|
|
bs.restart();
|
|
bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
|
|
bs.append((const ByteStream::byte*)&header, sizeof(header));
|
|
bs << (size_t)bytesSend;
|
|
bs.append((const ByteStream::byte*)chunk, bytesSend);
|
|
fMsgQueueClient->write(bs);
|
|
|
|
sbs = fMsgQueueClient->read();
|
|
|
|
if (!checkDataTransferAck(sbs, bytesSend))
|
|
return fErrorCode;
|
|
|
|
bytesLeft -= bytesSend;
|
|
}
|
|
|
|
closeFile(fOldFilePtr);
|
|
fOldFilePtr = NULL;
|
|
|
|
header.messageId = RED_DATA_FINISH;
|
|
header.sequenceNum = seq++;
|
|
bs.restart();
|
|
bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
|
|
bs.append((const ByteStream::byte*)&header, sizeof(header));
|
|
bs << (uint64_t)fileSize;
|
|
fMsgQueueClient->write(bs);
|
|
|
|
sbs = fMsgQueueClient->read();
|
|
|
|
if (!checkDataTransferAck(sbs, fileSize))
|
|
return fErrorCode;
|
|
|
|
} // segments
|
|
} // for oids
|
|
} // remote peer non-hdfs
|
|
else // local or HDFS file copy
|
|
{
|
|
std::map<int, std::string> rootToPathMap;
|
|
|
|
// use cp, in case failed in middle. May consider to use rename if possible.
|
|
for (vector<int64_t>::iterator i = fOids.begin(); i != fOids.end(); i++)
|
|
{
|
|
for (set<int16_t>::iterator j = fSegments.begin(); j != fSegments.end(); ++j)
|
|
{
|
|
if (fStopAction)
|
|
return RED_EC_USER_STOP;
|
|
|
|
if (fileType == IDBDataFile::HDFS) // HDFS file copy
|
|
{
|
|
string sourceName;
|
|
int rc = buildFullHdfsPath(rootToPathMap, // map of root to path
|
|
*i, // OID
|
|
source, // dbroot
|
|
partition, // partition
|
|
*j, // segment
|
|
sourceName); // full path name
|
|
|
|
if (rc != 0)
|
|
{
|
|
fErrorCode = RED_EC_OID_TO_FILENAME;
|
|
ostringstream oss;
|
|
oss << "Failed to get src file name: oid=" << *i << ", dbroot=" << source
|
|
<< ", partition=" << partition << ", segment=" << *j;
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
string destName;
|
|
rc = buildFullHdfsPath(rootToPathMap, // map of root to path
|
|
*i, // OID
|
|
dest, // dbroot
|
|
partition, // partition
|
|
*j, // segment
|
|
destName); // full path name
|
|
|
|
if (rc != 0)
|
|
{
|
|
fErrorCode = RED_EC_OID_TO_FILENAME;
|
|
ostringstream oss;
|
|
oss << "Failed to get dest file name: oid=" << *i << ", dbroot=" << dest
|
|
<< ", partition=" << partition << ", segment=" << *j;
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
ostringstream oss;
|
|
oss << "<=redistributing(hdfs): " << sourceName << ", oid=" << *i << ", db=" << source
|
|
<< ", part=" << partition << ", seg=" << *j << " to db=" << dest;
|
|
logMessage(oss.str(), __LINE__);
|
|
|
|
// add to set for remove after commit/abort
|
|
addToDirSet(sourceName.c_str(), true);
|
|
addToDirSet(destName.c_str(), false);
|
|
|
|
int ret = fs.copyFile(sourceName.c_str(), destName.c_str());
|
|
|
|
if (ret != 0)
|
|
{
|
|
fErrorCode = RED_EC_COPY_FILE_FAIL;
|
|
ostringstream oss;
|
|
oss << "Failed to copy " << sourceName << " to " << destName << "; error is: " << strerror(errno);
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
}
|
|
else // local file copy
|
|
{
|
|
char sourceName[WriteEngine::FILE_NAME_SIZE];
|
|
int rc = fileOp.oid2FileName(*i, sourceName, false, source, partition, *j);
|
|
|
|
if (rc != WriteEngine::NO_ERROR)
|
|
{
|
|
fErrorCode = RED_EC_OID_TO_FILENAME;
|
|
ostringstream oss;
|
|
oss << "Failed to get file name: oid=" << *i << ", dbroot=" << source
|
|
<< ", partition=" << partition << ", segment=" << *j;
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
char destName[WriteEngine::FILE_NAME_SIZE];
|
|
rc = fileOp.oid2FileName(*i, destName, true, dest, partition, *j);
|
|
|
|
if (rc != WriteEngine::NO_ERROR)
|
|
{
|
|
fErrorCode = RED_EC_OID_TO_FILENAME;
|
|
ostringstream oss;
|
|
oss << "Failed to get file name: oid=" << *i << ", dbroot=" << dest << ", partition=" << partition
|
|
<< ", segment=" << *j;
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
|
|
ostringstream oss;
|
|
oss << "<=redistributing(copy): " << sourceName << ", oid=" << *i << ", db=" << source
|
|
<< ", part=" << partition << ", seg=" << *j << " to db=" << dest;
|
|
logMessage(oss.str(), __LINE__);
|
|
|
|
// add to set for remove after commit/abort
|
|
addToDirSet(sourceName, true);
|
|
addToDirSet(destName, false);
|
|
|
|
// Using boost::copy_file() instead of IDBFileSystem::copy-
|
|
// File() so we can capture/report any boost exception error
|
|
// msg that IDBFileSystem::copyFile() currently swallows.
|
|
try
|
|
{
|
|
boost::filesystem::copy_file(sourceName, destName);
|
|
}
|
|
|
|
#if BOOST_VERSION >= 105200
|
|
catch (boost::filesystem::filesystem_error& e)
|
|
#else
|
|
catch (boost::filesystem::basic_filesystem_error<filesystem::path>& e)
|
|
#endif
|
|
{
|
|
fErrorCode = RED_EC_COPY_FILE_FAIL;
|
|
ostringstream oss;
|
|
oss << "Failed to copy " << sourceName << " to " << destName << "; error is: " << e.what();
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
return fErrorCode;
|
|
}
|
|
}
|
|
} // segment
|
|
} // oid
|
|
} // !remote
|
|
|
|
return 0;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Construct a full path name based on the given oid, root, partition, and seg.
|
|
// The rootToPathMap is the map of dbroot to dbrootPath that we are using. We
|
|
// are using this function instead of the usual FileOp::oid2FileName() function,
|
|
// because that function only works with "local" DBRoots. In the case of
|
|
// an HDFS copy, we will be copying files from/to DBRoots that are not on the
|
|
// local PM.
|
|
//------------------------------------------------------------------------------
|
|
int RedistributeWorkerThread::buildFullHdfsPath(std::map<int, std::string>& rootToPathMap, int64_t colOid,
|
|
int16_t dbRoot, uint32_t partition, int16_t segment,
|
|
std::string& fullFileName)
|
|
{
|
|
std::map<int, std::string>::const_iterator iter = rootToPathMap.find(dbRoot);
|
|
|
|
if (iter == rootToPathMap.end())
|
|
{
|
|
ostringstream oss;
|
|
oss << "DBRoot" << dbRoot;
|
|
std::string dbRootPath = fConfig->getConfig("SystemConfig", oss.str());
|
|
|
|
if (dbRootPath.empty())
|
|
{
|
|
return 1;
|
|
}
|
|
|
|
rootToPathMap[dbRoot] = dbRootPath;
|
|
iter = rootToPathMap.find(dbRoot);
|
|
}
|
|
|
|
char tempFileName[WriteEngine::FILE_NAME_SIZE];
|
|
char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE];
|
|
|
|
int rc = WriteEngine::Convertor::oid2FileName(colOid, tempFileName, dbDir, partition, segment);
|
|
|
|
if (rc != WriteEngine::NO_ERROR)
|
|
{
|
|
return 2;
|
|
}
|
|
|
|
ostringstream fullFileNameOss;
|
|
fullFileNameOss << iter->second << '/' << tempFileName;
|
|
fullFileName = fullFileNameOss.str();
|
|
|
|
return 0;
|
|
}
|
|
|
|
int RedistributeWorkerThread::connectToWes(int pmId)
|
|
{
|
|
int ret = 0;
|
|
ostringstream oss;
|
|
oss << "pm" << pmId << "_WriteEngineServer";
|
|
|
|
try
|
|
{
|
|
fMsgQueueClient.reset(new MessageQueueClient(oss.str(), fConfig));
|
|
}
|
|
catch (const std::exception& ex)
|
|
{
|
|
fErrorMsg = "Caught exception when connecting to " + oss.str() + " -- " + ex.what();
|
|
ret = 1;
|
|
}
|
|
catch (...)
|
|
{
|
|
fErrorMsg = "Caught exception when connecting to " + oss.str() + " -- unknown";
|
|
ret = 2;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
int RedistributeWorkerThread::updateDbrm()
|
|
{
|
|
int rc1 = BRM::ERR_OK;
|
|
int rc2 = BRM::ERR_OK;
|
|
boost::mutex::scoped_lock lock(fActionMutex);
|
|
|
|
// cannot stop after extent map is updated.
|
|
if (!fStopAction)
|
|
{
|
|
if (fUpdateHwmEntries.size() > 0)
|
|
rc1 = fDbrm->bulkSetHWM(fUpdateHwmEntries, 0);
|
|
|
|
if (rc1 == BRM::ERR_OK)
|
|
{
|
|
int rc2 = fDbrm->bulkUpdateDBRoot(fUpdateRtEntries);
|
|
|
|
if (rc2 == 0)
|
|
fCommitted = true;
|
|
else
|
|
fErrorCode = RED_EC_UPDATE_DBRM_FAIL;
|
|
}
|
|
|
|
// logging for debug
|
|
{
|
|
if (fUpdateHwmEntries.size() > 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "HWM_0 workaround, updateHWM(oid,part,seg,hwm)";
|
|
vector<BRM::BulkSetHWMArg>::iterator i = fUpdateHwmEntries.begin();
|
|
|
|
for (; i != fUpdateHwmEntries.end(); i++)
|
|
{
|
|
oss << ":(" << i->oid << "," << i->partNum << "," << i->segNum << "," << i->hwm << ")";
|
|
}
|
|
|
|
oss << ((rc1 == BRM::ERR_OK) ? " success" : " failed");
|
|
logMessage(oss.str(), __LINE__);
|
|
}
|
|
|
|
if (rc1 == BRM::ERR_OK)
|
|
{
|
|
ostringstream oss;
|
|
oss << "updateDBRoot(startLBID,dbRoot)";
|
|
vector<BRM::BulkUpdateDBRootArg>::iterator i = fUpdateRtEntries.begin();
|
|
|
|
for (; i != fUpdateRtEntries.end(); i++)
|
|
oss << ":(" << i->startLBID << "," << i->dbRoot << ")";
|
|
|
|
oss << ((rc2 == BRM::ERR_OK) ? " success" : " failed");
|
|
logMessage(oss.str(), __LINE__);
|
|
}
|
|
}
|
|
}
|
|
|
|
return ((rc1 == BRM::ERR_OK && rc2 == BRM::ERR_OK) ? 0 : -1);
|
|
}
|
|
|
|
bool RedistributeWorkerThread::checkDataTransferAck(SBS& sbs, size_t size)
|
|
{
|
|
if (sbs->length() == 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Zero byte read, Network error.";
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
fErrorCode = RED_EC_NETWORK_FAIL;
|
|
}
|
|
else if (sbs->length() < (sizeof(RedistributeMsgHeader) + 1))
|
|
{
|
|
ostringstream oss;
|
|
oss << "Short message, length=" << sbs->length();
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
fErrorCode = RED_EC_WKR_MSG_SHORT;
|
|
}
|
|
else
|
|
{
|
|
// Need check header info
|
|
ByteStream::byte wesMsgId;
|
|
*sbs >> wesMsgId;
|
|
// const RedistributeMsgHeader* h = (const RedistributeMsgHeader*) sbs->buf();
|
|
sbs->advance(sizeof(RedistributeMsgHeader));
|
|
size_t ack;
|
|
*sbs >> ack;
|
|
|
|
if (ack != size)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Acked size does not match request: " << ack << "/" << size;
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
fErrorCode = RED_EC_SIZE_NACK;
|
|
}
|
|
}
|
|
|
|
sbs.reset();
|
|
|
|
return (fErrorCode == RED_EC_OK);
|
|
}
|
|
|
|
void RedistributeWorkerThread::confirmToPeer()
|
|
{
|
|
if (fTableLockId > 0)
|
|
{
|
|
bool rc = false;
|
|
|
|
try
|
|
{
|
|
rc = fDbrm->releaseTableLock(fTableLockId);
|
|
|
|
// use the interface, line# replaced with lock id.
|
|
logMessage("Releasing table lock... ", fTableLockId);
|
|
}
|
|
catch (const std::exception& ex)
|
|
{
|
|
// too bad, the talbe lock is messed up.
|
|
fErrorMsg = ex.what();
|
|
|
|
// use the interface, line# replaced with lock id.
|
|
logMessage("Release table exception: " + fErrorMsg, fTableLockId);
|
|
}
|
|
catch (...)
|
|
{
|
|
// use the interface, line# replaced with lock id.
|
|
logMessage("Release table lock unknown exception. ", fTableLockId);
|
|
}
|
|
|
|
if (rc == true)
|
|
{
|
|
// use the interface, line# replaced with lock id.
|
|
logMessage("Release table lock return true. ", fTableLockId);
|
|
fTableLockId = 0;
|
|
}
|
|
else
|
|
{
|
|
// let destructor try again.
|
|
// use the interface, line# replaced with lock id.
|
|
logMessage("Release table lock return false. ", fTableLockId);
|
|
}
|
|
}
|
|
|
|
IDBFileSystem& fs = (IDBPolicy::useHdfs() ? IDBFileSystem::getFs(IDBDataFile::HDFS)
|
|
: IDBPolicy::useCloud() ? IDBFileSystem::getFs(IDBDataFile::CLOUD)
|
|
: IDBFileSystem::getFs(IDBDataFile::BUFFERED));
|
|
|
|
uint32_t confirmCode = RED_DATA_COMMIT;
|
|
|
|
if (fErrorCode != RED_EC_OK || fStopAction == true) // fCommitted must be false
|
|
confirmCode = RED_DATA_ABORT;
|
|
|
|
if (fMyId.second != fPeerId.second)
|
|
{
|
|
if (fMsgQueueClient.get() != NULL)
|
|
{
|
|
ByteStream bs;
|
|
RedistributeMsgHeader header(fPeerId.first, fMyId.first, -1, confirmCode);
|
|
bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
|
|
bs.append((const ByteStream::byte*)&header, sizeof(header));
|
|
fMsgQueueClient->write(bs);
|
|
|
|
// not going to retry for now, ignore the ack and close the connection.
|
|
fMsgQueueClient->read();
|
|
fMsgQueueClient.reset();
|
|
}
|
|
}
|
|
else if (confirmCode != RED_DATA_COMMIT)
|
|
{
|
|
for (set<string>::iterator i = fNewDirSet.begin(); i != fNewDirSet.end(); i++)
|
|
{
|
|
fs.remove(i->c_str()); // ignoring return code
|
|
}
|
|
}
|
|
|
|
// new files committed, remove old ones.
|
|
if (confirmCode == RED_DATA_COMMIT)
|
|
{
|
|
for (set<string>::iterator i = fOldDirSet.begin(); i != fOldDirSet.end(); i++)
|
|
{
|
|
fs.remove(i->c_str()); // ignoring return code
|
|
}
|
|
}
|
|
|
|
fNewDirSet.clear();
|
|
fOldDirSet.clear();
|
|
}
|
|
|
|
void RedistributeWorkerThread::addToDirSet(const char* fileName, bool isSource)
|
|
{
|
|
string path(fileName);
|
|
size_t found = path.find_last_of("/\\");
|
|
path = path.substr(0, found);
|
|
|
|
if (isSource)
|
|
fOldDirSet.insert(path);
|
|
else
|
|
fNewDirSet.insert(path);
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleStop()
|
|
{
|
|
boost::mutex::scoped_lock lock(fActionMutex);
|
|
|
|
// cannot stop after extent map is updated.
|
|
if (!fCommitted)
|
|
fStopAction = true;
|
|
|
|
lock.unlock();
|
|
|
|
logMessage("User stop", __LINE__);
|
|
sendResponse(RED_ACTN_STOP);
|
|
}
|
|
|
|
void RedistributeWorkerThread::sendResponse(uint32_t type)
|
|
{
|
|
uint32_t tmp = fMsgHeader.destination;
|
|
fMsgHeader.destination = fMsgHeader.source;
|
|
fMsgHeader.source = tmp;
|
|
fMsgHeader.messageId = RED_ACTN_RESP;
|
|
|
|
fBs.restart();
|
|
fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now.
|
|
fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
|
|
|
|
if (type == RED_ACTN_REQUEST)
|
|
{
|
|
if (fErrorCode == RED_EC_OK && fStopAction == false)
|
|
fPlanEntry.status = RED_TRANS_SUCCESS;
|
|
else if (fErrorCode == RED_EC_PART_EXIST_ON_TARGET)
|
|
fPlanEntry.status = RED_TRANS_SKIPPED;
|
|
else if (fErrorCode != RED_EC_OK)
|
|
fPlanEntry.status = RED_TRANS_FAILED;
|
|
|
|
// else -- stopped, may try again if support resume
|
|
|
|
fBs.append((const ByteStream::byte*)&fPlanEntry, sizeof(fPlanEntry));
|
|
}
|
|
|
|
fIOSocket.write(fBs);
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleData()
|
|
{
|
|
bool done = false;
|
|
bool noExcept = true;
|
|
SBS sbs;
|
|
size_t size = 0;
|
|
|
|
try
|
|
{
|
|
do
|
|
{
|
|
switch (fMsgHeader.messageId)
|
|
{
|
|
case RED_DATA_INIT: handleDataInit(); break;
|
|
|
|
case RED_DATA_START: handleDataStart(sbs, size); break;
|
|
|
|
case RED_DATA_CONT: handleDataCont(sbs, size); break;
|
|
|
|
case RED_DATA_FINISH: handleDataFinish(sbs, size); break;
|
|
|
|
case RED_DATA_COMMIT:
|
|
handleDataCommit(sbs, size);
|
|
done = true;
|
|
break;
|
|
|
|
case RED_DATA_ABORT:
|
|
handleDataAbort(sbs, size);
|
|
done = true;
|
|
break;
|
|
|
|
default:
|
|
handleUnknowDataMsg();
|
|
done = true;
|
|
break;
|
|
}
|
|
|
|
if (!done)
|
|
{
|
|
// get next message
|
|
sbs = fIOSocket.read();
|
|
ByteStream::byte wesMsgId;
|
|
*sbs >> wesMsgId;
|
|
memcpy(&fMsgHeader, sbs->buf(), sizeof(RedistributeMsgHeader));
|
|
sbs->advance(sizeof(RedistributeMsgHeader));
|
|
}
|
|
} while (!done); // will break after commit/abort or catch an exception
|
|
}
|
|
catch (const std::exception& ex)
|
|
{
|
|
noExcept = false;
|
|
logMessage(ex.what(), __LINE__);
|
|
}
|
|
catch (...)
|
|
{
|
|
noExcept = false;
|
|
}
|
|
|
|
if (noExcept == false)
|
|
{
|
|
// send NACK to peer
|
|
fBs.restart();
|
|
fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now.
|
|
fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
|
|
fBs << ((size_t)-1);
|
|
fIOSocket.write(fBs);
|
|
}
|
|
|
|
fBs.reset();
|
|
fIOSocket.close();
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleDataInit()
|
|
{
|
|
uint32_t tmp = fMsgHeader.destination;
|
|
fMsgHeader.destination = fMsgHeader.source;
|
|
fMsgHeader.source = tmp;
|
|
fMsgHeader.messageId = RED_DATA_ACK;
|
|
size_t size = 0;
|
|
|
|
fBs.restart();
|
|
fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now.
|
|
fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
|
|
fBs << size;
|
|
|
|
// finish the hand shaking
|
|
fIOSocket.write(fBs);
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleDataStart(SBS& sbs, size_t& size)
|
|
{
|
|
char fileName[WriteEngine::FILE_NAME_SIZE];
|
|
|
|
try
|
|
{
|
|
// extract the control data for the segment file
|
|
RedistributeDataControl dc;
|
|
|
|
if (sbs->length() >= sizeof(RedistributeDataControl))
|
|
{
|
|
memcpy(&dc, sbs->buf(), sizeof(RedistributeDataControl));
|
|
sbs->advance(sizeof(RedistributeDataControl));
|
|
size = dc.size;
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << "Short message, length=" << sbs->length();
|
|
fErrorMsg = oss.str();
|
|
fErrorCode = RED_EC_WKR_MSG_SHORT;
|
|
logMessage(fErrorMsg, __LINE__);
|
|
throw runtime_error(fErrorMsg);
|
|
}
|
|
|
|
// create and open the file for writing.
|
|
WriteEngine::FileOp fileOp; // just to get filename, not for file operations
|
|
int rc = fileOp.oid2FileName(dc.oid, fileName, true, dc.dbroot, dc.partition, dc.segment);
|
|
|
|
if (rc == WriteEngine::NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "=>redistributing: " << fileName << ", oid=" << dc.oid << ", db=" << dc.dbroot
|
|
<< ", part=" << dc.partition << ", seg=" << dc.segment << " from db=" << fMsgHeader.source;
|
|
logMessage(oss.str(), __LINE__);
|
|
}
|
|
else
|
|
{
|
|
fErrorCode = RED_EC_OID_TO_FILENAME;
|
|
ostringstream oss;
|
|
oss << "Failed to get file name: oid=" << dc.oid << ", dbroot=" << dc.dbroot
|
|
<< ", partition=" << dc.partition << ", segment=" << dc.segment;
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
throw runtime_error(fErrorMsg);
|
|
}
|
|
|
|
if (fNewFilePtr != NULL)
|
|
closeFile(fNewFilePtr);
|
|
|
|
errno = 0;
|
|
fNewFilePtr = fopen(fileName, "wb");
|
|
|
|
if (fNewFilePtr != NULL)
|
|
{
|
|
ostringstream oss;
|
|
oss << "open " << fileName << ", oid=" << dc.oid << ", dbroot=" << dc.dbroot
|
|
<< ", partition=" << dc.partition << ", segment=" << dc.segment << ". " << fNewFilePtr;
|
|
logMessage(oss.str(), __LINE__);
|
|
}
|
|
else
|
|
{
|
|
int e = errno;
|
|
fErrorCode = RED_EC_OPEN_FILE_FAIL;
|
|
ostringstream oss;
|
|
oss << "Failed to open " << fileName << ", oid=" << dc.oid << ", dbroot=" << dc.dbroot
|
|
<< ", partition=" << dc.partition << ", segment=" << dc.segment << ". " << strerror(e) << " (" << e
|
|
<< ")";
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
throw runtime_error(fErrorMsg);
|
|
}
|
|
|
|
// set output buffering
|
|
errno = 0;
|
|
|
|
if (setvbuf(fNewFilePtr, fWriteBuffer.get(), _IOFBF, CHUNK_SIZE))
|
|
{
|
|
int e = errno;
|
|
ostringstream oss;
|
|
oss << "Failed to set i/o buffer: " << strerror(e) << " (" << e << ")";
|
|
fErrorMsg = oss.str();
|
|
logMessage(fErrorMsg, __LINE__);
|
|
|
|
// not throwing an exception now.
|
|
}
|
|
|
|
// add to set for remove after abort
|
|
addToDirSet(fileName, false);
|
|
|
|
// do a fseek will show the right size, but will not actually allocate the continuous block.
|
|
// do write 4k block till file size.
|
|
char buf[PRE_ALLOC_SIZE] = {1};
|
|
size_t nmemb = size / PRE_ALLOC_SIZE;
|
|
|
|
while (nmemb-- > 0)
|
|
{
|
|
errno = 0;
|
|
size_t n = fwrite(buf, PRE_ALLOC_SIZE, 1, fNewFilePtr);
|
|
|
|
if (n != 1)
|
|
{
|
|
int e = errno;
|
|
ostringstream oss;
|
|
oss << "Fail to preallocate file: " << strerror(e) << " (" << e << ")";
|
|
fErrorMsg = oss.str();
|
|
fErrorCode = RED_EC_FWRITE_FAIL;
|
|
logMessage(fErrorMsg, __LINE__);
|
|
throw runtime_error(fErrorMsg);
|
|
}
|
|
}
|
|
|
|
// move back to beging to write real data
|
|
fflush(fNewFilePtr);
|
|
rewind(fNewFilePtr);
|
|
}
|
|
catch (const std::exception& ex)
|
|
{
|
|
// NACK
|
|
size = -1;
|
|
logMessage(ex.what(), __LINE__);
|
|
}
|
|
catch (...)
|
|
{
|
|
// NACK
|
|
size = -1;
|
|
}
|
|
|
|
// ack file size
|
|
fMsgHeader.messageId = RED_DATA_ACK;
|
|
fBs.restart();
|
|
fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now.
|
|
fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
|
|
fBs << size;
|
|
fIOSocket.write(fBs);
|
|
|
|
// reset to count the data received
|
|
size = 0;
|
|
sbs.reset();
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleDataCont(SBS& sbs, size_t& size)
|
|
{
|
|
size_t ack = 0;
|
|
|
|
try
|
|
{
|
|
size_t bytesRcvd = 0;
|
|
*sbs >> bytesRcvd;
|
|
|
|
if (bytesRcvd != sbs->length())
|
|
{
|
|
ostringstream oss;
|
|
oss << "Incorrect data length: " << sbs->length() << ", expecting " << bytesRcvd;
|
|
fErrorMsg = oss.str();
|
|
fErrorCode = RED_EC_BS_TOO_SHORT;
|
|
logMessage(fErrorMsg, __LINE__);
|
|
throw runtime_error(fErrorMsg);
|
|
}
|
|
|
|
errno = 0;
|
|
size_t n = fwrite(sbs->buf(), 1, bytesRcvd, fNewFilePtr);
|
|
|
|
if (n != bytesRcvd)
|
|
{
|
|
int e = errno;
|
|
ostringstream oss;
|
|
oss << "Fail to write file: " << strerror(e) << " (" << e << ")";
|
|
fErrorMsg = oss.str();
|
|
fErrorCode = RED_EC_FWRITE_FAIL;
|
|
logMessage(fErrorMsg, __LINE__);
|
|
throw runtime_error(fErrorMsg);
|
|
}
|
|
|
|
ack = bytesRcvd;
|
|
size += ack;
|
|
}
|
|
catch (const std::exception&)
|
|
{
|
|
// NACK
|
|
size = -1;
|
|
}
|
|
catch (...)
|
|
{
|
|
// NACK
|
|
ack = -1;
|
|
}
|
|
|
|
// ack received data
|
|
sbs.reset();
|
|
fMsgHeader.messageId = RED_DATA_ACK;
|
|
fBs.restart();
|
|
fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now.
|
|
fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
|
|
fBs << ack;
|
|
fIOSocket.write(fBs);
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleDataFinish(SBS& sbs, size_t& size)
|
|
{
|
|
size_t ack = 0;
|
|
|
|
// close open file
|
|
closeFile(fNewFilePtr);
|
|
fNewFilePtr = NULL;
|
|
|
|
try
|
|
{
|
|
size_t fileSize = 0;
|
|
*sbs >> fileSize;
|
|
|
|
if (fileSize != size)
|
|
{
|
|
ostringstream oss;
|
|
oss << "File size not match: local=" << size << ", remote=" << fileSize;
|
|
fErrorMsg = oss.str();
|
|
fErrorCode = RED_EC_FILE_SIZE_NOT_MATCH;
|
|
logMessage(fErrorMsg, __LINE__);
|
|
throw runtime_error(fErrorMsg);
|
|
}
|
|
|
|
ack = size;
|
|
}
|
|
catch (const std::exception&)
|
|
{
|
|
// NACK
|
|
size = -1;
|
|
}
|
|
catch (...)
|
|
{
|
|
// NACK
|
|
ack = -1;
|
|
}
|
|
|
|
// ack received data
|
|
sbs.reset();
|
|
fMsgHeader.messageId = RED_DATA_ACK;
|
|
fBs.restart();
|
|
fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now.
|
|
fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
|
|
fBs << ack;
|
|
fIOSocket.write(fBs);
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleDataCommit(SBS& sbs, size_t& /*size*/)
|
|
{
|
|
size_t ack = 0;
|
|
sbs.reset();
|
|
fMsgHeader.messageId = RED_DATA_ACK;
|
|
fBs.restart();
|
|
fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now.
|
|
fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
|
|
fBs << ack;
|
|
fIOSocket.write(fBs);
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleDataAbort(SBS& sbs, size_t& /*size*/)
|
|
{
|
|
// close open file
|
|
if (fNewFilePtr != NULL)
|
|
closeFile(fNewFilePtr);
|
|
|
|
IDBFileSystem& fs = (IDBPolicy::useHdfs() ? IDBFileSystem::getFs(IDBDataFile::HDFS)
|
|
: IDBPolicy::useCloud() ? IDBFileSystem::getFs(IDBDataFile::CLOUD)
|
|
: IDBFileSystem::getFs(IDBDataFile::BUFFERED));
|
|
|
|
// remove local files
|
|
for (set<string>::iterator i = fNewDirSet.begin(); i != fNewDirSet.end(); i++)
|
|
{
|
|
fs.remove(i->c_str()); // ignoring return code
|
|
}
|
|
|
|
// send ack
|
|
sbs.reset();
|
|
size_t ack = 0;
|
|
fMsgHeader.messageId = RED_DATA_ACK;
|
|
fBs.restart();
|
|
fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now.
|
|
fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
|
|
fBs << ack;
|
|
fIOSocket.write(fBs);
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleUnknowDataMsg()
|
|
{
|
|
ostringstream oss;
|
|
oss << "Unknown data message: " << fMsgHeader.messageId;
|
|
fErrorMsg = oss.str();
|
|
fErrorCode = RED_EC_UNKNOWN_DATA_MSG;
|
|
logMessage(fErrorMsg, __LINE__);
|
|
throw runtime_error(fErrorMsg);
|
|
}
|
|
|
|
void RedistributeWorkerThread::handleUnknowJobMsg()
|
|
{
|
|
ostringstream oss;
|
|
oss << "Unknown job message: " << fMsgHeader.messageId;
|
|
fErrorMsg = oss.str();
|
|
fErrorCode = RED_EC_UNKNOWN_JOB_MSG;
|
|
logMessage(fErrorMsg, __LINE__);
|
|
|
|
// protocol error, ignore and close connection.
|
|
}
|
|
|
|
void RedistributeWorkerThread::closeFile(FILE* f)
|
|
{
|
|
if (f == NULL)
|
|
return;
|
|
|
|
ostringstream oss;
|
|
oss << "close file* " << f << " ";
|
|
|
|
errno = 0;
|
|
int rc = fclose(f);
|
|
|
|
if (rc != 0)
|
|
oss << "error: " << strerror(errno) << " (" << errno << ")";
|
|
else
|
|
oss << "OK";
|
|
|
|
logMessage(oss.str(), __LINE__);
|
|
}
|
|
|
|
void RedistributeWorkerThread::logMessage(const string& msg, int line)
|
|
{
|
|
ostringstream oss;
|
|
oss << msg << " @workerThread:" << line;
|
|
RedistributeControl::instance()->logMessage(oss.str());
|
|
}
|
|
|
|
} // namespace redistribute
|
|
|