mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-26 11:48:52 +03:00
567 lines
20 KiB
C++
567 lines
20 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*******************************************************************************
|
|
* $Id: we_columninfo.h 4726 2013-08-07 03:38:36Z bwilkinson $
|
|
*
|
|
*******************************************************************************/
|
|
|
|
/** @file
|
|
* Contains main class used to manage column information.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "we_type.h"
|
|
#include "we_brm.h"
|
|
#include "we_colop.h"
|
|
#include "we_colbufmgr.h"
|
|
#include "we_colextinf.h"
|
|
#include "we_dctnrycompress.h"
|
|
|
|
#include <boost/thread/mutex.hpp>
|
|
#include <boost/scoped_ptr.hpp>
|
|
#include <sys/time.h>
|
|
#include <vector>
|
|
|
|
#include "atomicops.h"
|
|
|
|
namespace WriteEngine
|
|
{
|
|
// Forward declarations of types that this header only names through
// pointers or references.
class Log;
class ColumnAutoInc;
class DBRootExtentTracker;
class BRMReporter;
class TableInfo;
struct DBRootExtentInfo;
|
|
|
|
// Processing status values.  LockInfo stores one of these, and a
// default-constructed LockInfo starts out as NEW.
// NOTE(review): the per-value meanings below are inferred from the enumerator
// names only -- confirm against the code that sets and tests them.
enum Status
{
  PARSE_COMPLETE = 0,  // presumably: parsing has finished
  READ_COMPLETE,       // presumably: reading has finished
  READ_PROGRESS,       // presumably: reading is in progress
  NEW,                 // initial status of a default-constructed LockInfo
  ERR                  // presumably: an error occurred
};
|
|
|
|
// State of starting db file.
enum InitialDBFileStat
{
  // CREATE_FILE_ON_EMPTY - empty PM; create starting seg file
  INITIAL_DBFILE_STAT_CREATE_FILE_ON_EMPTY = 1,

  // CREATE_FILE_ON_DISABLED - HWM extent is disabled; create starting seg file
  INITIAL_DBFILE_STAT_CREATE_FILE_ON_DISABLED = 2,

  // FILE_EXISTS - target seg file exists; will add more rows
  INITIAL_DBFILE_STAT_FILE_EXISTS = 3,

  // ERROR_STATE - error has occurred executing delayed file creation
  INITIAL_DBFILE_STAT_ERROR_STATE = 4
};
|
|
|
|
struct LockInfo
|
|
{
|
|
int locker;
|
|
Status status;
|
|
LockInfo() : locker(-1), status(WriteEngine::NEW)
|
|
{
|
|
}
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
// Note the following mutex lock usage using fColMutex within ColumnInfo.
// This is the mutex that is used to manage the state of the current segment
// column file associated with ColumnInfo.  The state of the column being
// managed by ColumnInfo is typically driven by calls from BulkLoadBuffer
// to ColumnBufferManager; so ColumnBufferManager frequently initiates the
// mutex lock.  However, when parsing is complete for a table, then some
// ColumnInfo functions are called outside of ColumnBufferManager scope (by
// TableInfo::setParseComplete), and these ColumnInfo functions (getSegFileInfo
// and getBRMUpdateInfo) must lock the mutex on their own.
//
// Explicit locks initiated by these ColumnBufferManager functions:
//   reserveSection()
//   releaseSection()
//   intermediateFlush()
//   extendTokenColumn()
//
// Implicit locks assumed by these ColumnBufferManager functions:
//   resizeColumnBuffer()     (within scope of lock in reserveSection())
//   rowsExtentCheck()        (within scope of lock in reserveSection())
//   writeToFile()            (within scope of lock in reserveSection() or
//                             releaseSection())
//   writeToFileExtentCheck() (within scope of lock in writeToFile() or
//                             flush())
//   finishFile()             (within scope of lock in finishParsing())
//   flush()                  (within scope of lock in intermediateFlush() or
//                             finishParsing())
//
// Other ColumnBufferManager functions:
//   setDbFile() - called by main thread during preprocessing, or within scope
//                 of a lock when an extent is being added.
//   resetToBeCompressedColBuf() - called within scope of a lock when an extent
//                 is being added
//
// Explicit locks in ColumnInfo:
//   createDelayedFileIfNeeded()
//   getSegFileInfo()
//   getBRMUpdateInfo()
//   finishParsing()
//
//------------------------------------------------------------------------------
|
|
|
|
/** @brief Maintains information about a DB column.
|
|
*/
|
|
class ColumnInfo : public WeUIDGID
|
|
{
|
|
public:
|
|
//--------------------------------------------------------------------------
|
|
// Public Data Members
|
|
//--------------------------------------------------------------------------
|
|
|
|
/** @brief Current column
|
|
*/
|
|
Column curCol;
|
|
|
|
/** @brief ColumnOp instance
|
|
*/
|
|
boost::scoped_ptr<ColumnOp> colOp;
|
|
|
|
/** @brief Column information.
|
|
*/
|
|
JobColumn column;
|
|
|
|
/** @brief column id
|
|
*/
|
|
int id;
|
|
|
|
/** @brief Processing time for the column. (in milliseconds)
|
|
*/
|
|
double lastProcessingTime;
|
|
|
|
#ifdef PROFILE
|
|
/** @brief Total processing time for the column. (in milliseconds)
|
|
*/
|
|
double totalProcessingTime;
|
|
#endif
|
|
|
|
/** @brief Instance of the write buffer manager.
|
|
*/
|
|
ColumnBufferManager* fColBufferMgr;
|
|
|
|
/** @brief Freespace (in bytes) at the end of the current db column file
|
|
* For compressed data files, this is the "raw" data byte count,
|
|
* not the compressed byte count.
|
|
*/
|
|
long long availFileSize;
|
|
|
|
/** @brief Total size capacity of current db column segment file.
|
|
* For compressed data files, this is the "raw" data byte count,
|
|
* not the compressed byte count.
|
|
*/
|
|
long long fileSize;
|
|
|
|
//--------------------------------------------------------------------------
|
|
// Public Functions
|
|
//--------------------------------------------------------------------------
|
|
|
|
/** @brief Constructor.
|
|
*/
|
|
ColumnInfo(Log* logger, int id, const JobColumn& column, DBRootExtentTracker* pDBRootExtTrk,
|
|
TableInfo* pTableInfo);
|
|
|
|
/** @brief Destructor
|
|
*/
|
|
virtual ~ColumnInfo();
|
|
|
|
/** @brief Returns last input Row num in current "logical" extent; used
|
|
* to track min/max value per extent, as the data is parsed. 0-based
|
|
* where Row 0 is first valid input row in the import.
|
|
*/
|
|
RID lastInputRowInExtent() const;
|
|
|
|
/** @brief Increment last input Row num in current "logical" extent, so
|
|
* that it references the last row of the next extent; used in tracking
|
|
* min/max value extent. 0-based where Row 0 is first valid input row
|
|
* in the import. This function is called when a Read buffer crosses
|
|
* an extent boundary.
|
|
*/
|
|
void lastInputRowInExtentInc();
|
|
|
|
/** @brief Update dictionary for arrow/parquet format
|
|
* Parse and store the parquet data into the store file, and
|
|
* returns the assigned tokens (tokenBuf) to be stored in the
|
|
* corresponding column token file.
|
|
*/
|
|
int updateDctnryStoreParquet(std::shared_ptr<arrow::Array> columnData, int tokenPos, const int totalRow, char* tokenBuf);
|
|
|
|
/** @brief Update dictionary method.
|
|
* Parses and stores specified strings into the store file, and
|
|
* returns the assigned tokens (tokenBuf) to be stored in the
|
|
* corresponding column token file.
|
|
*/
|
|
int updateDctnryStore(char* buf, ColPosPair** pos, const int totalRow, char* tokenBuf);
|
|
|
|
/** @brief Close the current Column file.
|
|
* @param bCompletedExtent are we completing an extent
|
|
* @param bAbort indicates if job is aborting and file should be
|
|
* closed without doing extra work: flushing buffer, etc.
|
|
*/
|
|
virtual int closeColumnFile(bool bCompletingExtent, bool bAbort);
|
|
|
|
/** @brief Close the current Dictionary store file.
|
|
* @param bAbort Indicates if job is aborting and file should be
|
|
* closed without doing extra work: flushing buffer, updating HWM, etc
|
|
*/
|
|
int closeDctnryStore(bool bAbort);
|
|
|
|
/** @brief utility to convert a Status enumeration to a string
|
|
*/
|
|
static void convertStatusToString(WriteEngine::Status status, std::string& statusString);
|
|
|
|
/** @brief Adds an extent to "this" column if needed to contain
|
|
* the specified number of rows. (New version, supplants checkAnd-
|
|
* ExtendColumn()). Also saves the HWM associated with the current
|
|
* extent, and uses it to update the extentmap at the job's end.
|
|
*
|
|
* The state of ColumnInfo is updated to reflect the new extent.
|
|
* For example, curCol is updated with the DBRoot, partition, and
|
|
* segment file corresponding to the new extent and segment file.
|
|
*
|
|
* @param saveLBIDForCP (in) Should new extent's LBID be saved in the
|
|
* extent stats we are saving to update Casual Partition.
|
|
*/
|
|
int extendColumn(bool saveLBIDForCP);
|
|
|
|
/** @brief Get Extent Map updates to send to BRM at EOJ, for this column.
|
|
* @param brmReporter Reporter object where BRM updates are to be saved
|
|
*/
|
|
void getBRMUpdateInfo(BRMReporter& brmReporter);
|
|
|
|
/** @brief Commit/Save auto-increment updates
|
|
*/
|
|
int finishAutoInc();
|
|
|
|
/** @brief Get current dbroot, partition, segment and HWM for this column.
|
|
*/
|
|
void getSegFileInfo(DBRootExtentInfo& fileInfo);
|
|
|
|
/** @brief Get last updated saved LBID.
|
|
*/
|
|
BRM::LBID_t getLastUpdatedLBID() const;
|
|
|
|
/** @brief Initialize autoincrement value from the current "next" value
|
|
* taken from the system catalog.
|
|
*/
|
|
int initAutoInc(const std::string& fullTableName);
|
|
|
|
/** @brief Open a new Dictionary store file based on the setting of the
|
|
* DBRoot, partition, and segment settings in curCol.dataFile.
|
|
* @param bMustExist Indicates whether store file must already exist
|
|
*/
|
|
int openDctnryStore(bool bMustExist);
|
|
|
|
/** @brief dictionary blocks that will need to be flushed from cache */
|
|
std::vector<BRM::LBID_t> fDictBlocks;
|
|
|
|
/** @brief Set abbreviated extent flag if this is an abbrev extent */
|
|
void setAbbrevExtentCheck();
|
|
|
|
/** @brief Is current extent we are loading, an "abbreviated" extent
|
|
*/
|
|
bool isAbbrevExtent();
|
|
|
|
/** @brief Expand abbreviated extent in current column segment file.
|
|
* @param bRetainFilePos controls whether current file position is
|
|
* to be retained up return from the function.
|
|
*/
|
|
int expandAbbrevExtent(bool bRetainFilePos);
|
|
|
|
/** @brief Print extent CP information
|
|
*/
|
|
void printCPInfo(JobColumn column);
|
|
|
|
/** @brief Set width factor relative to other columns in the same table.
|
|
*/
|
|
void relativeColWidthFactor(int colWidFactor);
|
|
|
|
/** @brief Update extent CP information
|
|
*/
|
|
template <typename T>
|
|
void updateCPInfo(RID lastInputRow, T minVal, T maxVal, ColDataType colDataType, int width);
|
|
|
|
/** @brief Setup initial extent we will begin loading at start of import.
|
|
* @param dbRoot DBRoot of starting extent
|
|
* @param partition Partition number of starting extent
|
|
* @param segment Segment file number of starting extent
|
|
* @param tblName Name of table holding this column
|
|
* @param lbid LBID associated with starting extent
|
|
* @param oldHwm HWM associated with current HWM extent
|
|
* @param hwm Starting HWM after oldHWM has been incremented to
|
|
* account for initial block skipping.
|
|
* @param bSkippedtoNewExtent Did block skipping advance to next extent
|
|
* @param bIsNewExtent Treat as new extent when updating CP min/max
|
|
*/
|
|
int setupInitialColumnExtent(uint16_t dbRoot, uint32_t partition, uint16_t segment,
|
|
const std::string& tblName, BRM::LBID_t lbid, HWM oldHwm, HWM hwm,
|
|
bool bSkippedToNewExtent, bool bIsNewExtent);
|
|
|
|
/** @brief Setup a DB file to be created for starting extent only when needed
|
|
* @param dbRoot DBRoot of starting extent
|
|
* @param partition Partition number of starting extent
|
|
* @param segment Segment file number of starting extent
|
|
* @param hwm Starting HWM for new start extent
|
|
* @param bEmptyPM Are we setting up delayed file creation because a PM
|
|
* has no extents (or is the HWM extent just disabled)
|
|
*/
|
|
void setupDelayedFileCreation(uint16_t dbRoot, uint32_t partition, uint16_t segment, HWM hwm,
|
|
bool bEmptyPM);
|
|
|
|
/** @brief Belatedly create a starting DB file for a PM that has none.
|
|
* @param tableName Name of table for which this column belongs
|
|
*/
|
|
int createDelayedFileIfNeeded(const std::string& tableName);
|
|
|
|
/** @brief Update how many bytes of data are in the column segment file and
|
|
* how much room remains in the file (till the current extent is full).
|
|
* @param numBytesWritten Number of bytes just added to the column file.
|
|
*/
|
|
void updateBytesWrittenCounts(unsigned int numBytesWritten);
|
|
|
|
/** @brief Returns the list of HWM dictionary blks to be cached
|
|
*/
|
|
void getDictFlushBlks(std::vector<BRM::LBID_t>& blks) const;
|
|
|
|
/** @brief Returns the current file size in bytes
|
|
*/
|
|
int64_t getFileSize() const;
|
|
|
|
/** @brief Has file filled up all its extents
|
|
*/
|
|
bool isFileComplete() const;
|
|
|
|
/** @brief Reserve block of auto-increment numbers to generate
|
|
* @param autoIncCount The number of autoincrement numbers to be reserved.
|
|
* @param nextValue Value of the first reserved auto inc number.
|
|
*/
|
|
int reserveAutoIncNums(uint32_t autoIncCount, uint64_t& nextValue);
|
|
|
|
/** @brief Truncate specified dictionary file. Only applies if compressed.
|
|
* @param dctnryOid Dictionary store OID
|
|
* @param root DBRoot of relevant dictionary store segment file.
|
|
* @param pNum Partition number of relevant dictionary store segment file.
|
|
* @param sNum Segment number of relevant dictionary store segment file.
|
|
*/
|
|
virtual int truncateDctnryStore(OID dctnryOid, uint16_t root, uint32_t pNum, uint16_t sNum) const;
|
|
|
|
/** @brief Increment saturated row count for this column in current import
|
|
* @param satIncCnt Increment count to add to the total saturation count.
|
|
*/
|
|
void incSaturatedCnt(int64_t satIncCnt);
|
|
|
|
/** @brief Get saturated row count for this column.
|
|
*/
|
|
long long saturatedCnt();
|
|
|
|
/** @brief When parsing is complete for a column, this function is called
|
|
* to finish flushing and closing the current segment file.
|
|
*/
|
|
int finishParsing();
|
|
|
|
/** @brief Mutex used to manage access to the output buffers and files.
|
|
* This was formerly the fMgrMutex in ColumnBufferManager. See comments
|
|
* that precede this class definition for more information.
|
|
*/
|
|
boost::mutex& colMutex();
|
|
|
|
/** @brief Get number of rows per extent
|
|
*/
|
|
unsigned rowsPerExtent();
|
|
|
|
void setUIDGID(const uid_t uid, const gid_t gid) override;
|
|
|
|
protected:
|
|
//--------------------------------------------------------------------------
|
|
// Protected Functions
|
|
//--------------------------------------------------------------------------
|
|
|
|
void addToSegFileList(File& dataFile, // save HWM info per segment file
|
|
HWM hwm);
|
|
void clearMemory(); // clear memory used by this object
|
|
void getCPInfoForBRM(BRMReporter& brmReporter); // Get updated CP info for BRM
|
|
int getHWMInfoForBRM(BRMReporter& brmReporter); // Get updated HWM inf for BRM
|
|
|
|
// Init last input Row number in current "logical" extent; used
|
|
// to track min/max value per extent. 0-based where Row 0 is first
|
|
// valid input row in the import.
|
|
// bIsNewExtent indicates whether to treat as a new extent or not.
|
|
void lastInputRowInExtentInit(bool bIsNewExtent);
|
|
|
|
virtual int resetFileOffsetsNewExtent(const char* hdr);
|
|
// Reset file; start new extent
|
|
void setFileSize(HWM hwm, int abbrevFlag); // Set fileSize data member
|
|
|
|
// Prepare initial column segment file for importing of data.
|
|
// oldHWM - Current HWM prior to initial block skipping. This is only
|
|
// used for abbreviated extents, to detect when block skipping has
|
|
// caused us to require a full expanded extent.
|
|
// newHWM - Starting point for adding data after initial blockskipping
|
|
virtual int setupInitialColumnFile(HWM oldHWM, // original HWM
|
|
HWM newHWM); // new HWM to start from
|
|
|
|
virtual int saveDctnryStoreHWMChunk(bool& needBackup); // Backup Dct HWM Chunk
|
|
int extendColumnNewExtent( // extend column; new extent
|
|
bool saveLBIDForCP, uint16_t dbRootNew, uint32_t partitionNew);
|
|
virtual int extendColumnOldExtent( // extend column; existing extent
|
|
uint16_t dbRootNext, uint32_t partitionNext, uint16_t segmentNext, HWM hwmNext);
|
|
|
|
//--------------------------------------------------------------------------
|
|
// Protected Data Members
|
|
//--------------------------------------------------------------------------
|
|
|
|
boost::mutex fDictionaryMutex; // Mutex for dicionary updates
|
|
boost::mutex fColMutex; // Mutex for column changes
|
|
boost::mutex fAutoIncMutex; // Mutex to manage fAutoIncLastValue
|
|
boost::mutex fDelayedFileCreateMutex; // Manage delayed file check/create
|
|
Log* fLog; // Object used for logging
|
|
|
|
// Blocks to skip at start of bulk load, if starting file has to be created
|
|
// by createDelayedFileIfNeeded()
|
|
HWM fDelayedFileStartBlksSkipped;
|
|
|
|
// LBID corresponding to initial HWM saved in fSavedHWM at start of import.
|
|
//
|
|
// LBID is used, at the end of the import, to identify to DBRM, an
|
|
// extent whose CasualPartition stats are to be cleared, because we will
|
|
// have written additional rows to that extent as part of an import.
|
|
BRM::LBID_t fSavedLbid;
|
|
|
|
// The last updated LBID. In case `bulk` creates a new extent with new LBID
|
|
// we have to initialize file header with created LBID.
|
|
BRM::LBID_t fLastUpdatedLbid;
|
|
|
|
// Size of a segment file (in bytes) when the file is opened
|
|
// to add the next extent.
|
|
// For compressed data files, this is the "raw" data byte count,
|
|
// not the compressed byte count.
|
|
long long fSizeWrittenStart;
|
|
|
|
// Tracks the size of a segment file (in bytes) as rows are added.
|
|
// For compressed data files, this is the "raw" data byte count,
|
|
// not the compressed byte count.
|
|
long long fSizeWritten;
|
|
|
|
// Tracks last input Row number in the current "logical" extent,
|
|
// where Row number is 0-based, with Row 0 being the first row in the
|
|
// import. Used by parsing thread to track when a read buffer crosses
|
|
// an extent boundary. We detect when a Read buffer crosses an ex-
|
|
// pected extent boundary so that we can track a column's min/max for
|
|
// each extent.
|
|
RID fLastInputRowInCurrentExtent;
|
|
|
|
bool fLoadingAbbreviatedExtent; // Is current extent abbreviated
|
|
ColExtInfBase* fColExtInf; // Used to update CP at end of job
|
|
long long fMaxNumRowsPerSegFile; // Max num rows per segment file
|
|
Dctnry* fStore; // Corresponding dctnry store file
|
|
|
|
// For autoincrement column only... Tracks latest autoincrement value used
|
|
long long fAutoIncLastValue;
|
|
|
|
volatile int64_t fSaturatedRowCnt; // No. of rows with saturated values
|
|
|
|
// List of segment files updated during an import; used to track infor-
|
|
// mation necessary to update the ExtentMap at the "end" of the import.
|
|
std::vector<File> fSegFileUpdateList;
|
|
|
|
TableInfo* fpTableInfo; // pointer to the table info
|
|
ColumnAutoInc* fAutoIncMgr; // Maintains autoIncrement nextValue
|
|
DBRootExtentTracker* fDbRootExtTrk; // DBRoot extent tracker
|
|
|
|
int fColWidthFactor; // Wid factor relative to other cols
|
|
|
|
InitialDBFileStat fDelayedFileCreation; // Denotes when initial DB file is
|
|
// to be created after preprocessing
|
|
|
|
unsigned fRowsPerExtent; // Number of rows per column extent
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Inline functions
|
|
//------------------------------------------------------------------------------
|
|
inline void ColumnInfo::setUIDGID(const uid_t p_uid, const gid_t p_gid)
|
|
{
|
|
WeUIDGID::setUIDGID(p_uid, p_gid);
|
|
if (colOp)
|
|
colOp->setUIDGID(this);
|
|
}
|
|
|
|
// Returns a reference to fColMutex so that callers can lock it themselves.
inline boost::mutex& ColumnInfo::colMutex()
{
  return fColMutex;
}
|
|
|
|
// Copies fDictBlocks into the caller-supplied vector `blks`, replacing
// whatever `blks` previously held.
inline void ColumnInfo::getDictFlushBlks(std::vector<BRM::LBID_t>& blks) const
{
  blks = fDictBlocks;
}
|
|
|
|
inline int64_t ColumnInfo::getFileSize() const
|
|
{
|
|
return fileSize;
|
|
}
|
|
|
|
inline void ColumnInfo::incSaturatedCnt(int64_t satIncCnt)
|
|
{
|
|
(void)atomicops::atomicAdd(&fSaturatedRowCnt, satIncCnt);
|
|
}
|
|
|
|
inline bool ColumnInfo::isAbbrevExtent()
|
|
{
|
|
return fLoadingAbbreviatedExtent;
|
|
}
|
|
|
|
// Returns fLastInputRowInCurrentExtent.
inline RID ColumnInfo::lastInputRowInExtent() const
{
  return fLastInputRowInCurrentExtent;
}
|
|
|
|
// Forwards `column` to fColExtInf->print().
// fColExtInf is dereferenced without a null check, so it must be set before
// this is called.
// NOTE(review): `column` is taken by value, so each call copies the JobColumn;
// switching to a const reference would also require changing the declaration
// inside the class.
inline void ColumnInfo::printCPInfo(JobColumn column)
{
  fColExtInf->print(column);
}
|
|
|
|
inline long long ColumnInfo::saturatedCnt()
|
|
{
|
|
return fSaturatedRowCnt;
|
|
}
|
|
|
|
inline void ColumnInfo::relativeColWidthFactor(int colWidFactor)
|
|
{
|
|
fColWidthFactor = colWidFactor;
|
|
}
|
|
|
|
inline unsigned ColumnInfo::rowsPerExtent()
|
|
{
|
|
return fRowsPerExtent;
|
|
}
|
|
|
|
template <typename T>
|
|
inline void ColumnInfo::updateCPInfo(RID lastInputRow, T minVal, T maxVal, ColDataType colDataType, int width)
|
|
{
|
|
fColExtInf->addOrUpdateEntry(lastInputRow, minVal, maxVal, colDataType, width);
|
|
}
|
|
|
|
} // namespace WriteEngine
|