/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /******************************************************************************* * $Id: we_bulkload.h 4631 2013-05-02 15:21:09Z dcathey $ * *******************************************************************************/ /** @file */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include "we_tableinfo.h" #include "brmtypes.h" #include "boost/ptr_container/ptr_vector.hpp" #include #include #include #include #include #if 0 // defined(_MSC_VER) && defined(WE_BULKLOAD_DLLEXPORT) #define EXPORT __declspec(dllexport) #else #define EXPORT #endif /** Namespace WriteEngine */ namespace WriteEngine { /** Class BulkLoad */ class BulkLoad : public FileOp { public: /** * @brief BulkLoad constructor */ EXPORT BulkLoad(); /** * @brief BulkLoad destructor */ EXPORT ~BulkLoad() override; /** * @brief Load job information */ EXPORT int loadJobInfo(const std::string& fullFileName, bool bUseTempJobFile, int argc, char** argv, bool bLogInfo2ToConsole, bool bValidateColumnList); /** * @brief Pre process jobs to validate and assign values to the job structure */ int preProcess(Job& job, int tableNo, std::shared_ptr& tableInfo); /** * @brief Print job information */ void printJob(); /** * @brief Process job */ EXPORT int processJob(); /** * @brief Set Debug level for this BulkLoad object and any data members */ void setAllDebug(DebugLevel level); /** * @brief Update next autoincrement value for specified OID. * @param columnOID oid of autoincrement column to be updated * @param nextValue next autoincrement value to assign to tableOID */ static int updateNextValue(OID columnOID, uint64_t nextValue); // Accessors and mutators void addToCmdLineImportFileList(const std::string& importFile); const std::string& getAlternateImportDir() const; const std::string& getErrorDir() const; long getTimeZone() const; const std::string& getJobDir() const; const std::string& getSchema() const; const std::string& getTempJobDir() const; const std::string& getS3Key() const; const std::string& getS3Secret() const; const std::string& getS3Bucket() const; const std::string& getS3Host() const; const std::string& getS3Region() const; bool getTruncationAsError() const; BulkModeType getBulkLoadMode() const; bool getContinue() const; boost::uuids::uuid getJobUUID() const { return fUUID; } EXPORT int setAlternateImportDir(const std::string& loadDir, std::string& errMsg); void setImportDataMode(ImportDataMode importMode); void setColDelimiter(char delim); void setBulkLoadMode(BulkModeType bulkMode, const std::string& rptFileName); void setEnclosedByChar(char enChar); void setEscapeChar(char esChar); void setKeepRbMetaFiles(bool keepMeta); void setMaxErrorCount(unsigned int maxErrors); void setNoOfParseThreads(int parseThreads); void setNoOfReadThreads(int readThreads); void setNullStringMode(bool bMode); void setParseErrorOnTable(int tableId, bool lockParseMutex); void setParserNum(int parser); void setProcessName(const std::string& processName); void setReadBufferCount(int noOfReadBuffers); void setReadBufferSize(int readBufferSize); void setTxnID(BRM::TxnID txnID); void setVbufReadSize(int vbufReadSize); void setTruncationAsError(bool bTruncationAsError); void setJobUUID(const std::string& jobUUID); void setErrorDir(const std::string& errorDir); void setTimeZone(long timeZone); void setS3Key(const std::string& key); void setS3Secret(const std::string& secret); void setS3Bucket(const std::string& bucket); void setS3Host(const std::string& host); void setS3Region(const std::string& region); void setUsername(const std::string& username); // Timer functions void startTimer(); void stopTimer(); double getTotalRunTime() const; void disableTimeOut(const bool disableTimeOut); bool disableTimeOut() const; static void disableConsoleOutput(const bool noConsoleOutput) { fNoConsoleOutput = noConsoleOutput; } static bool disableConsoleOutput() { return fNoConsoleOutput; } // Add error message into appropriate BRM updater static bool addErrorMsg2BrmUpdater(const std::string& tablename, const std::ostringstream& oss); void setDefaultJobUUID(); private: //-------------------------------------------------------------------------- // Private Data Members //-------------------------------------------------------------------------- XMLJob fJobInfo; // current job information boost::scoped_ptr fColOp; // column operation std::string fRootDir; // job process root directory std::string fJobFileName; // job description file name Log fLog; // logger int fNumOfParser; // total number of parser char fColDelim; // delimits col values within a row int fNoOfBuffers; // Number of read buffers int fBufferSize; // Read buffer size int fFileVbufSize; // Internal file system buffer size long long fMaxErrors; // Max allowable errors per job std::string fAlternateImportDir; // Alternate bulk import directory std::string fErrorDir; // Opt. where error records record std::string fProcessName; // Application process name static std::vector> fTableInfo; // Vector of Table information int fNoOfParseThreads; // Number of parse threads int fNoOfReadThreads; // Number of read threads boost::thread_group fReadThreads; // Read thread group boost::thread_group fParseThreads; // Parse thread group boost::mutex fReadMutex; // Manages table selection by each // read thread boost::mutex fParseMutex; // Manages table/buffer/column // selection by each parsing thread BRM::TxnID fTxnID; // TransID acquired from SessionMgr bool fKeepRbMetaFiles; // Keep/delete bulkRB metadata files bool fNullStringMode; // Treat "NULL" as NULL value char fEnclosedByChar; // Char used to enclose column value char fEscapeChar; // Escape char within enclosed value timeval fStartTime; // job start time timeval fEndTime; // job end time double fTotalTime; // elapsed time for current phase std::vector fCmdLineImportFiles; // Import Files from cmd line BulkModeType fBulkMode; // Distributed bulk mode (1,2, or 3) std::string fBRMRptFileName; // Name of distributed mode rpt file bool fbTruncationAsError; // Treat string truncation as error ImportDataMode fImportDataMode; // Importing text or binary data bool fbContinue; // true when read and parse r running // static boost::mutex* fDDLMutex; // Insure only 1 DDL op at a time EXPORT static const std::string DIR_BULK_JOB; // Bulk job directory EXPORT static const std::string DIR_BULK_TEMP_JOB; // Dir for tmp job files static const std::string DIR_BULK_IMPORT; // Bulk job import dir static const std::string DIR_BULK_LOG; // Bulk job log directory bool fDisableTimeOut; // disable timeout when waiting for table lock boost::uuids::uuid fUUID; // job UUID static bool fNoConsoleOutput; // disable output to console long fTimeZone; // Timezone offset (in seconds) relative to UTC, // to use for TIMESTAMP data type. For example, // for EST which is UTC-5:00, offset will be -18000s. std::string fS3Key; // S3 Key std::string fS3Secret; // S3 Secret std::string fS3Host; // S3 Host std::string fS3Bucket; // S3 Bucket std::string fS3Region; // S3 Region std::string fUsername; // data files owner name mysql by default //-------------------------------------------------------------------------- // Private Functions //-------------------------------------------------------------------------- // Spawn the worker threads. void spawnWorkers(); // Checks if all tables have the status set bool allTablesDone(Status status); // Lock the table for read. Called by the read thread. int lockTableForRead(int id); // Get column for parsing. Called by the parse thread. // @bug 2099 - Temporary hack to diagnose deadlock. Added report parm below. bool lockColumnForParse(int id, // thread id int& tableId, // selected table id int& columnId, // selected column id int& myParseBuffer, // selected parse buffer bool report); // Map specified DBRoot to it's first segment file number int mapDBRootToFirstSegment(OID columnOid, uint16_t dbRoot, uint16_t& segment); // The thread method for the read thread. void read(int id); // The thread method for the parse thread. void parse(int id); // Sleep method void sleepMS(long int ms); // Initialize auto-increment column for specified schema and table. int preProcessAutoInc(const std::string& fullTableName, // schema.table ColumnInfo* colInfo); // ColumnInfo associated with AI column // Determine starting HWM and LBID after block skipping added to HWM int preProcessHwmLbid(const ColumnInfo* info, int minWidth, uint32_t partition, uint16_t segment, HWM& hwm, BRM::LBID_t& lbid, bool& bSkippedToNewExtent); // Rollback any tables that are left in a locked state at EOJ. int rollbackLockedTables(); // Rollback a table left in a locked state. int rollbackLockedTable(TableInfo& tableInfo); // Save metadata info required for shared-nothing bulk rollback. int saveBulkRollbackMetaData(Job& job, // current job TableInfo* tableInfo, // TableInfo for table of interest const std::vector& segFileInfo, // vector seg file info const std::vector& dbRootHWMInfoPM); // Manage/validate the list of 1 or more import data files int manageImportDataFileList(Job& job, // current job int tableNo, // table number of current job TableInfo* tableInfo); // TableInfo for table of interest // Break up list of file names into a vector of filename strings int buildImportDataFileList(const std::string& location, const std::string& filename, std::vector& importFileNames); }; //------------------------------------------------------------------------------ // Inline functions //------------------------------------------------------------------------------ inline void BulkLoad::addToCmdLineImportFileList(const std::string& importFile) { fCmdLineImportFiles.push_back(importFile); } inline const std::string& BulkLoad::getAlternateImportDir() const { return fAlternateImportDir; } inline const std::string& BulkLoad::getErrorDir() const { return fErrorDir; } inline long BulkLoad::getTimeZone() const { return fTimeZone; } inline const std::string& BulkLoad::getJobDir() const { return DIR_BULK_JOB; } inline const std::string& BulkLoad::getSchema() const { return fJobInfo.getJob().schema; } inline const std::string& BulkLoad::getTempJobDir() const { return DIR_BULK_TEMP_JOB; } inline const std::string& BulkLoad::getS3Key() const { return fS3Key; } inline const std::string& BulkLoad::getS3Secret() const { return fS3Secret; } inline const std::string& BulkLoad::getS3Bucket() const { return fS3Bucket; } inline const std::string& BulkLoad::getS3Host() const { return fS3Host; } inline const std::string& BulkLoad::getS3Region() const { return fS3Region; } inline bool BulkLoad::getTruncationAsError() const { return fbTruncationAsError; } inline BulkModeType BulkLoad::getBulkLoadMode() const { return fBulkMode; } inline bool BulkLoad::getContinue() const { return fbContinue; } inline void BulkLoad::printJob() { if (isDebug(DEBUG_1)) fJobInfo.printJobInfo(fLog); else fJobInfo.printJobInfoBrief(fLog); } inline void BulkLoad::setAllDebug(DebugLevel level) { setDebugLevel(level); fLog.setDebugLevel(level); } inline void BulkLoad::setColDelimiter(char delim) { fColDelim = delim; } inline void BulkLoad::setBulkLoadMode(BulkModeType bulkMode, const std::string& rptFileName) { fBulkMode = bulkMode; fBRMRptFileName = rptFileName; } inline void BulkLoad::setEnclosedByChar(char enChar) { fEnclosedByChar = enChar; } inline void BulkLoad::setEscapeChar(char esChar) { fEscapeChar = esChar; } inline void BulkLoad::setImportDataMode(ImportDataMode importMode) { fImportDataMode = importMode; } inline void BulkLoad::setKeepRbMetaFiles(bool keepMeta) { fKeepRbMetaFiles = keepMeta; } // Mutator takes an unsigned int, but we store in a long long, because... // TableInfo which eventually needs this attribute, takes an unsigned int, // but we want to be able to init to -1, to indicate when it has not been set. inline void BulkLoad::setMaxErrorCount(unsigned int maxErrors) { fMaxErrors = maxErrors; } inline void BulkLoad::setNoOfParseThreads(int parseThreads) { fNoOfParseThreads = parseThreads; } inline void BulkLoad::setNoOfReadThreads(int readThreads) { fNoOfReadThreads = readThreads; } inline void BulkLoad::setNullStringMode(bool bMode) { fNullStringMode = bMode; } inline void BulkLoad::setParserNum(int parser) { fNumOfParser = parser; } inline void BulkLoad::setProcessName(const std::string& processName) { fProcessName = processName; } inline void BulkLoad::setReadBufferCount(int noOfReadBuffers) { fNoOfBuffers = noOfReadBuffers; } inline void BulkLoad::setReadBufferSize(int readBufferSize) { fBufferSize = readBufferSize; } inline void BulkLoad::setTxnID(BRM::TxnID txnID) { fTxnID = txnID; } inline void BulkLoad::setVbufReadSize(int vbufReadSize) { fFileVbufSize = vbufReadSize; } inline void BulkLoad::setTruncationAsError(bool bTruncationAsError) { fbTruncationAsError = bTruncationAsError; } inline void BulkLoad::setErrorDir(const std::string& errorDir) { fErrorDir = errorDir; } inline void BulkLoad::setTimeZone(long timeZone) { fTimeZone = timeZone; } inline void BulkLoad::setS3Key(const std::string& key) { fS3Key = key; } inline void BulkLoad::setS3Secret(const std::string& secret) { fS3Secret = secret; } inline void BulkLoad::setS3Bucket(const std::string& bucket) { fS3Bucket = bucket; } inline void BulkLoad::setS3Host(const std::string& host) { fS3Host = host; } inline void BulkLoad::setS3Region(const std::string& region) { fS3Region = region; } inline void BulkLoad::setUsername(const std::string& username) { fUsername = username; } inline void BulkLoad::startTimer() { gettimeofday(&fStartTime, nullptr); } inline void BulkLoad::stopTimer() { gettimeofday(&fEndTime, nullptr); fTotalTime = (fEndTime.tv_sec + (fEndTime.tv_usec / 1000000.0)) - (fStartTime.tv_sec + (fStartTime.tv_usec / 1000000.0)); } inline double BulkLoad::getTotalRunTime() const { return fTotalTime; } inline void BulkLoad::disableTimeOut(const bool disableTimeOut) { fDisableTimeOut = disableTimeOut; } inline bool BulkLoad::disableTimeOut() const { return fDisableTimeOut; } } // namespace WriteEngine #undef EXPORT