/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /******************************************************************************* * $Id: we_columninfo.h 4726 2013-08-07 03:38:36Z bwilkinson $ * *******************************************************************************/ /** @file * Contains main class used to manage column information. */ #pragma once #include "we_type.h" #include "we_brm.h" #include "we_colop.h" #include "we_colbufmgr.h" #include "we_colextinf.h" #include "we_dctnrycompress.h" #include #include #include #include #include "atomicops.h" namespace WriteEngine { class Log; class ColumnAutoInc; class DBRootExtentTracker; class BRMReporter; class TableInfo; struct DBRootExtentInfo; enum Status { PARSE_COMPLETE = 0, READ_COMPLETE, READ_PROGRESS, NEW, ERR }; // State of starting db file: // CREATE_FILE_ON_EMPTY - empty PM; create starting seg file // CREATE_FILE_ON_DISABLED - HWM extent is disabled; create starting seg file // FILE_EXISTS - target seg file exists; will add more rows // ERROR_STATE - error has occurred executing delayed file creation enum InitialDBFileStat { INITIAL_DBFILE_STAT_CREATE_FILE_ON_EMPTY = 1, INITIAL_DBFILE_STAT_CREATE_FILE_ON_DISABLED = 2, INITIAL_DBFILE_STAT_FILE_EXISTS = 3, INITIAL_DBFILE_STAT_ERROR_STATE = 4 }; struct LockInfo { int locker; Status status; LockInfo() : locker(-1), status(WriteEngine::NEW) { } }; //------------------------------------------------------------------------------ // Note the following mutex lock usage using fColMutex within ColumnInfo. // This is the mutex that is used to manage the state of the current segment // column file associated with ColumnInfo. The state of the column being // managed by ColumnInfo is typically driven by calls from BulkLoadBuffer // to ColumnBufferManager; so ColumnBufferManager frequently initiates the // mutex lock. However, when parsing is complete for a table, then some // ColumnInfo functions are called outside of ColumnBufferManager scope (by // TableInfo::setParseComplete), and these ColumnInfo functions (getSegFileInfo // and getBRMUpdateInfo) must lock the mutex on their own. // // Explicit locks initiated by these ColumnBufferManager functions: // reserveSection() // releaseSection() // intermediateFlush() // extendTokenColumn() // // Implicit locks assumed by these ColumnBufferManager functions: // resizeColumnBuffer() (within scope of lock in reserveSection()) // rowsExtentCheck() (within scope of lock in reserveSection()) // writeToFile() (within scope of lock in reserveSection() or // releaseSection()) // writeToFileExtentCheck() (within scope of lock in writeToFile() or // flush() // finishFile() (within scope of lock in finishParsing() // flush() (within scope of lock in intermediateFlush() or // finishParsing()) // // Other ColumnBufferManager functions: // setDbFile() - called by main thread during preprocesing, or within scope // of a lock when an extent is being added. // resetToBeCompressedColBuf() - called within scope of a lock when an extent // is being added // // Explicit locks in ColumnInfo: // createDelayedFileIfNeeded() // getSegFileInfo() // getBRMUpdateInfo() // finishParsing() // //------------------------------------------------------------------------------ /** @brief Maintains information about a DB column. */ class ColumnInfo : public WeUIDGID { public: //-------------------------------------------------------------------------- // Public Data Members //-------------------------------------------------------------------------- /** @brief Current column */ Column curCol; /** @brief ColumnOp instance */ boost::scoped_ptr colOp; /** @brief Column information. */ JobColumn column; /** @brief column id */ int id; /** @brief Processing time for the column. (in milliseconds) */ double lastProcessingTime; #ifdef PROFILE /** @brief Total processing time for the column. (in milliseconds) */ double totalProcessingTime; #endif /** @brief Instance of the write buffer manager. */ ColumnBufferManager* fColBufferMgr; /** @brief Freespace (in bytes) at the end of the current db column file * For compressed data files, this is the "raw" data byte count, * not the compressed byte count. */ long long availFileSize; /** @brief Total size capacity of current db column segment file. * For compressed data files, this is the "raw" data byte count, * not the compressed byte count. */ long long fileSize; //-------------------------------------------------------------------------- // Public Functions //-------------------------------------------------------------------------- /** @brief Constructor. */ ColumnInfo(Log* logger, int id, const JobColumn& column, DBRootExtentTracker* pDBRootExtTrk, TableInfo* pTableInfo); /** @brief Destructor */ virtual ~ColumnInfo(); /** @brief Returns last input Row num in current "logical" extent; used * to track min/max value per extent, as the data is parsed. 0-based * where Row 0 is first valid input row in the import. */ RID lastInputRowInExtent() const; /** @brief Increment last input Row num in current "logical" extent, so * that it references the last row of the next extent; used in tracking * min/max value extent. 0-based where Row 0 is first valid input row * in the import. This function is called when a Read buffer crosses * an extent boundary. */ void lastInputRowInExtentInc(); /** @brief Update dictionary for arrow/parquet format * Parse and store the parquet data into the store file, and * returns the assigned tokens (tokenBuf) to be stored in the * corresponding column token file. */ int updateDctnryStoreParquet(std::shared_ptr columnData, int tokenPos, const int totalRow, char* tokenBuf); /** @brief Update dictionary method. * Parses and stores specified strings into the store file, and * returns the assigned tokens (tokenBuf) to be stored in the * corresponding column token file. */ int updateDctnryStore(char* buf, ColPosPair** pos, const int totalRow, char* tokenBuf); /** @brief Close the current Column file. * @param bCompletedExtent are we completing an extent * @param bAbort indicates if job is aborting and file should be * closed without doing extra work: flushing buffer, etc. */ virtual int closeColumnFile(bool bCompletingExtent, bool bAbort); /** @brief Close the current Dictionary store file. * @param bAbort Indicates if job is aborting and file should be * closed without doing extra work: flushing buffer, updating HWM, etc */ int closeDctnryStore(bool bAbort); /** @brief utility to convert a Status enumeration to a string */ static void convertStatusToString(WriteEngine::Status status, std::string& statusString); /** @brief Adds an extent to "this" column if needed to contain * the specified number of rows. (New version, supplants checkAnd- * ExtendColumn()). Also saves the HWM associated with the current * extent, and uses it to update the extentmap at the job's end. * * The state of ColumnInfo is updated to reflect the new extent. * For example, curCol is updated with the DBRoot, partition, and * segment file corresponding to the new extent and segment file. * * @param saveLBIDForCP (in) Should new extent's LBID be saved in the * extent stats we are saving to update Casual Partition. */ int extendColumn(bool saveLBIDForCP); /** @brief Get Extent Map updates to send to BRM at EOJ, for this column. * @param brmReporter Reporter object where BRM updates are to be saved */ void getBRMUpdateInfo(BRMReporter& brmReporter); /** @brief Commit/Save auto-increment updates */ int finishAutoInc(); /** @brief Get current dbroot, partition, segment and HWM for this column. */ void getSegFileInfo(DBRootExtentInfo& fileInfo); /** @brief Get last updated saved LBID. */ BRM::LBID_t getLastUpdatedLBID() const; /** @brief Initialize autoincrement value from the current "next" value * taken from the system catalog. */ int initAutoInc(const std::string& fullTableName); /** @brief Open a new Dictionary store file based on the setting of the * DBRoot, partition, and segment settings in curCol.dataFile. * @param bMustExist Indicates whether store file must already exist */ int openDctnryStore(bool bMustExist); /** @brief dictionary blocks that will need to be flushed from cache */ std::vector fDictBlocks; /** @brief Set abbreviated extent flag if this is an abbrev extent */ void setAbbrevExtentCheck(); /** @brief Is current extent we are loading, an "abbreviated" extent */ bool isAbbrevExtent(); /** @brief Expand abbreviated extent in current column segment file. * @param bRetainFilePos controls whether current file position is * to be retained up return from the function. */ int expandAbbrevExtent(bool bRetainFilePos); /** @brief Print extent CP information */ void printCPInfo(JobColumn column); /** @brief Set width factor relative to other columns in the same table. */ void relativeColWidthFactor(int colWidFactor); /** @brief Update extent CP information */ template void updateCPInfo(RID lastInputRow, T minVal, T maxVal, ColDataType colDataType, int width); /** @brief Setup initial extent we will begin loading at start of import. * @param dbRoot DBRoot of starting extent * @param partition Partition number of starting extent * @param segment Segment file number of starting extent * @param tblName Name of table holding this column * @param lbid LBID associated with starting extent * @param oldHwm HWM associated with current HWM extent * @param hwm Starting HWM after oldHWM has been incremented to * account for initial block skipping. * @param bSkippedtoNewExtent Did block skipping advance to next extent * @param bIsNewExtent Treat as new extent when updating CP min/max */ int setupInitialColumnExtent(uint16_t dbRoot, uint32_t partition, uint16_t segment, const std::string& tblName, BRM::LBID_t lbid, HWM oldHwm, HWM hwm, bool bSkippedToNewExtent, bool bIsNewExtent); /** @brief Setup a DB file to be created for starting extent only when needed * @param dbRoot DBRoot of starting extent * @param partition Partition number of starting extent * @param segment Segment file number of starting extent * @param hwm Starting HWM for new start extent * @param bEmptyPM Are we setting up delayed file creation because a PM * has no extents (or is the HWM extent just disabled) */ void setupDelayedFileCreation(uint16_t dbRoot, uint32_t partition, uint16_t segment, HWM hwm, bool bEmptyPM); /** @brief Belatedly create a starting DB file for a PM that has none. * @param tableName Name of table for which this column belongs */ int createDelayedFileIfNeeded(const std::string& tableName); /** @brief Update how many bytes of data are in the column segment file and * how much room remains in the file (till the current extent is full). * @param numBytesWritten Number of bytes just added to the column file. */ void updateBytesWrittenCounts(unsigned int numBytesWritten); /** @brief Returns the list of HWM dictionary blks to be cached */ void getDictFlushBlks(std::vector& blks) const; /** @brief Returns the current file size in bytes */ int64_t getFileSize() const; /** @brief Has file filled up all its extents */ bool isFileComplete() const; /** @brief Reserve block of auto-increment numbers to generate * @param autoIncCount The number of autoincrement numbers to be reserved. * @param nextValue Value of the first reserved auto inc number. */ int reserveAutoIncNums(uint32_t autoIncCount, uint64_t& nextValue); /** @brief Truncate specified dictionary file. Only applies if compressed. * @param dctnryOid Dictionary store OID * @param root DBRoot of relevant dictionary store segment file. * @param pNum Partition number of relevant dictionary store segment file. * @param sNum Segment number of relevant dictionary store segment file. */ virtual int truncateDctnryStore(OID dctnryOid, uint16_t root, uint32_t pNum, uint16_t sNum) const; /** @brief Increment saturated row count for this column in current import * @param satIncCnt Increment count to add to the total saturation count. */ void incSaturatedCnt(int64_t satIncCnt); /** @brief Get saturated row count for this column. */ long long saturatedCnt(); /** @brief When parsing is complete for a column, this function is called * to finish flushing and closing the current segment file. */ int finishParsing(); /** @brief Mutex used to manage access to the output buffers and files. * This was formerly the fMgrMutex in ColumnBufferManager. See comments * that precede this class definition for more information. */ boost::mutex& colMutex(); /** @brief Get number of rows per extent */ unsigned rowsPerExtent(); void setUIDGID(const uid_t uid, const gid_t gid) override; protected: //-------------------------------------------------------------------------- // Protected Functions //-------------------------------------------------------------------------- void addToSegFileList(File& dataFile, // save HWM info per segment file HWM hwm); void clearMemory(); // clear memory used by this object void getCPInfoForBRM(BRMReporter& brmReporter); // Get updated CP info for BRM int getHWMInfoForBRM(BRMReporter& brmReporter); // Get updated HWM inf for BRM // Init last input Row number in current "logical" extent; used // to track min/max value per extent. 0-based where Row 0 is first // valid input row in the import. // bIsNewExtent indicates whether to treat as a new extent or not. void lastInputRowInExtentInit(bool bIsNewExtent); virtual int resetFileOffsetsNewExtent(const char* hdr); // Reset file; start new extent void setFileSize(HWM hwm, int abbrevFlag); // Set fileSize data member // Prepare initial column segment file for importing of data. // oldHWM - Current HWM prior to initial block skipping. This is only // used for abbreviated extents, to detect when block skipping has // caused us to require a full expanded extent. // newHWM - Starting point for adding data after initial blockskipping virtual int setupInitialColumnFile(HWM oldHWM, // original HWM HWM newHWM); // new HWM to start from virtual int saveDctnryStoreHWMChunk(bool& needBackup); // Backup Dct HWM Chunk int extendColumnNewExtent( // extend column; new extent bool saveLBIDForCP, uint16_t dbRootNew, uint32_t partitionNew); virtual int extendColumnOldExtent( // extend column; existing extent uint16_t dbRootNext, uint32_t partitionNext, uint16_t segmentNext, HWM hwmNext); //-------------------------------------------------------------------------- // Protected Data Members //-------------------------------------------------------------------------- boost::mutex fDictionaryMutex; // Mutex for dicionary updates boost::mutex fColMutex; // Mutex for column changes boost::mutex fAutoIncMutex; // Mutex to manage fAutoIncLastValue boost::mutex fDelayedFileCreateMutex; // Manage delayed file check/create Log* fLog; // Object used for logging // Blocks to skip at start of bulk load, if starting file has to be created // by createDelayedFileIfNeeded() HWM fDelayedFileStartBlksSkipped; // LBID corresponding to initial HWM saved in fSavedHWM at start of import. // // LBID is used, at the end of the import, to identify to DBRM, an // extent whose CasualPartition stats are to be cleared, because we will // have written additional rows to that extent as part of an import. BRM::LBID_t fSavedLbid; // The last updated LBID. In case `bulk` creates a new extent with new LBID // we have to initialize file header with created LBID. BRM::LBID_t fLastUpdatedLbid; // Size of a segment file (in bytes) when the file is opened // to add the next extent. // For compressed data files, this is the "raw" data byte count, // not the compressed byte count. long long fSizeWrittenStart; // Tracks the size of a segment file (in bytes) as rows are added. // For compressed data files, this is the "raw" data byte count, // not the compressed byte count. long long fSizeWritten; // Tracks last input Row number in the current "logical" extent, // where Row number is 0-based, with Row 0 being the first row in the // import. Used by parsing thread to track when a read buffer crosses // an extent boundary. We detect when a Read buffer crosses an ex- // pected extent boundary so that we can track a column's min/max for // each extent. RID fLastInputRowInCurrentExtent; bool fLoadingAbbreviatedExtent; // Is current extent abbreviated ColExtInfBase* fColExtInf; // Used to update CP at end of job long long fMaxNumRowsPerSegFile; // Max num rows per segment file Dctnry* fStore; // Corresponding dctnry store file // For autoincrement column only... Tracks latest autoincrement value used long long fAutoIncLastValue; volatile int64_t fSaturatedRowCnt; // No. of rows with saturated values // List of segment files updated during an import; used to track infor- // mation necessary to update the ExtentMap at the "end" of the import. std::vector fSegFileUpdateList; TableInfo* fpTableInfo; // pointer to the table info ColumnAutoInc* fAutoIncMgr; // Maintains autoIncrement nextValue DBRootExtentTracker* fDbRootExtTrk; // DBRoot extent tracker int fColWidthFactor; // Wid factor relative to other cols InitialDBFileStat fDelayedFileCreation; // Denotes when initial DB file is // to be created after preprocessing unsigned fRowsPerExtent; // Number of rows per column extent }; //------------------------------------------------------------------------------ // Inline functions //------------------------------------------------------------------------------ inline void ColumnInfo::setUIDGID(const uid_t p_uid, const gid_t p_gid) { WeUIDGID::setUIDGID(p_uid, p_gid); if (colOp) colOp->setUIDGID(this); } inline boost::mutex& ColumnInfo::colMutex() { return fColMutex; } inline void ColumnInfo::getDictFlushBlks(std::vector& blks) const { blks = fDictBlocks; } inline int64_t ColumnInfo::getFileSize() const { return fileSize; } inline void ColumnInfo::incSaturatedCnt(int64_t satIncCnt) { (void)atomicops::atomicAdd(&fSaturatedRowCnt, satIncCnt); } inline bool ColumnInfo::isAbbrevExtent() { return fLoadingAbbreviatedExtent; } inline RID ColumnInfo::lastInputRowInExtent() const { return fLastInputRowInCurrentExtent; } inline void ColumnInfo::printCPInfo(JobColumn column) { fColExtInf->print(column); } inline long long ColumnInfo::saturatedCnt() { return fSaturatedRowCnt; } inline void ColumnInfo::relativeColWidthFactor(int colWidFactor) { fColWidthFactor = colWidFactor; } inline unsigned ColumnInfo::rowsPerExtent() { return fRowsPerExtent; } template inline void ColumnInfo::updateCPInfo(RID lastInputRow, T minVal, T maxVal, ColDataType colDataType, int width) { fColExtInf->addOrUpdateEntry(lastInputRow, minVal, maxVal, colDataType, width); } } // namespace WriteEngine