1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00
Files
mariadb-columnstore-engine/writeengine/redistribute/we_redistributeworkerthread.cpp
Serguey Zefirov 5aa2a824c2 feat(MCOL-6082): Multiple readers of dbroots using OamCache logic
This patch introduces centralized logic of selecting what dbroot is
accessible in PrimProc on what node. The logic is in OamCache for time
being and can be moved later.
2025-07-21 14:32:39 +03:00

1523 lines
44 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*
* $Id: we_redistributeworkerthread.cpp 4646 2013-05-23 20:58:08Z xlou $
*/
#include <iostream>
#include <set>
#include <vector>
#include <cassert>
#include <stdexcept>
#include <sstream>
#include <string>
#include <unistd.h>
using namespace std;
#include "boost/scoped_ptr.hpp"
#include "boost/scoped_array.hpp"
#include "boost/thread/mutex.hpp"
#include "boost/filesystem.hpp"
using namespace boost;
#include "installdir.h"
#include "configcpp.h"
using namespace config;
#include "liboamcpp.h"
#include "oamcache.h"
using namespace oam;
#include "dbrm.h"
using namespace BRM;
#include "messagequeue.h"
#include "bytestream.h"
using namespace messageqcpp;
#include "calpontsystemcatalog.h"
using namespace execplan;
#include "exceptclasses.h"
using namespace logging;
#include "IDBFileSystem.h"
#include "IDBPolicy.h"
using namespace idbdatafile;
#include "we_fileop.h"
#include "we_messages.h"
#include "we_convertor.h"
#include "we_redistributedef.h"
#include "we_redistributecontrol.h"
#include "we_redistributeworkerthread.h"
namespace redistribute
{
// static variables
// Guards updates of fStopAction / fCommitted (see handleRequest, handleStop and updateDbrm)
// and is also held while the destructor cleans up.
boost::mutex RedistributeWorkerThread::fActionMutex;
// Set by handleStop() to ask a running redistribution to abort; reset by handleRequest().
volatile bool RedistributeWorkerThread::fStopAction = false;
// Set by updateDbrm() once the DBRoot update succeeded; handleStop() ignores stop requests afterwards.
volatile bool RedistributeWorkerThread::fCommitted = false;
// Cleared at the end of handleRequest().
// NOTE(review): presumably identifies the WriteEngineServer in use -- it is never assigned in this part of the file.
string RedistributeWorkerThread::fWesInUse;
// Binds the worker to the incoming message (bs) and the socket it arrived on (ios).
// No table lock is held and no segment file is open initially; a CHUNK_SIZE
// buffer is allocated up front for buffered output to the destination file.
RedistributeWorkerThread::RedistributeWorkerThread(ByteStream& bs, IOSocket& ios)
 : fBs(bs)
 , fIOSocket(ios)
 , fTableLockId(0)
 , fErrorCode(RED_EC_OK)
 , fNewFilePtr(NULL)
 , fOldFilePtr(NULL)
{
  char* const ioBuffer = new char[CHUNK_SIZE];
  fWriteBuffer.reset(ioBuffer);
}
// Releases everything the worker may still hold: any open segment file and,
// if confirmToPeer() did not manage to release it, the table lock.
RedistributeWorkerThread::~RedistributeWorkerThread()
{
  boost::mutex::scoped_lock lock(fActionMutex);

  if (fNewFilePtr)
    closeFile(fNewFilePtr);

  if (fOldFilePtr)
    closeFile(fOldFilePtr);

  // make sure releasing the table lock.
  if (fTableLockId > 0)
  {
    // A destructor must never let an exception escape (it would terminate the
    // process, possibly during stack unwinding), so the DBRM call is guarded.
    try
    {
      fDbrm->releaseTableLock(fTableLockId);
      // use the interface, line# replaced with lock id.
      logMessage("Releasing table lock in destructor. ", fTableLockId);
    }
    catch (...)
    {
      // Best effort only: nothing more can be done for the lock at this point.
    }
  }
}
// Thread entry point: extracts the message header from the request stream and
// dispatches on its message id.
void RedistributeWorkerThread::operator()()
{
  // The header sits at the front of the stream; copy it out and skip past it.
  memcpy(&fMsgHeader, fBs.buf(), sizeof(RedistributeMsgHeader));
  fBs.advance(sizeof(RedistributeMsgHeader));

  switch (fMsgHeader.messageId)
  {
    case RED_ACTN_REQUEST: handleRequest(); break;

    case RED_ACTN_STOP: handleStop(); break;

    case RED_DATA_INIT: handleData(); break;

    default: handleUnknowJobMsg(); break;
  }
}
void RedistributeWorkerThread::handleRequest()
{
try
{
// clear stop flag if ever set.
{
boost::mutex::scoped_lock lock(fActionMutex);
fStopAction = false;
fCommitted = false;
}
if (setup() == 0)
{
if (fBs.length() >= sizeof(RedistributePlanEntry))
{
memcpy(&fPlanEntry, fBs.buf(), sizeof(RedistributePlanEntry));
fBs.advance(sizeof(RedistributePlanEntry));
fMyId.first = fPlanEntry.source;
fMyId.second = fOamCache->getOwnerPM(fMyId.first);
fPeerId.first = fPlanEntry.destination;
fPeerId.second = fOamCache->getOwnerPM(fPeerId.first);
if (grabTableLock() == 0)
{
// workaround extentmap slow update
sleep(1);
// build segment & entry list after grabbing the table lock.
if (buildEntryList() == 0)
{
if (sendData() == 0)
{
// do bulk update
updateDbrm();
}
}
// conversation to peer after got table lock
// confirm commit or abort
confirmToPeer();
}
}
}
}
catch (const std::exception&)
{
}
catch (...)
{
}
sendResponse(RED_ACTN_REQUEST);
boost::mutex::scoped_lock lock(fActionMutex);
fWesInUse.clear();
fMsgQueueClient.reset();
fStopAction = false;
fCommitted = false;
}
// Acquires the shared handles this worker needs: the configuration, the OAM
// cache and the DBRM instance owned by RedistributeControl.
// Returns 0 on success, 2 if any of the lookups throws.
int RedistributeWorkerThread::setup()
{
  try
  {
    fConfig = Config::makeConfig();
    fOamCache = oam::OamCache::makeOamCache();
    fDbrm = RedistributeControl::instance()->fDbrm;

    // for segment file # workaround
    // string tmp = fConfig->getConfig("ExtentMap", "FilesPerColumnPartition");
    // int filesPerPartition = fConfig->fromText(tmp);
    // if (filesPerPartition == 0)
    // filesPerPartition = DEFAULT_FILES_PER_COLUMN_PARTITION;
    // int dbrootNum = fOamCache->getDBRootNums().size();
    // if (dbrootNum == 0)
    // {
    // fErrorMsg = "OamCache->getDBRootNums() failed.";
    // logMessage(fErrorMsg, __LINE__);
    // return 1;
    // }
    // if ((filesPerPartition % dbrootNum) != 0)
    // {
    // fErrorMsg = "ExtentMap::FilesPerColumnPartition is not a multiple of db root number.";
    // logMessage(fErrorMsg, __LINE__);
    // return 1;
    // }
    // fSegPerRoot = filesPerPartition / dbrootNum;
  }
  catch (...)
  {
    // Any failure, standard exception or not, is reported with the same code.
    return 2;
  }

  return 0;
}
// Acquires the table lock for fPlanEntry.table on the source PM and, when it
// differs, the destination PM. Retries until the lock is granted or a stop is
// requested, waiting between attempts so DDL/DML/cpimport get the lock first.
//
// Returns 0 when the lock was obtained, non-zero otherwise
// (RED_EC_USER_STOP if a stop arrived while waiting, -1 for any other failure).
int RedistributeWorkerThread::grabTableLock()
{
  fTableLockId = 0;

  try
  {
    vector<uint32_t> pms;
    pms.push_back(fMyId.second);

    if (fMyId.second != fPeerId.second)
      pms.push_back(fPeerId.second);

    while (fTableLockId == 0 && !fStopAction)
    {
      // always wait long enough for ddl/dml/cpimport to get table lock
      // for now, triple the ddl/dml/cpimport retry interval: 3 * 100ms
      //
      // The interval is rebuilt on every attempt. When a signal interrupts the
      // sleep, nanosleep() stores the remaining time and only that remainder is
      // slept on the retry; any other error ends the wait instead of spinning.
      struct timespec wait;
      wait.tv_sec = 0;
      wait.tv_nsec = 3 * 100 * 1000000L;

      while (nanosleep(&wait, &wait) < 0 && errno == EINTR)
        ;

      // make sure it's not stopped while we were waiting.
      if (fStopAction)
        return RED_EC_USER_STOP;

      try
      {
        uint32_t processID = ::getpid();
        int32_t txnId = 0;
        int32_t sessionId = 0;
        string processName = "WriteEngineServer";
        fTableLockId = fDbrm->getTableLock(pms, fPlanEntry.table, &processName, &processID, &sessionId,
                                           &txnId, BRM::LOADING);
      }
      catch (const std::exception& ex)
      {
        // NOTE(review): the loop keeps retrying after a hard failure and the
        // error code stays set even if a later attempt succeeds -- confirm intended.
        fErrorCode = RED_EC_IDB_HARD_FAIL;
        logMessage(string("getTableLock exception") + ex.what(), __LINE__);
      }
      catch (...)
      {
        fErrorCode = RED_EC_IDB_HARD_FAIL;
        logMessage("getTableLock exception", __LINE__);

        // no need to throw
        // throw IDBExcept(ERR_HARD_FAILURE);
      }
    }
  }
  catch (const std::exception& ex)
  {
    // use the interface, line# replaced with lock id.
    logMessage(string(ex.what()) + " when try to get table lock: ", fTableLockId);
  }
  catch (...)
  {
    // use the interface, line# replaced with lock id.
    logMessage("Unknown exception when try to get table lock: ", fTableLockId);
  }

  // use the interface, line# replaced with lock id.
  // (logged even when no lock was obtained; the id is 0 in that case)
  logMessage("Got table lock: ", fTableLockId);

  return ((fTableLockId > 0) ? 0 : -1);
}
// Collects everything needed to move the planned partition:
//  - fOids:             column OIDs, dictionary OIDs and (if present) the table AUX column OID
//  - fSegments:         segment numbers of the partition on the source dbroot (taken from the first OID)
//  - fUpdateRtEntries:  extents whose dbroot must be switched to the destination
//  - fUpdateHwmEntries: "HWM_0 workaround" entries for the destination dbroot
//
// Returns 0 on success; on any exception sets fErrorCode/fErrorMsg and returns
// RED_EC_EXTENT_ERROR.
int RedistributeWorkerThread::buildEntryList()
{
  int ret = 0;

  try
  {
    // get all column oids
    boost::shared_ptr<CalpontSystemCatalog> csc = CalpontSystemCatalog::makeCalpontSystemCatalog(0);
    const CalpontSystemCatalog::TableName table = csc->tableName(fPlanEntry.table);
    CalpontSystemCatalog::RIDList cols = csc->columnRIDs(table, true);
    CalpontSystemCatalog::OID tableAuxColOid = csc->tableAUXColumnOID(table);

    for (CalpontSystemCatalog::RIDList::iterator i = cols.begin(); i != cols.end(); i++)
      fOids.push_back(i->objnum);

    CalpontSystemCatalog::DictOIDList dicts = csc->dictOIDs(table);

    for (CalpontSystemCatalog::DictOIDList::iterator i = dicts.begin(); i != dicts.end(); i++)
      fOids.push_back(i->dictOID);

    // NOTE(review): presumably OIDs up to 3000 mean "no AUX column" / reserved range -- confirm.
    if (tableAuxColOid > 3000)
    {
      fOids.push_back(tableAuxColOid);
    }

    bool firstOid = true;  // for adding segments, all columns have the same lay out.
    uint16_t source = fPlanEntry.source;
    uint16_t target = fPlanEntry.destination;
    // Kept at 32 bits, like in sendData(): a 16-bit copy would wrap for large
    // partition numbers and match the extents of a different partition below.
    uint32_t partition = fPlanEntry.partition;
    uint32_t minWidth = 8;  // column width greater than 8 will be dictionary.

    for (vector<int64_t>::iterator i = fOids.begin(); i != fOids.end(); i++)
    {
      vector<EMEntry> entries;
      int rc = fDbrm->getExtents(*i, entries, false, false, true);

      if (rc != 0 || entries.size() == 0)
      {
        ostringstream oss;
        oss << "Error in DBRM getExtents; oid:" << *i << "; returnCode: " << rc;
        throw runtime_error(oss.str());
      }

      // same oid has the same column width
      uint32_t colWid = entries.front().colWid;
      vector<EMEntry>::iterator targetHwmEntry = entries.end();  // for HWM_0 workaround

      if (colWid > 0 && colWid < minWidth)
        minWidth = colWid;

      for (vector<EMEntry>::iterator j = entries.begin(); j != entries.end(); j++)
      {
        if (j->dbRoot == source && j->partitionNum == partition)
        {
          fUpdateRtEntries.push_back(BulkUpdateDBRootArg(j->range.start, target));

          if (firstOid)
            fSegments.insert(j->segmentNum);
        }

#if 0
        else if (j->dbRoot == target && j->partitionNum == partition)
        {
          // the partition already exists on the target dbroot
          fErrorCode = RED_EC_PART_EXIST_ON_TARGET;
          ostringstream oss;
          oss << "oid:" << *i << ", partition:" << partition << " exists, source:"
              << source << ", destination:" << target;
          fErrorMsg = oss.str();
          logMessage(fErrorMsg, __LINE__);
          return fErrorCode;
        }
#endif

        // workaround for HWM_0 of highest extents of the oid on target dbroot.
        // Tracks the entry with the greatest (partition, blockOffset, segment) on the target.
        if (j->dbRoot == target)
        {
          if (targetHwmEntry == entries.end())
          {
            targetHwmEntry = j;
          }
          else
          {
            if (j->partitionNum > targetHwmEntry->partitionNum)
            {
              targetHwmEntry = j;
            }
            else if (j->partitionNum == targetHwmEntry->partitionNum &&
                     j->blockOffset > targetHwmEntry->blockOffset)
            {
              targetHwmEntry = j;
            }
            else if (j->partitionNum == targetHwmEntry->partitionNum &&
                     j->blockOffset == targetHwmEntry->blockOffset &&
                     j->segmentNum > targetHwmEntry->segmentNum)
            {
              targetHwmEntry = j;
            }
          }
        }
      }  // for em entries

      // HWM_0 workaround
      // HWM 0 has two possibilities:
      // 1. segment file has one extent, and the first block is not full yet.
      // 2. segment file has more than one extents, the HWM of the extents other than
      //    the last extent is set to 0, that is only last extent has none-zero HWM.
      // In tuple-bps::makeJob, there is a check to handle last extent has 0 hwm:
      //   (scannedExtents[i].HWM == 0 && (int) i < lastExtent[scannedExtents[i].dbRoot-1])
      //   lbidsToScan = scannedExtents[i].range.size * 1024;
      // Based on this check, the number of block to scan is calculated.
      // After redistributing the partitions, the original case 1 extent on destination
      // may not be the highest extent in the dbroot, and result in a full extent scan.
      // This scan will fail because there is no enough blocks if this is an abbreviated
      // extent or not enough chunks if the column is compressed.
      // The workaround is to bump up the HWM to 1 if moved in partitions are greater.
      if (targetHwmEntry != entries.end() &&  // exclude no extent case
          targetHwmEntry->colWid > 0 &&       // exclude dictionary
          targetHwmEntry->HWM == 0 && targetHwmEntry->partitionNum < partition)
      {
        BulkSetHWMArg arg;
        arg.oid = *i;
        arg.partNum = targetHwmEntry->partitionNum;
        arg.segNum = targetHwmEntry->segmentNum;
        arg.hwm = targetHwmEntry->colWid;  // will correct later based on minWidth
        fUpdateHwmEntries.push_back(arg);
      }
    }  // for oids

    // HWM_0 workaround
    // Calculate the min(column width), the HWM(bump up to) for each column.
    if (fUpdateHwmEntries.size() > 0)
    {
      // update the HWM based in column width, not include dictionary extents
      for (vector<BRM::BulkSetHWMArg>::iterator j = fUpdateHwmEntries.begin(); j != fUpdateHwmEntries.end();
           j++)
      {
        // hwm temporarily holds the column width (see above); scale it by the narrowest column.
        if (j->hwm <= 8)
          j->hwm /= minWidth;
        else
          j->hwm = 1;  // not needed, but in case
      }
    }
  }
  catch (const std::exception& ex)
  {
    fErrorCode = RED_EC_EXTENT_ERROR;
    fErrorMsg = ex.what();
    logMessage(fErrorMsg, __LINE__);
    ret = fErrorCode;
  }
  catch (...)
  {
    fErrorCode = RED_EC_EXTENT_ERROR;
    fErrorMsg = "get extent error.";
    logMessage(fErrorMsg, __LINE__);
    ret = fErrorCode;
  }

  return ret;
}
// Transfers every segment file (fOids x fSegments) of the planned partition
// from the source dbroot to the destination dbroot.
//
//  - Destination on another PM and not HDFS: the files are streamed to the peer
//    WriteEngineServer (INIT, then START / CONT... / FINISH per file), and every
//    message is acknowledged by the peer.
//  - Otherwise (same PM, or HDFS): the files are copied directly.
//
// The directories involved are recorded with addToDirSet() so confirmToPeer()
// can clean up after commit/abort.
// Returns 0 on success, RED_EC_USER_STOP when stopped, or the error code that
// was also stored in fErrorCode.
int RedistributeWorkerThread::sendData()
{
  WriteEngine::FileOp fileOp;  // just to get filename, not for file operations
  bool remotePM = (fMyId.second != fPeerId.second);
  uint32_t dbroot = fPlanEntry.source;
  uint32_t partition = fPlanEntry.partition;
  int16_t source = fPlanEntry.source;
  int16_t dest = fPlanEntry.destination;

  IDBDataFile::Types fileType = (IDBPolicy::useHdfs() ? IDBDataFile::HDFS
                                 : IDBPolicy::useCloud() ? IDBDataFile::CLOUD
                                                         : IDBDataFile::UNBUFFERED);

  IDBFileSystem& fs = IDBFileSystem::getFs(fileType);

  if ((remotePM) && (fileType != IDBDataFile::HDFS))
  {
    if (connectToWes(fPeerId.second) != 0)
    {
      fErrorCode = RED_EC_CONNECT_FAIL;
      ostringstream oss;
      oss << "Failed to connect to PM" << fPeerId.second << " from PM" << fMyId.second;
      fErrorMsg = oss.str();
      logMessage(fErrorMsg, __LINE__);
      return fErrorCode;
    }

    // start to send each segment file
    uint32_t seq = 0;
    ByteStream bs;

    // start conversion with peer, hand shaking.
    RedistributeMsgHeader header(dest, source, seq++, RED_DATA_INIT);
    bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
    bs.append((const ByteStream::byte*)&header, sizeof(header));
    fMsgQueueClient->write(bs);

    SBS sbs = fMsgQueueClient->read();

    if (!checkDataTransferAck(sbs, 0))
      return fErrorCode;

    for (vector<int64_t>::iterator i = fOids.begin(); i != fOids.end(); i++)
    {
      for (set<int16_t>::iterator j = fSegments.begin(); j != fSegments.end(); ++j)
      {
        char fileName[WriteEngine::FILE_NAME_SIZE];
        int rc = fileOp.oid2FileName(*i, fileName, false, dbroot, partition, *j);

        if (rc == WriteEngine::NO_ERROR)
        {
          ostringstream oss;
          oss << "<=redistributing: " << fileName << ", oid=" << *i << ", db=" << source
              << ", part=" << partition << ", seg=" << *j << " to db=" << dest;
          logMessage(oss.str(), __LINE__);
        }
        else
        {
          fErrorCode = RED_EC_OID_TO_FILENAME;
          ostringstream oss;
          oss << "Failed to get file name: oid=" << *i << ", dbroot=" << dbroot << ", partition=" << partition
              << ", segment=" << *j;
          fErrorMsg = oss.str();
          logMessage(fErrorMsg, __LINE__);
          return fErrorCode;
        }

        if (fOldFilePtr != NULL)
          closeFile(fOldFilePtr);

        errno = 0;
        // Assign the member, not a new local: the destructor closes fOldFilePtr,
        // so the handle is released even when one of the error returns below is taken.
        fOldFilePtr = fopen(fileName, "rb");

        if (fOldFilePtr != NULL)
        {
          ostringstream oss;
          oss << "open " << fileName << ", oid=" << *i << ", dbroot=" << dbroot << ", partition=" << partition
              << ", segment=" << *j << ". " << fOldFilePtr;
          logMessage(oss.str(), __LINE__);
        }
        else
        {
          int e = errno;
          fErrorCode = RED_EC_OPEN_FILE_FAIL;
          ostringstream oss;
          oss << "Failed to open " << fileName << ", oid=" << *i << ", dbroot=" << dbroot
              << ", partition=" << partition << ", segment=" << *j << ". " << strerror(e) << " (" << e << ")";
          fErrorMsg = oss.str();
          logMessage(fErrorMsg, __LINE__);
          return fErrorCode;
        }

        // add to set for remove after commit
        addToDirSet(fileName, true);

        char chunk[CHUNK_SIZE];
        errno = 0;
        fseek(fOldFilePtr, 0, SEEK_END);     // go to end of file
        long fileSize = ftell(fOldFilePtr);  // get current file size

        if (fileSize < 0)
        {
          int e = errno;
          ostringstream oss;
          oss << "Fail to tell file size: " << strerror(e) << " (" << e << ")";
          fErrorMsg = oss.str();
          fErrorCode = RED_EC_FSEEK_FAIL;
          logMessage(fErrorMsg, __LINE__);
          return fErrorCode;
        }

        // send start message to have the file of fileSize created at target dbroot.
        bs.restart();
        RedistributeMsgHeader header(dest, source, seq++, RED_DATA_START);
        RedistributeDataControl dataControl(*i, dest, partition, *j, fileSize);
        bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
        bs.append((const ByteStream::byte*)&header, sizeof(header));
        bs.append((const ByteStream::byte*)&dataControl, sizeof(dataControl));
        fMsgQueueClient->write(bs);

        sbs = fMsgQueueClient->read();

        if (!checkDataTransferAck(sbs, fileSize))
          return fErrorCode;

        // now send the file chunk by chunk.
        rewind(fOldFilePtr);
        int64_t bytesLeft = fileSize;
        size_t bytesSend = CHUNK_SIZE;
        header.messageId = RED_DATA_CONT;

        while (bytesLeft > 0)
        {
          if (fStopAction)
          {
            closeFile(fOldFilePtr);
            fOldFilePtr = NULL;
            return RED_EC_USER_STOP;
          }

          // the last chunk may be shorter than CHUNK_SIZE
          if (bytesLeft < (long)CHUNK_SIZE)
            bytesSend = bytesLeft;

          errno = 0;
          size_t n = fread(chunk, 1, bytesSend, fOldFilePtr);

          if (n != bytesSend)
          {
            int e = errno;
            ostringstream oss;
            oss << "Fail to read: " << strerror(e) << " (" << e << ")";
            fErrorMsg = oss.str();
            fErrorCode = RED_EC_FREAD_FAIL;
            logMessage(fErrorMsg, __LINE__);
            return fErrorCode;
          }

          header.sequenceNum = seq++;
          bs.restart();
          bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
          bs.append((const ByteStream::byte*)&header, sizeof(header));
          bs << (size_t)bytesSend;
          bs.append((const ByteStream::byte*)chunk, bytesSend);
          fMsgQueueClient->write(bs);

          sbs = fMsgQueueClient->read();

          if (!checkDataTransferAck(sbs, bytesSend))
            return fErrorCode;

          bytesLeft -= bytesSend;
        }

        closeFile(fOldFilePtr);
        fOldFilePtr = NULL;

        // tell the peer the file is complete; it acknowledges the total size
        header.messageId = RED_DATA_FINISH;
        header.sequenceNum = seq++;
        bs.restart();
        bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
        bs.append((const ByteStream::byte*)&header, sizeof(header));
        bs << (uint64_t)fileSize;
        fMsgQueueClient->write(bs);

        sbs = fMsgQueueClient->read();

        if (!checkDataTransferAck(sbs, fileSize))
          return fErrorCode;
      }  // segments
    }    // for oids
  }      // remote peer non-hdfs
  else   // local or HDFS file copy
  {
    std::map<int, std::string> rootToPathMap;

    // use cp, in case failed in middle. May consider to use rename if possible.
    for (vector<int64_t>::iterator i = fOids.begin(); i != fOids.end(); i++)
    {
      for (set<int16_t>::iterator j = fSegments.begin(); j != fSegments.end(); ++j)
      {
        if (fStopAction)
          return RED_EC_USER_STOP;

        if (fileType == IDBDataFile::HDFS)  // HDFS file copy
        {
          string sourceName;
          int rc = buildFullHdfsPath(rootToPathMap,  // map of root to path
                                     *i,             // OID
                                     source,         // dbroot
                                     partition,      // partition
                                     *j,             // segment
                                     sourceName);    // full path name

          if (rc != 0)
          {
            fErrorCode = RED_EC_OID_TO_FILENAME;
            ostringstream oss;
            oss << "Failed to get src file name: oid=" << *i << ", dbroot=" << source
                << ", partition=" << partition << ", segment=" << *j;
            fErrorMsg = oss.str();
            logMessage(fErrorMsg, __LINE__);
            return fErrorCode;
          }

          string destName;
          rc = buildFullHdfsPath(rootToPathMap,  // map of root to path
                                 *i,             // OID
                                 dest,           // dbroot
                                 partition,      // partition
                                 *j,             // segment
                                 destName);      // full path name

          if (rc != 0)
          {
            fErrorCode = RED_EC_OID_TO_FILENAME;
            ostringstream oss;
            oss << "Failed to get dest file name: oid=" << *i << ", dbroot=" << dest
                << ", partition=" << partition << ", segment=" << *j;
            fErrorMsg = oss.str();
            logMessage(fErrorMsg, __LINE__);
            return fErrorCode;
          }

          ostringstream oss;
          oss << "<=redistributing(hdfs): " << sourceName << ", oid=" << *i << ", db=" << source
              << ", part=" << partition << ", seg=" << *j << " to db=" << dest;
          logMessage(oss.str(), __LINE__);

          // add to set for remove after commit/abort
          addToDirSet(sourceName.c_str(), true);
          addToDirSet(destName.c_str(), false);

          int ret = fs.copyFile(sourceName.c_str(), destName.c_str());

          if (ret != 0)
          {
            fErrorCode = RED_EC_COPY_FILE_FAIL;
            ostringstream oss;
            oss << "Failed to copy " << sourceName << " to " << destName << "; error is: " << strerror(errno);
            fErrorMsg = oss.str();
            logMessage(fErrorMsg, __LINE__);
            return fErrorCode;
          }
        }
        else  // local file copy
        {
          char sourceName[WriteEngine::FILE_NAME_SIZE];
          int rc = fileOp.oid2FileName(*i, sourceName, false, source, partition, *j);

          if (rc != WriteEngine::NO_ERROR)
          {
            fErrorCode = RED_EC_OID_TO_FILENAME;
            ostringstream oss;
            oss << "Failed to get file name: oid=" << *i << ", dbroot=" << source
                << ", partition=" << partition << ", segment=" << *j;
            fErrorMsg = oss.str();
            logMessage(fErrorMsg, __LINE__);
            return fErrorCode;
          }

          char destName[WriteEngine::FILE_NAME_SIZE];
          rc = fileOp.oid2FileName(*i, destName, true, dest, partition, *j);

          if (rc != WriteEngine::NO_ERROR)
          {
            fErrorCode = RED_EC_OID_TO_FILENAME;
            ostringstream oss;
            oss << "Failed to get file name: oid=" << *i << ", dbroot=" << dest << ", partition=" << partition
                << ", segment=" << *j;
            fErrorMsg = oss.str();
            logMessage(fErrorMsg, __LINE__);
            return fErrorCode;
          }

          ostringstream oss;
          oss << "<=redistributing(copy): " << sourceName << ", oid=" << *i << ", db=" << source
              << ", part=" << partition << ", seg=" << *j << " to db=" << dest;
          logMessage(oss.str(), __LINE__);

          // add to set for remove after commit/abort
          addToDirSet(sourceName, true);
          addToDirSet(destName, false);

          // Using boost::copy_file() instead of IDBFileSystem::copy-
          // File() so we can capture/report any boost exception error
          // msg that IDBFileSystem::copyFile() currently swallows.
          try
          {
            boost::filesystem::copy_file(sourceName, destName);
          }

#if BOOST_VERSION >= 105200
          catch (boost::filesystem::filesystem_error& e)
#else
          catch (boost::filesystem::basic_filesystem_error<filesystem::path>& e)
#endif
          {
            fErrorCode = RED_EC_COPY_FILE_FAIL;
            ostringstream oss;
            oss << "Failed to copy " << sourceName << " to " << destName << "; error is: " << e.what();
            fErrorMsg = oss.str();
            logMessage(fErrorMsg, __LINE__);
            return fErrorCode;
          }
        }
      }  // segment
    }    // oid
  }      // !remote

  return 0;
}
//------------------------------------------------------------------------------
// Construct a full path name based on the given oid, root, partition, and seg.
// The rootToPathMap is the map of dbroot to dbrootPath that we are using. We
// are using this function instead of the usual FileOp::oid2FileName() function,
// because that function only works with "local" DBRoots. In the case of
// an HDFS copy, we will be copying files from/to DBRoots that are not on the
// local PM.
//------------------------------------------------------------------------------
int RedistributeWorkerThread::buildFullHdfsPath(std::map<int, std::string>& rootToPathMap, int64_t colOid,
int16_t dbRoot, uint32_t partition, int16_t segment,
std::string& fullFileName)
{
std::map<int, std::string>::const_iterator iter = rootToPathMap.find(dbRoot);
if (iter == rootToPathMap.end())
{
ostringstream oss;
oss << "DBRoot" << dbRoot;
std::string dbRootPath = fConfig->getConfig("SystemConfig", oss.str());
if (dbRootPath.empty())
{
return 1;
}
rootToPathMap[dbRoot] = dbRootPath;
iter = rootToPathMap.find(dbRoot);
}
char tempFileName[WriteEngine::FILE_NAME_SIZE];
char dbDir[WriteEngine::MAX_DB_DIR_LEVEL][WriteEngine::MAX_DB_DIR_NAME_SIZE];
int rc = WriteEngine::Convertor::oid2FileName(colOid, tempFileName, dbDir, partition, segment);
if (rc != WriteEngine::NO_ERROR)
{
return 2;
}
ostringstream fullFileNameOss;
fullFileNameOss << iter->second << '/' << tempFileName;
fullFileName = fullFileNameOss.str();
return 0;
}
int RedistributeWorkerThread::connectToWes(int pmId)
{
int ret = 0;
ostringstream oss;
oss << "pm" << pmId << "_WriteEngineServer";
try
{
fMsgQueueClient.reset(new MessageQueueClient(oss.str(), fConfig));
}
catch (const std::exception& ex)
{
fErrorMsg = "Caught exception when connecting to " + oss.str() + " -- " + ex.what();
ret = 1;
}
catch (...)
{
fErrorMsg = "Caught exception when connecting to " + oss.str() + " -- unknown";
ret = 2;
}
return ret;
}
// Applies the extent map changes collected by buildEntryList(): first the
// "HWM_0 workaround" HWM updates (if any), then the bulk dbroot switch.
// Runs under fActionMutex; once the dbroot update succeeded fCommitted is set
// and the operation can no longer be stopped. Nothing is done if a stop was
// already requested.
//
// Returns 0 if both DBRM calls succeeded (or were skipped), -1 otherwise.
// On failure fErrorCode is set to RED_EC_UPDATE_DBRM_FAIL so the transfer is
// aborted instead of committed.
int RedistributeWorkerThread::updateDbrm()
{
  int rc1 = BRM::ERR_OK;
  int rc2 = BRM::ERR_OK;
  boost::mutex::scoped_lock lock(fActionMutex);

  // cannot stop after extent map is updated.
  if (!fStopAction)
  {
    if (fUpdateHwmEntries.size() > 0)
      rc1 = fDbrm->bulkSetHWM(fUpdateHwmEntries, 0);

    if (rc1 == BRM::ERR_OK)
    {
      // Assign the function-level rc2 (no new declaration), so the logging and
      // the return value below see the real result.
      rc2 = fDbrm->bulkUpdateDBRoot(fUpdateRtEntries);

      if (rc2 == 0)
        fCommitted = true;
      else
        fErrorCode = RED_EC_UPDATE_DBRM_FAIL;
    }
    else
    {
      // The extents still point at the source dbroot; record the failure so the
      // source files are not removed as if the move had been committed.
      fErrorCode = RED_EC_UPDATE_DBRM_FAIL;
    }

    // logging for debug
    {
      if (fUpdateHwmEntries.size() > 0)
      {
        ostringstream oss;
        oss << "HWM_0 workaround, updateHWM(oid,part,seg,hwm)";
        vector<BRM::BulkSetHWMArg>::iterator i = fUpdateHwmEntries.begin();

        for (; i != fUpdateHwmEntries.end(); i++)
        {
          oss << ":(" << i->oid << "," << i->partNum << "," << i->segNum << "," << i->hwm << ")";
        }

        oss << ((rc1 == BRM::ERR_OK) ? " success" : " failed");
        logMessage(oss.str(), __LINE__);
      }

      if (rc1 == BRM::ERR_OK)
      {
        ostringstream oss;
        oss << "updateDBRoot(startLBID,dbRoot)";
        vector<BRM::BulkUpdateDBRootArg>::iterator i = fUpdateRtEntries.begin();

        for (; i != fUpdateRtEntries.end(); i++)
          oss << ":(" << i->startLBID << "," << i->dbRoot << ")";

        oss << ((rc2 == BRM::ERR_OK) ? " success" : " failed");
        logMessage(oss.str(), __LINE__);
      }
    }
  }

  return ((rc1 == BRM::ERR_OK && rc2 == BRM::ERR_OK) ? 0 : -1);
}
// Validates an acknowledgement received from the peer.
// An ack consists of one message-id byte, a RedistributeMsgHeader and the
// acknowledged size, which must equal `size`.
//
// sbs  - the received message; always released before returning.
// size - the size the peer is expected to acknowledge.
// Returns true only if fErrorCode is RED_EC_OK afterwards; on a bad ack
// fErrorCode/fErrorMsg are set (network failure, short message or size NACK).
bool RedistributeWorkerThread::checkDataTransferAck(SBS& sbs, size_t size)
{
  // message-id byte + header + acknowledged size: all three are read below, so
  // anything shorter is reported as a short message instead of underflowing the stream.
  const size_t minAckLength = 1 + sizeof(RedistributeMsgHeader) + sizeof(size_t);

  if (sbs.get() == NULL || sbs->length() == 0)
  {
    ostringstream oss;
    oss << "Zero byte read, Network error.";
    fErrorMsg = oss.str();
    logMessage(fErrorMsg, __LINE__);
    fErrorCode = RED_EC_NETWORK_FAIL;
  }
  else if (sbs->length() < minAckLength)
  {
    ostringstream oss;
    oss << "Short message, length=" << sbs->length();
    fErrorMsg = oss.str();
    logMessage(fErrorMsg, __LINE__);
    fErrorCode = RED_EC_WKR_MSG_SHORT;
  }
  else
  {
    // Need check header info
    ByteStream::byte wesMsgId;
    *sbs >> wesMsgId;
    // const RedistributeMsgHeader* h = (const RedistributeMsgHeader*) sbs->buf();
    sbs->advance(sizeof(RedistributeMsgHeader));
    size_t ack;
    *sbs >> ack;

    if (ack != size)
    {
      ostringstream oss;
      oss << "Acked size does not match request: " << ack << "/" << size;
      fErrorMsg = oss.str();
      logMessage(fErrorMsg, __LINE__);
      fErrorCode = RED_EC_SIZE_NACK;
    }
  }

  sbs.reset();

  return (fErrorCode == RED_EC_OK);
}
void RedistributeWorkerThread::confirmToPeer()
{
if (fTableLockId > 0)
{
bool rc = false;
try
{
rc = fDbrm->releaseTableLock(fTableLockId);
// use the interface, line# replaced with lock id.
logMessage("Releasing table lock... ", fTableLockId);
}
catch (const std::exception& ex)
{
// too bad, the talbe lock is messed up.
fErrorMsg = ex.what();
// use the interface, line# replaced with lock id.
logMessage("Release table exception: " + fErrorMsg, fTableLockId);
}
catch (...)
{
// use the interface, line# replaced with lock id.
logMessage("Release table lock unknown exception. ", fTableLockId);
}
if (rc == true)
{
// use the interface, line# replaced with lock id.
logMessage("Release table lock return true. ", fTableLockId);
fTableLockId = 0;
}
else
{
// let destructor try again.
// use the interface, line# replaced with lock id.
logMessage("Release table lock return false. ", fTableLockId);
}
}
IDBFileSystem& fs = (IDBPolicy::useHdfs() ? IDBFileSystem::getFs(IDBDataFile::HDFS)
: IDBPolicy::useCloud() ? IDBFileSystem::getFs(IDBDataFile::CLOUD)
: IDBFileSystem::getFs(IDBDataFile::BUFFERED));
uint32_t confirmCode = RED_DATA_COMMIT;
if (fErrorCode != RED_EC_OK || fStopAction == true) // fCommitted must be false
confirmCode = RED_DATA_ABORT;
if (fMyId.second != fPeerId.second)
{
if (fMsgQueueClient.get() != NULL)
{
ByteStream bs;
RedistributeMsgHeader header(fPeerId.first, fMyId.first, -1, confirmCode);
bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;
bs.append((const ByteStream::byte*)&header, sizeof(header));
fMsgQueueClient->write(bs);
// not going to retry for now, ignore the ack and close the connection.
fMsgQueueClient->read();
fMsgQueueClient.reset();
}
}
else if (confirmCode != RED_DATA_COMMIT)
{
for (set<string>::iterator i = fNewDirSet.begin(); i != fNewDirSet.end(); i++)
{
fs.remove(i->c_str()); // ignoring return code
}
}
// new files committed, remove old ones.
if (confirmCode == RED_DATA_COMMIT)
{
for (set<string>::iterator i = fOldDirSet.begin(); i != fOldDirSet.end(); i++)
{
fs.remove(i->c_str()); // ignoring return code
}
}
fNewDirSet.clear();
fOldDirSet.clear();
}
void RedistributeWorkerThread::addToDirSet(const char* fileName, bool isSource)
{
string path(fileName);
size_t found = path.find_last_of("/\\");
path = path.substr(0, found);
if (isSource)
fOldDirSet.insert(path);
else
fNewDirSet.insert(path);
}
// Handles a RED_ACTN_STOP: raises the stop flag unless the extent map has
// already been updated, then logs and answers the requester.
void RedistributeWorkerThread::handleStop()
{
  {
    // the flag is only changed while holding the action mutex
    boost::mutex::scoped_lock lock(fActionMutex);

    // cannot stop after extent map is updated.
    if (fCommitted == false)
      fStopAction = true;
  }

  logMessage("User stop", __LINE__);
  sendResponse(RED_ACTN_STOP);
}
// Sends a RED_ACTN_RESP back on the request socket, with source and
// destination of the header swapped. For a RED_ACTN_REQUEST the plan entry is
// appended with its status updated:
//   partition already on target -> RED_TRANS_SKIPPED
//   any other error             -> RED_TRANS_FAILED
//   no error and not stopped    -> RED_TRANS_SUCCESS
//   no error but stopped        -> status left untouched
void RedistributeWorkerThread::sendResponse(uint32_t type)
{
  // reply goes back to where the request came from
  const uint32_t requester = fMsgHeader.source;
  fMsgHeader.source = fMsgHeader.destination;
  fMsgHeader.destination = requester;
  fMsgHeader.messageId = RED_ACTN_RESP;

  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));

  if (type == RED_ACTN_REQUEST)
  {
    if (fErrorCode == RED_EC_PART_EXIST_ON_TARGET)
    {
      fPlanEntry.status = RED_TRANS_SKIPPED;
    }
    else if (fErrorCode != RED_EC_OK)
    {
      fPlanEntry.status = RED_TRANS_FAILED;
    }
    else if (fStopAction == false)
    {
      fPlanEntry.status = RED_TRANS_SUCCESS;
    }

    // else -- stopped, may try again if support resume

    fBs.append((const ByteStream::byte*)&fPlanEntry, sizeof(fPlanEntry));
  }

  fIOSocket.write(fBs);
}
// Receiving side of a file transfer. Starting with the RED_DATA_INIT message
// already in fMsgHeader, processes data messages from the socket until a
// commit, an abort or an unknown message is seen, or an exception occurs.
// After an exception a NACK (size_t(-1)) is sent to the peer. The request
// stream is released and the socket closed in every case.
void RedistributeWorkerThread::handleData()
{
  bool failed = false;
  SBS sbs;
  size_t size = 0;

  try
  {
    // leaves after commit/abort/unknown message, or through an exception
    for (bool finished = false; !finished;)
    {
      switch (fMsgHeader.messageId)
      {
        case RED_DATA_INIT: handleDataInit(); break;

        case RED_DATA_START: handleDataStart(sbs, size); break;

        case RED_DATA_CONT: handleDataCont(sbs, size); break;

        case RED_DATA_FINISH: handleDataFinish(sbs, size); break;

        case RED_DATA_COMMIT:
          handleDataCommit(sbs, size);
          finished = true;
          break;

        case RED_DATA_ABORT:
          handleDataAbort(sbs, size);
          finished = true;
          break;

        default:
          handleUnknowDataMsg();
          finished = true;
          break;
      }

      if (finished)
        continue;

      // get next message: one message-id byte followed by the header
      sbs = fIOSocket.read();

      ByteStream::byte wesMsgId;
      *sbs >> wesMsgId;

      memcpy(&fMsgHeader, sbs->buf(), sizeof(RedistributeMsgHeader));
      sbs->advance(sizeof(RedistributeMsgHeader));
    }
  }
  catch (const std::exception& ex)
  {
    failed = true;
    logMessage(ex.what(), __LINE__);
  }
  catch (...)
  {
    failed = true;
  }

  if (failed)
  {
    // send NACK to peer
    fBs.restart();
    fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
    fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
    fBs << ((size_t)-1);
    fIOSocket.write(fBs);
  }

  fBs.reset();
  fIOSocket.close();
}
// Answers the sender's RED_DATA_INIT handshake with a RED_DATA_ACK that
// acknowledges size 0; source and destination of the header are swapped.
void RedistributeWorkerThread::handleDataInit()
{
  // reply goes back to the sender
  const uint32_t sender = fMsgHeader.source;
  fMsgHeader.source = fMsgHeader.destination;
  fMsgHeader.destination = sender;
  fMsgHeader.messageId = RED_DATA_ACK;

  const size_t ackedSize = 0;

  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
  fBs << ackedSize;

  // finish the hand shaking
  fIOSocket.write(fBs);
}
// Handles RED_DATA_START: the sender announces one segment file.
// Reads the RedistributeDataControl record from sbs, creates the destination
// file, pre-allocates it by writing PRE_ALLOC_SIZE blocks, and acknowledges
// the announced file size to the sender (size_t(-1) as NACK on any failure).
//
// sbs  - message positioned at the data control record; released on return.
// size - set to the announced file size while processing, reset to 0 on
//        return so the caller can count the data received.
void RedistributeWorkerThread::handleDataStart(SBS& sbs, size_t& size)
{
char fileName[WriteEngine::FILE_NAME_SIZE];
try
{
// extract the control data for the segment file
RedistributeDataControl dc;
if (sbs->length() >= sizeof(RedistributeDataControl))
{
memcpy(&dc, sbs->buf(), sizeof(RedistributeDataControl));
sbs->advance(sizeof(RedistributeDataControl));
size = dc.size;
}
else
{
ostringstream oss;
oss << "Short message, length=" << sbs->length();
fErrorMsg = oss.str();
fErrorCode = RED_EC_WKR_MSG_SHORT;
logMessage(fErrorMsg, __LINE__);
throw runtime_error(fErrorMsg);
}
// create and open the file for writing.
WriteEngine::FileOp fileOp; // just to get filename, not for file operations
int rc = fileOp.oid2FileName(dc.oid, fileName, true, dc.dbroot, dc.partition, dc.segment);
if (rc == WriteEngine::NO_ERROR)
{
ostringstream oss;
oss << "=>redistributing: " << fileName << ", oid=" << dc.oid << ", db=" << dc.dbroot
<< ", part=" << dc.partition << ", seg=" << dc.segment << " from db=" << fMsgHeader.source;
logMessage(oss.str(), __LINE__);
}
else
{
fErrorCode = RED_EC_OID_TO_FILENAME;
ostringstream oss;
oss << "Failed to get file name: oid=" << dc.oid << ", dbroot=" << dc.dbroot
<< ", partition=" << dc.partition << ", segment=" << dc.segment;
fErrorMsg = oss.str();
logMessage(fErrorMsg, __LINE__);
throw runtime_error(fErrorMsg);
}
// a file left open by a previous START is closed before opening the next one
if (fNewFilePtr != NULL)
closeFile(fNewFilePtr);
errno = 0;
fNewFilePtr = fopen(fileName, "wb");
if (fNewFilePtr != NULL)
{
ostringstream oss;
oss << "open " << fileName << ", oid=" << dc.oid << ", dbroot=" << dc.dbroot
<< ", partition=" << dc.partition << ", segment=" << dc.segment << ". " << fNewFilePtr;
logMessage(oss.str(), __LINE__);
}
else
{
int e = errno;
fErrorCode = RED_EC_OPEN_FILE_FAIL;
ostringstream oss;
oss << "Failed to open " << fileName << ", oid=" << dc.oid << ", dbroot=" << dc.dbroot
<< ", partition=" << dc.partition << ", segment=" << dc.segment << ". " << strerror(e) << " (" << e
<< ")";
fErrorMsg = oss.str();
logMessage(fErrorMsg, __LINE__);
throw runtime_error(fErrorMsg);
}
// set output buffering
// (full buffering through the CHUNK_SIZE buffer allocated in the constructor)
errno = 0;
if (setvbuf(fNewFilePtr, fWriteBuffer.get(), _IOFBF, CHUNK_SIZE))
{
int e = errno;
ostringstream oss;
oss << "Failed to set i/o buffer: " << strerror(e) << " (" << e << ")";
fErrorMsg = oss.str();
logMessage(fErrorMsg, __LINE__);
// not throwing an exception now.
}
// add to set for remove after abort
addToDirSet(fileName, false);
// do a fseek will show the right size, but will not actually allocate the continuous block.
// do write 4k block till file size.
// Only whole PRE_ALLOC_SIZE blocks are written; a trailing partial block is not pre-allocated.
char buf[PRE_ALLOC_SIZE] = {1};
size_t nmemb = size / PRE_ALLOC_SIZE;
while (nmemb-- > 0)
{
errno = 0;
size_t n = fwrite(buf, PRE_ALLOC_SIZE, 1, fNewFilePtr);
if (n != 1)
{
int e = errno;
ostringstream oss;
oss << "Fail to preallocate file: " << strerror(e) << " (" << e << ")";
fErrorMsg = oss.str();
fErrorCode = RED_EC_FWRITE_FAIL;
logMessage(fErrorMsg, __LINE__);
throw runtime_error(fErrorMsg);
}
}
// move back to the beginning to write real data
fflush(fNewFilePtr);
rewind(fNewFilePtr);
}
catch (const std::exception& ex)
{
// NACK
// (size_t(-1) can never equal the file size the sender expects to be acknowledged)
size = -1;
logMessage(ex.what(), __LINE__);
}
catch (...)
{
// NACK
size = -1;
}
// ack file size
fMsgHeader.messageId = RED_DATA_ACK;
fBs.restart();
fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE; // dummy, keep for now.
fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
fBs << size;
fIOSocket.write(fBs);
// reset to count the data received
size = 0;
sbs.reset();
}
// Handles one chunk of segment-file data.
//
// The message starts with the byte count of the payload that follows. The payload is
// written to fNewFilePtr and the number of bytes written is acknowledged.
//
// sbs  - incoming message; reset before the acknowledgement is sent.
// size - running count of bytes received for the current file; increased by the chunk
//        length on success and left untouched on failure.
//
// On any failure the acknowledgement carries (size_t)-1 as NACK and the error is
// recorded in fErrorCode / fErrorMsg. No exception escapes the try block.
void RedistributeWorkerThread::handleDataCont(SBS& sbs, size_t& size)
{
  size_t ack = 0;

  try
  {
    size_t bytesRcvd = 0;
    *sbs >> bytesRcvd;

    if (bytesRcvd != sbs->length())
    {
      ostringstream oss;
      oss << "Incorrect data length: " << sbs->length() << ", expecting " << bytesRcvd;
      fErrorMsg = oss.str();
      fErrorCode = RED_EC_BS_TOO_SHORT;
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    // Data without an open target file (e.g. the open failed earlier) must be rejected;
    // passing a null stream to fwrite() is undefined behaviour.
    if (fNewFilePtr == NULL)
    {
      fErrorMsg = "Fail to write file: no file is open";
      fErrorCode = RED_EC_FWRITE_FAIL;
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    errno = 0;
    size_t n = fwrite(sbs->buf(), 1, bytesRcvd, fNewFilePtr);

    if (n != bytesRcvd)
    {
      int e = errno;
      ostringstream oss;
      oss << "Fail to write file: " << strerror(e) << " (" << e << ")";
      fErrorMsg = oss.str();
      fErrorCode = RED_EC_FWRITE_FAIL;
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    ack = bytesRcvd;
    size += ack;
  }
  catch (const std::exception&)
  {
    // NACK: the value that goes on the wire is ack, not the running size counter.
    ack = -1;
  }
  catch (...)
  {
    // NACK
    ack = -1;
  }

  // ack received data
  sbs.reset();
  fMsgHeader.messageId = RED_DATA_ACK;
  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
  fBs << ack;
  fIOSocket.write(fBs);
}
// Handles the end of a segment-file transfer.
//
// Closes the file being written, then compares the file size announced in the message
// with the number of bytes counted locally.
//
// sbs  - incoming message carrying the sender's file size; reset before the ack is sent.
// size - number of bytes received for the current file; only read here.
//
// The acknowledgement carries the local size when both sizes agree, and (size_t)-1 as
// NACK otherwise. A mismatch is also recorded in fErrorCode / fErrorMsg.
void RedistributeWorkerThread::handleDataFinish(SBS& sbs, size_t& size)
{
  size_t ack = 0;

  // close open file
  closeFile(fNewFilePtr);
  fNewFilePtr = NULL;

  try
  {
    size_t fileSize = 0;
    *sbs >> fileSize;

    if (fileSize != size)
    {
      ostringstream oss;
      oss << "File size not match: local=" << size << ", remote=" << fileSize;
      fErrorMsg = oss.str();
      fErrorCode = RED_EC_FILE_SIZE_NOT_MATCH;
      logMessage(fErrorMsg, __LINE__);
      throw runtime_error(fErrorMsg);
    }

    ack = size;
  }
  catch (const std::exception&)
  {
    // NACK: must be signalled through ack; leaving it 0 would look like a match
    // whenever the sender's size is 0.
    ack = -1;
  }
  catch (...)
  {
    // NACK
    ack = -1;
  }

  // ack received data
  sbs.reset();
  fMsgHeader.messageId = RED_DATA_ACK;
  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
  fBs << ack;
  fIOSocket.write(fBs);
}
// Acknowledges a commit request.
//
// Nothing is done locally besides dropping the incoming message and replying with a
// RED_DATA_ACK whose payload is 0. The size argument is not used.
void RedistributeWorkerThread::handleDataCommit(SBS& sbs, size_t& /*size*/)
{
  // the request carries nothing we need; release it first
  sbs.reset();

  // build the reply: marker byte, message header, then the ack value
  size_t commitAck = 0;
  fMsgHeader.messageId = RED_DATA_ACK;

  fBs.restart();
  fBs << static_cast<ByteStream::byte>(WriteEngine::WE_SVR_REDISTRIBUTE);  // dummy, keep for now.
  fBs.append(reinterpret_cast<const ByteStream::byte*>(&fMsgHeader), sizeof(fMsgHeader));
  fBs << commitAck;

  fIOSocket.write(fBs);
}
// Handles an abort request.
//
// Closes the file currently being written (if any), removes every path recorded in
// fNewDirSet through the file system selected by IDBPolicy, and replies with a
// RED_DATA_ACK whose payload is 0. The size argument is not used.
void RedistributeWorkerThread::handleDataAbort(SBS& sbs, size_t& /*size*/)
{
  // close open file
  if (fNewFilePtr != NULL)
  {
    closeFile(fNewFilePtr);
    // the stream is no longer valid after fclose(); clear the member so no later
    // NULL-checked close operates on a dangling FILE*.
    fNewFilePtr = NULL;
  }

  IDBFileSystem& fs = (IDBPolicy::useHdfs()    ? IDBFileSystem::getFs(IDBDataFile::HDFS)
                       : IDBPolicy::useCloud() ? IDBFileSystem::getFs(IDBDataFile::CLOUD)
                                               : IDBFileSystem::getFs(IDBDataFile::BUFFERED));

  // remove local files
  for (set<string>::iterator i = fNewDirSet.begin(); i != fNewDirSet.end(); ++i)
  {
    fs.remove(i->c_str());  // ignoring return code
  }

  // send ack
  sbs.reset();
  size_t ack = 0;
  fMsgHeader.messageId = RED_DATA_ACK;
  fBs.restart();
  fBs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  fBs.append((const ByteStream::byte*)&fMsgHeader, sizeof(fMsgHeader));
  fBs << ack;
  fIOSocket.write(fBs);
}
void RedistributeWorkerThread::handleUnknowDataMsg()
{
ostringstream oss;
oss << "Unknown data message: " << fMsgHeader.messageId;
fErrorMsg = oss.str();
fErrorCode = RED_EC_UNKNOWN_DATA_MSG;
logMessage(fErrorMsg, __LINE__);
throw runtime_error(fErrorMsg);
}
void RedistributeWorkerThread::handleUnknowJobMsg()
{
ostringstream oss;
oss << "Unknown job message: " << fMsgHeader.messageId;
fErrorMsg = oss.str();
fErrorCode = RED_EC_UNKNOWN_JOB_MSG;
logMessage(fErrorMsg, __LINE__);
// protocol error, ignore and close connection.
}
// Closes a stdio stream and logs the outcome.
//
// f - stream to close; NULL is accepted and ignored.
//
// The pointer is taken by value, so the caller's copy is not cleared here; the caller
// must not use it again after this call. A failing fclose() is only logged.
void RedistributeWorkerThread::closeFile(FILE* f)
{
  if (f == NULL)
    return;

  ostringstream oss;
  oss << "close file* " << f << " ";

  errno = 0;
  int rc = fclose(f);
  // capture errno right away: the stream insertions below may call library routines
  // that are allowed to modify it.
  int e = errno;

  if (rc != 0)
    oss << "error: " << strerror(e) << " (" << e << ")";
  else
    oss << "OK";

  logMessage(oss.str(), __LINE__);
}
void RedistributeWorkerThread::logMessage(const string& msg, int line)
{
ostringstream oss;
oss << msg << " @workerThread:" << line;
RedistributeControl::instance()->logMessage(oss.str());
}
} // namespace redistribute