/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /* * $Id: we_redistributeworkerthread.h 4450 2013-01-21 14:13:24Z rdempsey $ */ #pragma once #include #include #include #include #include "boost/shared_ptr.hpp" #include "boost/thread/mutex.hpp" #include "brmtypes.h" #include "we_redistributedef.h" // forward reference namespace config { class Config; } namespace oam { class OamCache; } namespace BRM { class DBRM; } namespace messageqcpp { class ByteStream; class IOSocket; } // namespace messageqcpp namespace messagequeue { class MessageQueueClient; } namespace redistribute { class RedistributeWorkerThread { public: RedistributeWorkerThread(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& ios); ~RedistributeWorkerThread(); void operator()(); private: void handleRequest(); void handleStop(); void handleData(); void handleUnknowJobMsg(); int setup(); int grabTableLock(); int buildEntryList(); int sendData(); int connectToWes(int); int updateDbrm(); void confirmToPeer(); bool checkDataTransferAck(messageqcpp::SBS&, size_t); void sendResponse(uint32_t); void doAbort(); void handleDataInit(); void handleDataStart(messageqcpp::SBS&, size_t&); void handleDataCont(messageqcpp::SBS&, size_t&); void handleDataFinish(messageqcpp::SBS&, size_t&); void handleDataCommit(messageqcpp::SBS&, size_t&); void handleDataAbort(messageqcpp::SBS&, size_t&); void handleUnknowDataMsg(); int buildFullHdfsPath(std::map& rootToPathMap, int64_t colOid, int16_t dbRoot, uint32_t partition, int16_t segment, std::string& fullFileName); void closeFile(FILE*); // for tracing, may remove later. void addToDirSet(const char*, bool); void logMessage(const std::string&, int); oam::OamCache* fOamCache; config::Config* fConfig; boost::shared_ptr fMsgQueueClient; RedistributeMsgHeader fMsgHeader; messageqcpp::ByteStream& fBs; messageqcpp::IOSocket& fIOSocket; RedistributePlanEntry fPlanEntry; uint64_t fTableLockId; int32_t fErrorCode; std::string fErrorMsg; std::pair fMyId; // std::pair fPeerId; // std::set fSegments; std::vector fOids; // column oids std::vector fUpdateRtEntries; // for dbrm update std::vector fUpdateHwmEntries; // for dbrm update FILE* fNewFilePtr; FILE* fOldFilePtr; std::set fNewDirSet; std::set fOldDirSet; std::shared_ptr fWriteBuffer; boost::shared_ptr fDbrm; // for segment file # workaround // uint64_t fSegPerRoot; static boost::mutex fActionMutex; static volatile bool fStopAction; static volatile bool fCommitted; static std::string fWesInUse; }; } // namespace redistribute