1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-26 11:48:52 +03:00

567 lines
20 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*******************************************************************************
* $Id: we_columninfo.h 4726 2013-08-07 03:38:36Z bwilkinson $
*
*******************************************************************************/
/** @file
* Contains main class used to manage column information.
*/
#pragma once
#include "we_type.h"
#include "we_brm.h"
#include "we_colop.h"
#include "we_colbufmgr.h"
#include "we_colextinf.h"
#include "we_dctnrycompress.h"
#include <boost/thread/mutex.hpp>
#include <boost/scoped_ptr.hpp>
#include <sys/time.h>
#include <vector>
#include "atomicops.h"
namespace WriteEngine
{
class Log;
class ColumnAutoInc;
class DBRootExtentTracker;
class BRMReporter;
class TableInfo;
struct DBRootExtentInfo;
enum Status
{
PARSE_COMPLETE = 0,
READ_COMPLETE,
READ_PROGRESS,
NEW,
ERR
};
// State of starting db file:
// CREATE_FILE_ON_EMPTY - empty PM; create starting seg file
// CREATE_FILE_ON_DISABLED - HWM extent is disabled; create starting seg file
// FILE_EXISTS - target seg file exists; will add more rows
// ERROR_STATE - error has occurred executing delayed file creation
enum InitialDBFileStat
{
INITIAL_DBFILE_STAT_CREATE_FILE_ON_EMPTY = 1,
INITIAL_DBFILE_STAT_CREATE_FILE_ON_DISABLED = 2,
INITIAL_DBFILE_STAT_FILE_EXISTS = 3,
INITIAL_DBFILE_STAT_ERROR_STATE = 4
};
struct LockInfo
{
int locker;
Status status;
LockInfo() : locker(-1), status(WriteEngine::NEW)
{
}
};
//------------------------------------------------------------------------------
// Note the following mutex lock usage using fColMutex within ColumnInfo.
// This is the mutex that is used to manage the state of the current segment
// column file associated with ColumnInfo. The state of the column being
// managed by ColumnInfo is typically driven by calls from BulkLoadBuffer
// to ColumnBufferManager; so ColumnBufferManager frequently initiates the
// mutex lock. However, when parsing is complete for a table, then some
// ColumnInfo functions are called outside of ColumnBufferManager scope (by
// TableInfo::setParseComplete), and these ColumnInfo functions (getSegFileInfo
// and getBRMUpdateInfo) must lock the mutex on their own.
//
// Explicit locks initiated by these ColumnBufferManager functions:
// reserveSection()
// releaseSection()
// intermediateFlush()
// extendTokenColumn()
//
// Implicit locks assumed by these ColumnBufferManager functions:
// resizeColumnBuffer() (within scope of lock in reserveSection())
// rowsExtentCheck() (within scope of lock in reserveSection())
// writeToFile() (within scope of lock in reserveSection() or
// releaseSection())
// writeToFileExtentCheck() (within scope of lock in writeToFile() or
// flush()
// finishFile() (within scope of lock in finishParsing()
// flush() (within scope of lock in intermediateFlush() or
// finishParsing())
//
// Other ColumnBufferManager functions:
// setDbFile() - called by main thread during preprocesing, or within scope
// of a lock when an extent is being added.
// resetToBeCompressedColBuf() - called within scope of a lock when an extent
// is being added
//
// Explicit locks in ColumnInfo:
// createDelayedFileIfNeeded()
// getSegFileInfo()
// getBRMUpdateInfo()
// finishParsing()
//
//------------------------------------------------------------------------------
/** @brief Maintains information about a DB column.
*/
class ColumnInfo : public WeUIDGID
{
public:
//--------------------------------------------------------------------------
// Public Data Members
//--------------------------------------------------------------------------
/** @brief Current column
*/
Column curCol;
/** @brief ColumnOp instance
*/
boost::scoped_ptr<ColumnOp> colOp;
/** @brief Column information.
*/
JobColumn column;
/** @brief column id
*/
int id;
/** @brief Processing time for the column. (in milliseconds)
*/
double lastProcessingTime;
#ifdef PROFILE
/** @brief Total processing time for the column. (in milliseconds)
*/
double totalProcessingTime;
#endif
/** @brief Instance of the write buffer manager.
*/
ColumnBufferManager* fColBufferMgr;
/** @brief Freespace (in bytes) at the end of the current db column file
* For compressed data files, this is the "raw" data byte count,
* not the compressed byte count.
*/
long long availFileSize;
/** @brief Total size capacity of current db column segment file.
* For compressed data files, this is the "raw" data byte count,
* not the compressed byte count.
*/
long long fileSize;
//--------------------------------------------------------------------------
// Public Functions
//--------------------------------------------------------------------------
/** @brief Constructor.
*/
ColumnInfo(Log* logger, int id, const JobColumn& column, DBRootExtentTracker* pDBRootExtTrk,
TableInfo* pTableInfo);
/** @brief Destructor
*/
virtual ~ColumnInfo();
/** @brief Returns last input Row num in current "logical" extent; used
* to track min/max value per extent, as the data is parsed. 0-based
* where Row 0 is first valid input row in the import.
*/
RID lastInputRowInExtent() const;
/** @brief Increment last input Row num in current "logical" extent, so
* that it references the last row of the next extent; used in tracking
* min/max value extent. 0-based where Row 0 is first valid input row
* in the import. This function is called when a Read buffer crosses
* an extent boundary.
*/
void lastInputRowInExtentInc();
/** @brief Update dictionary for arrow/parquet format
* Parse and store the parquet data into the store file, and
* returns the assigned tokens (tokenBuf) to be stored in the
* corresponding column token file.
*/
int updateDctnryStoreParquet(std::shared_ptr<arrow::Array> columnData, int tokenPos, const int totalRow, char* tokenBuf);
/** @brief Update dictionary method.
* Parses and stores specified strings into the store file, and
* returns the assigned tokens (tokenBuf) to be stored in the
* corresponding column token file.
*/
int updateDctnryStore(char* buf, ColPosPair** pos, const int totalRow, char* tokenBuf);
/** @brief Close the current Column file.
* @param bCompletedExtent are we completing an extent
* @param bAbort indicates if job is aborting and file should be
* closed without doing extra work: flushing buffer, etc.
*/
virtual int closeColumnFile(bool bCompletingExtent, bool bAbort);
/** @brief Close the current Dictionary store file.
* @param bAbort Indicates if job is aborting and file should be
* closed without doing extra work: flushing buffer, updating HWM, etc
*/
int closeDctnryStore(bool bAbort);
/** @brief utility to convert a Status enumeration to a string
*/
static void convertStatusToString(WriteEngine::Status status, std::string& statusString);
/** @brief Adds an extent to "this" column if needed to contain
* the specified number of rows. (New version, supplants checkAnd-
* ExtendColumn()). Also saves the HWM associated with the current
* extent, and uses it to update the extentmap at the job's end.
*
* The state of ColumnInfo is updated to reflect the new extent.
* For example, curCol is updated with the DBRoot, partition, and
* segment file corresponding to the new extent and segment file.
*
* @param saveLBIDForCP (in) Should new extent's LBID be saved in the
* extent stats we are saving to update Casual Partition.
*/
int extendColumn(bool saveLBIDForCP);
/** @brief Get Extent Map updates to send to BRM at EOJ, for this column.
* @param brmReporter Reporter object where BRM updates are to be saved
*/
void getBRMUpdateInfo(BRMReporter& brmReporter);
/** @brief Commit/Save auto-increment updates
*/
int finishAutoInc();
/** @brief Get current dbroot, partition, segment and HWM for this column.
*/
void getSegFileInfo(DBRootExtentInfo& fileInfo);
/** @brief Get last updated saved LBID.
*/
BRM::LBID_t getLastUpdatedLBID() const;
/** @brief Initialize autoincrement value from the current "next" value
* taken from the system catalog.
*/
int initAutoInc(const std::string& fullTableName);
/** @brief Open a new Dictionary store file based on the setting of the
* DBRoot, partition, and segment settings in curCol.dataFile.
* @param bMustExist Indicates whether store file must already exist
*/
int openDctnryStore(bool bMustExist);
/** @brief dictionary blocks that will need to be flushed from cache */
std::vector<BRM::LBID_t> fDictBlocks;
/** @brief Set abbreviated extent flag if this is an abbrev extent */
void setAbbrevExtentCheck();
/** @brief Is current extent we are loading, an "abbreviated" extent
*/
bool isAbbrevExtent();
/** @brief Expand abbreviated extent in current column segment file.
* @param bRetainFilePos controls whether current file position is
* to be retained up return from the function.
*/
int expandAbbrevExtent(bool bRetainFilePos);
/** @brief Print extent CP information
*/
void printCPInfo(JobColumn column);
/** @brief Set width factor relative to other columns in the same table.
*/
void relativeColWidthFactor(int colWidFactor);
/** @brief Update extent CP information
*/
template <typename T>
void updateCPInfo(RID lastInputRow, T minVal, T maxVal, ColDataType colDataType, int width);
/** @brief Setup initial extent we will begin loading at start of import.
* @param dbRoot DBRoot of starting extent
* @param partition Partition number of starting extent
* @param segment Segment file number of starting extent
* @param tblName Name of table holding this column
* @param lbid LBID associated with starting extent
* @param oldHwm HWM associated with current HWM extent
* @param hwm Starting HWM after oldHWM has been incremented to
* account for initial block skipping.
* @param bSkippedtoNewExtent Did block skipping advance to next extent
* @param bIsNewExtent Treat as new extent when updating CP min/max
*/
int setupInitialColumnExtent(uint16_t dbRoot, uint32_t partition, uint16_t segment,
const std::string& tblName, BRM::LBID_t lbid, HWM oldHwm, HWM hwm,
bool bSkippedToNewExtent, bool bIsNewExtent);
/** @brief Setup a DB file to be created for starting extent only when needed
* @param dbRoot DBRoot of starting extent
* @param partition Partition number of starting extent
* @param segment Segment file number of starting extent
* @param hwm Starting HWM for new start extent
* @param bEmptyPM Are we setting up delayed file creation because a PM
* has no extents (or is the HWM extent just disabled)
*/
void setupDelayedFileCreation(uint16_t dbRoot, uint32_t partition, uint16_t segment, HWM hwm,
bool bEmptyPM);
/** @brief Belatedly create a starting DB file for a PM that has none.
* @param tableName Name of table for which this column belongs
*/
int createDelayedFileIfNeeded(const std::string& tableName);
/** @brief Update how many bytes of data are in the column segment file and
* how much room remains in the file (till the current extent is full).
* @param numBytesWritten Number of bytes just added to the column file.
*/
void updateBytesWrittenCounts(unsigned int numBytesWritten);
/** @brief Returns the list of HWM dictionary blks to be cached
*/
void getDictFlushBlks(std::vector<BRM::LBID_t>& blks) const;
/** @brief Returns the current file size in bytes
*/
int64_t getFileSize() const;
/** @brief Has file filled up all its extents
*/
bool isFileComplete() const;
/** @brief Reserve block of auto-increment numbers to generate
* @param autoIncCount The number of autoincrement numbers to be reserved.
* @param nextValue Value of the first reserved auto inc number.
*/
int reserveAutoIncNums(uint32_t autoIncCount, uint64_t& nextValue);
/** @brief Truncate specified dictionary file. Only applies if compressed.
* @param dctnryOid Dictionary store OID
* @param root DBRoot of relevant dictionary store segment file.
* @param pNum Partition number of relevant dictionary store segment file.
* @param sNum Segment number of relevant dictionary store segment file.
*/
virtual int truncateDctnryStore(OID dctnryOid, uint16_t root, uint32_t pNum, uint16_t sNum) const;
/** @brief Increment saturated row count for this column in current import
* @param satIncCnt Increment count to add to the total saturation count.
*/
void incSaturatedCnt(int64_t satIncCnt);
/** @brief Get saturated row count for this column.
*/
long long saturatedCnt();
/** @brief When parsing is complete for a column, this function is called
* to finish flushing and closing the current segment file.
*/
int finishParsing();
/** @brief Mutex used to manage access to the output buffers and files.
* This was formerly the fMgrMutex in ColumnBufferManager. See comments
* that precede this class definition for more information.
*/
boost::mutex& colMutex();
/** @brief Get number of rows per extent
*/
unsigned rowsPerExtent();
void setUIDGID(const uid_t uid, const gid_t gid) override;
protected:
//--------------------------------------------------------------------------
// Protected Functions
//--------------------------------------------------------------------------
void addToSegFileList(File& dataFile, // save HWM info per segment file
HWM hwm);
void clearMemory(); // clear memory used by this object
void getCPInfoForBRM(BRMReporter& brmReporter); // Get updated CP info for BRM
int getHWMInfoForBRM(BRMReporter& brmReporter); // Get updated HWM inf for BRM
// Init last input Row number in current "logical" extent; used
// to track min/max value per extent. 0-based where Row 0 is first
// valid input row in the import.
// bIsNewExtent indicates whether to treat as a new extent or not.
void lastInputRowInExtentInit(bool bIsNewExtent);
virtual int resetFileOffsetsNewExtent(const char* hdr);
// Reset file; start new extent
void setFileSize(HWM hwm, int abbrevFlag); // Set fileSize data member
// Prepare initial column segment file for importing of data.
// oldHWM - Current HWM prior to initial block skipping. This is only
// used for abbreviated extents, to detect when block skipping has
// caused us to require a full expanded extent.
// newHWM - Starting point for adding data after initial blockskipping
virtual int setupInitialColumnFile(HWM oldHWM, // original HWM
HWM newHWM); // new HWM to start from
virtual int saveDctnryStoreHWMChunk(bool& needBackup); // Backup Dct HWM Chunk
int extendColumnNewExtent( // extend column; new extent
bool saveLBIDForCP, uint16_t dbRootNew, uint32_t partitionNew);
virtual int extendColumnOldExtent( // extend column; existing extent
uint16_t dbRootNext, uint32_t partitionNext, uint16_t segmentNext, HWM hwmNext);
//--------------------------------------------------------------------------
// Protected Data Members
//--------------------------------------------------------------------------
boost::mutex fDictionaryMutex; // Mutex for dicionary updates
boost::mutex fColMutex; // Mutex for column changes
boost::mutex fAutoIncMutex; // Mutex to manage fAutoIncLastValue
boost::mutex fDelayedFileCreateMutex; // Manage delayed file check/create
Log* fLog; // Object used for logging
// Blocks to skip at start of bulk load, if starting file has to be created
// by createDelayedFileIfNeeded()
HWM fDelayedFileStartBlksSkipped;
// LBID corresponding to initial HWM saved in fSavedHWM at start of import.
//
// LBID is used, at the end of the import, to identify to DBRM, an
// extent whose CasualPartition stats are to be cleared, because we will
// have written additional rows to that extent as part of an import.
BRM::LBID_t fSavedLbid;
// The last updated LBID. In case `bulk` creates a new extent with new LBID
// we have to initialize file header with created LBID.
BRM::LBID_t fLastUpdatedLbid;
// Size of a segment file (in bytes) when the file is opened
// to add the next extent.
// For compressed data files, this is the "raw" data byte count,
// not the compressed byte count.
long long fSizeWrittenStart;
// Tracks the size of a segment file (in bytes) as rows are added.
// For compressed data files, this is the "raw" data byte count,
// not the compressed byte count.
long long fSizeWritten;
// Tracks last input Row number in the current "logical" extent,
// where Row number is 0-based, with Row 0 being the first row in the
// import. Used by parsing thread to track when a read buffer crosses
// an extent boundary. We detect when a Read buffer crosses an ex-
// pected extent boundary so that we can track a column's min/max for
// each extent.
RID fLastInputRowInCurrentExtent;
bool fLoadingAbbreviatedExtent; // Is current extent abbreviated
ColExtInfBase* fColExtInf; // Used to update CP at end of job
long long fMaxNumRowsPerSegFile; // Max num rows per segment file
Dctnry* fStore; // Corresponding dctnry store file
// For autoincrement column only... Tracks latest autoincrement value used
long long fAutoIncLastValue;
volatile int64_t fSaturatedRowCnt; // No. of rows with saturated values
// List of segment files updated during an import; used to track infor-
// mation necessary to update the ExtentMap at the "end" of the import.
std::vector<File> fSegFileUpdateList;
TableInfo* fpTableInfo; // pointer to the table info
ColumnAutoInc* fAutoIncMgr; // Maintains autoIncrement nextValue
DBRootExtentTracker* fDbRootExtTrk; // DBRoot extent tracker
int fColWidthFactor; // Wid factor relative to other cols
InitialDBFileStat fDelayedFileCreation; // Denotes when initial DB file is
// to be created after preprocessing
unsigned fRowsPerExtent; // Number of rows per column extent
};
//------------------------------------------------------------------------------
// Inline functions
//------------------------------------------------------------------------------
inline void ColumnInfo::setUIDGID(const uid_t p_uid, const gid_t p_gid)
{
WeUIDGID::setUIDGID(p_uid, p_gid);
if (colOp)
colOp->setUIDGID(this);
}
inline boost::mutex& ColumnInfo::colMutex()
{
return fColMutex;
}
inline void ColumnInfo::getDictFlushBlks(std::vector<BRM::LBID_t>& blks) const
{
blks = fDictBlocks;
}
inline int64_t ColumnInfo::getFileSize() const
{
return fileSize;
}
inline void ColumnInfo::incSaturatedCnt(int64_t satIncCnt)
{
(void)atomicops::atomicAdd(&fSaturatedRowCnt, satIncCnt);
}
inline bool ColumnInfo::isAbbrevExtent()
{
return fLoadingAbbreviatedExtent;
}
inline RID ColumnInfo::lastInputRowInExtent() const
{
return fLastInputRowInCurrentExtent;
}
inline void ColumnInfo::printCPInfo(JobColumn column)
{
fColExtInf->print(column);
}
inline long long ColumnInfo::saturatedCnt()
{
return fSaturatedRowCnt;
}
inline void ColumnInfo::relativeColWidthFactor(int colWidFactor)
{
fColWidthFactor = colWidFactor;
}
inline unsigned ColumnInfo::rowsPerExtent()
{
return fRowsPerExtent;
}
template <typename T>
inline void ColumnInfo::updateCPInfo(RID lastInputRow, T minVal, T maxVal, ColDataType colDataType, int width)
{
fColExtInf->addOrUpdateEntry(lastInputRow, minVal, maxVal, colDataType, width);
}
} // namespace WriteEngine