1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-10-22 22:32:34 +03:00
Files
mariadb-columnstore-engine/writeengine/wrapper/writeengine.h
2018-02-02 14:53:36 +00:00

759 lines
27 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
// $Id: writeengine.h 4726 2013-08-07 03:38:36Z bwilkinson $
/** @file */
#ifndef _WRITE_ENGINE_H_
#define _WRITE_ENGINE_H_
#include <stdio.h>
#include <string>
// the header file for fd
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
// end
#include <boost/lexical_cast.hpp>
#ifdef _MSC_VER
#include <unordered_map>
#else
#include <tr1/unordered_map>
#endif
#include "we_brm.h"
#include "we_colop.h"
#include "we_index.h"
#include "we_tablemetadata.h"
#include "we_dbrootextenttracker.h"
#include "we_rbmetawriter.h"
#include "brmtypes.h"
#include "we_chunkmanager.h"
#define IO_BUFF_SIZE 81920
#if defined(_MSC_VER) && defined(WRITEENGINE_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
/** Namespace WriteEngine */
namespace WriteEngine
{
//... Total compression operation: un_compresssed, compressed
const int UN_COMPRESSED_OP = 0;
const int COMPRESSED_OP = 1;
const int TOTAL_COMPRESS_OP = 2;
//...Forward class declarations
class Log;
// Bug4312. During transactions, we need to mark each extent modified as invalid.
// In order to prevent thrashing, marking the same extent everytime we get an lbid
// for an extent, we remember the starting lbid for each extent marked for the
// transaction. We also add a sequence number so we can age them out of the list
// for truly long running transactions.
struct TxnLBIDRec
{
std::tr1::unordered_map<BRM::LBID_t, uint32_t> m_LBIDMap;
uint32_t m_lastSeqnum;
uint32_t m_squashedLbids;
TxnLBIDRec() : m_lastSeqnum(0), m_squashedLbids(0) {};
~TxnLBIDRec() {}
void AddLBID(BRM::LBID_t lbid)
{
m_LBIDMap[lbid] = ++m_lastSeqnum;
}
};
typedef boost::shared_ptr<TxnLBIDRec> SP_TxnLBIDRec_t;
typedef std::set<BRM::LBID_t> dictLBIDRec_t;
/** Class WriteEngineWrapper */
class WriteEngineWrapper : public WEObj
{
public:
/**
* @brief Constructor
*/
EXPORT WriteEngineWrapper();
EXPORT WriteEngineWrapper(const WriteEngineWrapper& rhs);
/**
* @brief Default Destructor
*/
EXPORT ~WriteEngineWrapper();
/************************************************************************
* Interface definitions
************************************************************************/
/**
* @brief Performs static/global initialization for BRMWrapper.
* Should be called once from the main thread.
*/
EXPORT static void init(unsigned subSystemID);
/**
* @brief Build a index from an oid file (NOTE: this is write engine internal used function, just for test purpose and not for generic use
*/
int buildIndex(const OID& colOid, const OID& treeOid, const OID& listOid,
execplan::CalpontSystemCatalog::ColDataType colDataType, int width, int hwm,
bool resetFile, uint64_t& totalRows, int maxRow = IDX_DEFAULT_READ_ROW)
{
return -1;
}
/**
* @brief Build a index from a file
*/
int buildIndex(const std::string& sourceFileName, const OID& treeOid, const OID& listOid,
execplan::CalpontSystemCatalog::ColDataType colDataType, int width, int hwm, bool resetFile,
uint64_t& totalRows, const std::string& indexName, Log* pLogger,
int maxRow = IDX_DEFAULT_READ_ROW)
{
return -1;
}
/**
* @brief Close a index file
*/
void closeIndex() { }
/**
* @brief Close a dictionary
*/
int closeDctnry(const TxnID& txnid, int i, bool realClose = true)
{
return m_dctnry[op(i)]->closeDctnry(realClose);
}
/**
* @brief Commit transaction
*/
int commit(const TxnID& txnid)
{
m_txnLBIDMap.erase(txnid);
return BRMWrapper::getInstance()->commit(txnid);
}
/**
* @brief Convert interface value list to internal value array
*/
EXPORT void convertValArray(size_t totalRow, const ColType colType,
ColTupleList& curTupleList, void* valArray,
bool bFromList = true) ;
/**
* @brief Create a column, include object ids for column data and bitmap files
* @param dataOid column datafile object id
* @param dataType column data type
* @param dataWidth column width
* @param dbRoot DBRoot under which file is to be located (1-based)
* @param partition Starting partition number for segment file path (0-based).
* @param compressionType compression type
*/
EXPORT int createColumn(const TxnID& txnid, const OID& dataOid,
execplan::CalpontSystemCatalog::ColDataType dataType, int dataWidth,
uint16_t dbRoot, uint32_t partition = 0, int compressionType = 0);
//BUG931
/**
* @brief Fill a new column with default value using row-ids from a reference column
*
* @param txnid Transaction id
* @param dataOid OID of the new column
* @param dataType Data-type of the new column
* @param dataWidth Width of the new column
* @param defaultVal Default value to be filled in the new column
* @param refColOID OID of the reference column
* @param refColDataType Data-type of the referecne column
* @param refColWidth Width of the reference column
*/
EXPORT int fillColumn(const TxnID& txnid, const OID& dataOid, execplan::CalpontSystemCatalog::ColDataType dataType,
int dataWidth, ColTuple defaultVal,
const OID& refColOID, execplan::CalpontSystemCatalog::ColDataType refColDataType,
int refColWidth, int refCompressionType, bool isNULL, int compressionType,
const std::string& defaultValStr, const OID& dictOid = 0, bool autoincrement = false);
/**
* @brief Create a index related files, include object ids for index tree and list files
* @param treeOid index tree file object id
* @param listOid index list file object id
*/
int createIndex(const TxnID& txnid, const OID& treeOid, const OID& listOid)
{
int rc = -1;
return rc;
}
/**
* @brief Create dictionary
* @param dctnryOid dictionary signature file object id
* @param partition Starting partition number for segment file path (0-based).
* @param segment segment number
* @param compressionType compression type
*/
EXPORT int createDctnry(const TxnID& txnid, const OID& dctnryOid,
int colWidth, uint16_t dbRoot,
uint32_t partiotion = 0, uint16_t segment = 0, int compressionType = 0);
/**
* @brief Delete a list of rows from a table
* @param colStructList column struct list
* @param colOldValueList column old values list (return value)
* @param rowIdList row id list
*/
EXPORT int deleteRow(const TxnID& txnid, std::vector<ColStructList>& colExtentsStruct,
std::vector<void*>& colOldValueList, std::vector<RIDList>& ridLists, const int32_t tableOid);
/**
* @brief Delete a list of rows from a table
* @param colStructList column struct list
* @param rowIdList row id list
*/
EXPORT int deleteBadRows(const TxnID& txnid, ColStructList& colStructs,
RIDList& ridList, DctnryStructList& dctnryStructList);
/**
* @brief delete a dictionary signature and its token
* @param dctnryStruct dictionary structure
* @param dctnryTuple dictionary tuple
*/
//ITER17_Obsolete
// int deleteToken(const TxnID& txnid, Token& token); // Files need already open
// int deleteToken(const TxnID& txnid, DctnryStruct& dctnryStruct, Token& token);
/**
* @brief Drop a column, include object ids for column data file
* @param dataOid column datafile object id
*/
int dropColumn(const TxnID& txnid, const OID dataOid)
{
return m_colOp[0]->dropColumn((FID) dataOid);
}
/**
* @brief Drop files
* @param dataOids column and dictionary datafile object id
*/
int dropFiles(const TxnID& txnid, const std::vector<int32_t>& dataOids)
{
return m_colOp[0]->dropFiles(dataOids);
}
/**
* @brief Delete files for one partition
* @param dataOids column and dictionary datafile object id
*/
int deletePartitions(const std::vector<OID>& dataOids,
const std::vector<BRM::PartitionInfo>& partitions)
{
return m_colOp[0]->dropPartitions(dataOids, partitions);
}
int deleteOIDsFromExtentMap (const TxnID& txnid, const std::vector<int32_t>& dataOids)
{
return m_colOp[0]->deleteOIDsFromExtentMap(dataOids);
}
/**
* @brief Create a index related files, include object ids for index tree and list files
* @param treeOid index tree file object id
* @param listOid index list file object id
*/
int dropIndex(const TxnID& txnid, const OID& treeOid, const OID& listOid)
{
return -1;
}
/**
* @brief Drop a dictionary
* @param dctnryOid dictionary signature file object id
* @param treeOid dictionary tree file object id
* @param listOid index list file object id
*/
int dropDctnry(const TxnID& txnid, const OID& dctnryOid, const OID& treeOid, const OID& listOid)
{
return m_dctnry[0]->dropDctnry(dctnryOid);
}
/**
* @brief Flush VM write cache
* @param None
*/
EXPORT void flushVMCache() const;
/**
* @brief Insert values into a table
* @param colStructList column structure list
* @param colValueList column value list
* @param dicStringListt dictionary values list
* @param dbRootExtentTrackers dbrootTrackers
* @param bFirstExtentOnThisPM true when there is no extent on this PM
* @param insertSelect if insert with select, the hwm block is skipped
* @param isAutoCommitOn if autocommit on, only the hwm block is versioned,
* else eveything is versioned
* @param tableOid used to get table meta data
* @param isFirstBatchPm to track if this batch is first batch for this PM.
*/
EXPORT int insertColumnRecs(const TxnID& txnid,
ColStructList& colStructList,
ColValueList& colValueList,
DctnryStructList& dctnryStructList,
DictStrList& dictStrList,
std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers,
RBMetaWriter* fRBMetaWriter,
bool bFirstExtentOnThisPM,
bool insertSelect = false,
bool isAutoCommitOn = false,
OID tableOid = 0,
bool isFirstBatchPm = false);
EXPORT int insertColumnRecsBinary(const TxnID& txnid,
ColStructList& colStructList,
std::vector<uint64_t>& colValueList,
DctnryStructList& dctnryStructList,
DictStrList& dictStrList,
std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers,
RBMetaWriter* fRBMetaWriter,
bool bFirstExtentOnThisPM,
bool insertSelect = false,
bool isAutoCommitOn = false,
OID tableOid = 0,
bool isFirstBatchPm = false);
/**
* @brief Insert values into systables
* @param colStructList column structure list
* @param colValueList column value list
* @param dicStringListt dictionary values list
*/
EXPORT int insertColumnRec_SYS(const TxnID& txnid,
ColStructList& colStructList,
ColValueList& colValueList,
DctnryStructList& dctnryStructList,
DictStrList& dictStrList,
const int32_t tableOid);
/**
* @brief Insert a row
* @param colStructList column structure list
* @param colValueList column value list
* @param dicStringListt dictionary values list
*/
EXPORT int insertColumnRec_Single(const TxnID& txnid,
ColStructList& colStructList,
ColValueList& colValueList,
DctnryStructList& dctnryStructList,
DictStrList& dictStrList,
const int32_t tableOid);
/**
* @brief Open dictionary
* @param txnid relevant transaction
* @param dctnryStruct dictionary column to open
* @param useTmpSuffix Bulk HDFS usage: use *.tmp file suffix
*/
// @bug 5572 - HDFS usage: add *.tmp file backup flag
int openDctnry(const TxnID& txnid, DctnryStruct dctnryStruct, bool useTmpSuffix)
{
int compress_op = op(dctnryStruct.fCompressionType);
m_dctnry[compress_op]->setTransId(txnid);
return m_dctnry[compress_op]->openDctnry(
dctnryStruct.dctnryOid,
dctnryStruct.fColDbRoot,
dctnryStruct.fColPartition,
dctnryStruct.fColSegment,
useTmpSuffix);
}
/**
* @brief Rollback transaction (common portion)
*/
EXPORT int rollbackCommon(const TxnID& txnid, int sessionId);
/**
* @brief Rollback transaction
*/
EXPORT int rollbackTran(const TxnID& txnid, int sessionId);
/**
* @brief Rollback transaction
*/
EXPORT int rollbackBlocks(const TxnID& txnid, int sessionId);
/**
* @brief Rollback transaction
*/
EXPORT int rollbackVersion(const TxnID& txnid, int sessionId);
/**
* @brief Set the IsInsert flag in the ChunkManagers.
* This forces flush at end of block. Used only for bulk insert.
*/
void setIsInsert(bool bIsInsert)
{
m_colOp[COMPRESSED_OP]->chunkManager()->setIsInsert(bIsInsert);
m_dctnry[COMPRESSED_OP]->chunkManager()->setIsInsert(true);
}
/**
* @brief Get the IsInsert flag as set in the ChunkManagers.
* Since both chunk managers are supposed to be in lockstep as regards the
* isInsert flag, we need only grab one.
*
*/
bool getIsInsert()
{
return m_colOp[COMPRESSED_OP]->chunkManager()->getIsInsert();
}
std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>& getTxnMap()
{
return m_txnLBIDMap;
};
std::tr1::unordered_map<TxnID, dictLBIDRec_t>& getDictMap()
{
return m_dictLBIDMap;
};
/**
* @brief Flush the ChunkManagers.
*/
int flushChunks(int rc, const std::map<FID, FID>& columOids)
{
int rtn1 = m_colOp[COMPRESSED_OP]->chunkManager()->flushChunks(rc, columOids);
int rtn2 = m_dctnry[COMPRESSED_OP]->chunkManager()->flushChunks(rc, columOids);
return (rtn1 != NO_ERROR ? rtn1 : rtn2);
}
/**
* @brief Set the transaction id into all fileops
*/
void setTransId(const TxnID& txnid)
{
for (int i = 0; i < TOTAL_COMPRESS_OP; i++)
{
m_colOp[i]->setTransId(txnid);
m_dctnry[i]->setTransId(txnid);
}
}
/**
* @brief Set the fIsBulk id into all fileops
*/
void setBulkFlag(bool isBulk)
{
for (int i = 0; i < TOTAL_COMPRESS_OP; i++)
{
m_colOp[i]->setBulkFlag(isBulk);
m_dctnry[i]->setBulkFlag(isBulk);
}
}
/**
* @brief Set the fIsFix into all fileops
*/
void setFixFlag(bool isFix = false)
{
for (int i = 0; i < TOTAL_COMPRESS_OP; i++)
{
m_colOp[i]->setFixFlag(isFix);
m_dctnry[i]->setFixFlag(isFix);
}
}
/**
* @brief let chunkmanager start transaction.
*
*/
int startTransaction(const TxnID& txnid)
{
int rc = 0;
rc = m_colOp[COMPRESSED_OP]->chunkManager()->startTransaction(txnid);
//if ( rc == 0)
// rc = m_dctnry[COMPRESSED_OP]->chunkManager()->startTransaction(txnid);
return rc;
}
/**
* @brief let chunkmanager confirm transaction.
*
*/
int confirmTransaction (const TxnID& txnid)
{
int rc = 0;
rc = m_colOp[COMPRESSED_OP]->chunkManager()->confirmTransaction (txnid);
return rc;
}
/**
* @brief let chunkmanager end transaction.
*
*/
int endTransaction(const TxnID& txnid, bool success)
{
int rc = 0;
rc = m_colOp[COMPRESSED_OP]->chunkManager()->endTransaction(txnid, success);
//if ( rc == 0)
// rc = m_dctnry[COMPRESSED_OP]->chunkManager()->endTransaction(txnid, success);
return rc;
}
/**
* @brief Tokenize a dictionary signature into a token
* @param dctnryStruct dictionary structure
* @param dctnryTuple dictionary tuple
* @param useTmpSuffix Bulk HDFS usage: use *.tmp file suffix
*/
EXPORT int tokenize(const TxnID& txnid, DctnryTuple&, int compType ); // Files need open first
EXPORT int tokenize(const TxnID& txnid, DctnryStruct& dctnryStruct, DctnryTuple& dctnryTuple,
bool useTmpSuffix);
/**
* @brief Update values into a column (New one)
* @param colStructList column structure list
* @param colValueList column value list
* @param colOldValueList column old values list (return value)
* @param ridList row id list
*/
EXPORT int updateColumnRec(const TxnID& txnid,
std::vector<ColStructList>& colExtentsStruct,
ColValueList& colValueList,
std::vector<void*>& colOldValueList,
std::vector<RIDList>& ridLists,
std::vector<DctnryStructList>& dctnryExtentsStruct,
DctnryValueList& dctnryValueList,
const int32_t tableOid);
/**
* @brief Update values into columns
* @param colStructList column structure list
* @param colValueList column value list
* @param ridList row id list
*/
EXPORT int updateColumnRecs(const TxnID& txnid,
std::vector<ColStruct>& colStructList,
ColValueList& colValueList,
const RIDList& ridLists,
const int32_t tableOid);
/**
* @brief Release specified table lock.
* @param lockID Table lock id to be released.
* @param errorMsg Return error message
*/
EXPORT int clearTableLockOnly(uint64_t lockID,
std::string& errorMsg);
/**
* @brief Rollback the specified table
* @param tableOid Table to be rolled back
* @param lockID Table lock id of the table to be rolled back.
* Currently used for logging only.
* @param tableName Name of table associated with tableOid.
* Currently used for logging only.
* @param applName Application that is driving this bulk rollback.
* Currently used for logging only.
* @param debugConsole Enable logging to console
* @param errorMsg Return error message
*/
EXPORT int bulkRollback(OID tableOid,
uint64_t lockID,
const std::string& tableName,
const std::string& applName,
bool debugConsole, std::string& errorMsg);
/**
* @brief update SYSCOLUMN next value
* @param oidValueMap
*/
EXPORT int updateNextValue(const TxnID txnId, const OID& columnoid, const uint64_t nextVal, const uint32_t sessionID, const uint16_t dbRoot);
/**
* @brief write active datafiles to disk
*
*/
EXPORT int flushDataFiles(int rc, const TxnID txnId, std::map<FID, FID>& columnOids);
/**
* @brief Process versioning for batch insert - only version the hwm block.
*/
EXPORT int processBatchVersions(const TxnID& txnid, std::vector<Column> columns, std::vector<BRM::LBIDRange>& rangeList);
EXPORT void writeVBEnd(const TxnID& txnid, std::vector<BRM::LBIDRange>& rangeList);
/************************************************************************
* Future implementations
************************************************************************/
/**
* @brief Begin transaction
*/
// todo: add implementation when we work on version control
// int beginTran(const TransID transOid) { return NO_ERROR; }
/**
* @brief End transaction
*/
// todo: add implementation when we work on version control
// int endTran(const TransID transOid) { return NO_ERROR; }
/************************************************************************
* Internal use definitions
************************************************************************/
private:
/**
* @brief Check whether the passing parameters are valid
*/
int checkValid(const TxnID& txnid, const ColStructList& colStructList, const ColValueList& colValueList, const RIDList& ridList) const;
/**
* @brief Convert interface column type to a internal column type
*/
// void convertColType(void* curStruct, const FuncType curType = FUNC_WRITE_ENGINE) const;
void convertValue(const ColType colType, void* valArray, size_t pos, boost::any& data, bool fromList = true);
/**
* @brief Convert column value to its internal representation
*
* @param colType Column data-type
* @param value Memory pointer for storing output value. Should be pre-allocated
* @param data Column data
*/
void convertValue(const ColType colType, void* value, boost::any& data);
/**
* @brief Print input value from DDL/DML processors
*/
void printInputValue(const ColStructList& colStructList, const ColValueList& colValueList, const RIDList& ridList) const;
/**
* @brief Process version buffer
*/
int processVersionBuffer(IDBDataFile* pFile, const TxnID& txnid, const ColStruct& colStruct,
int width, int totalRow, const RID* rowIdArray,
std::vector<BRM::LBIDRange>& rangeList);
/**
* @brief Process version buffers for update and delete @Bug 1886,2870
*/
int processVersionBuffers(IDBDataFile* pFile, const TxnID& txnid, const ColStruct& colStruct,
int width, int totalRow, const RIDList& ridList,
std::vector<BRM::LBIDRange>& rangeList);
int processBeginVBCopy(const TxnID& txnid, const std::vector<ColStruct>& colStructList, const RIDList& ridList,
std::vector<BRM::VBRange>& freeList, std::vector<std::vector<uint32_t> >& fboLists,
std::vector<std::vector<BRM::LBIDRange> >& rangeLists, std::vector<BRM::LBIDRange>& rangeListTot);
void setDebugLevel(const DebugLevel level)
{
WEObj::setDebugLevel(level);
for (int i = 0; i < TOTAL_COMPRESS_OP; i++)
{
m_colOp[i]->setDebugLevel(level);
m_dctnry[i]->setDebugLevel(level);
}
} // todo: cleanup
/**
* @brief Common methods to write values to a column
*/
int writeColumnRec(const TxnID& txnid, const ColStructList& colStructList,
ColValueList& colValueList,
RID* rowIdArray, const ColStructList& newColStructList,
ColValueList& newColValueList, const int32_t tableOid,
bool useTmpSuffix, bool versioning = true);
int writeColumnRecBinary(const TxnID& txnid, const ColStructList& colStructList,
std::vector<uint64_t>& colValueList,
RID* rowIdArray, const ColStructList& newColStructList,
std::vector<uint64_t>& newColValueList,
const int32_t tableOid,
bool useTmpSuffix, bool versioning = true);
//@Bug 1886,2870 pass the address of ridList vector
int writeColumnRec(const TxnID& txnid, const ColStructList& colStructList,
const ColValueList& colValueList, std::vector<void*>& colOldValueList,
const RIDList& ridList, const int32_t tableOid,
bool convertStructFlag = true, ColTupleList::size_type nRows = 0);
//For update column from column to use
int writeColumnRecords(const TxnID& txnid, std::vector<ColStruct>& colStructList,
ColValueList& colValueList, const RIDList& ridLists,
const int32_t tableOid, bool versioning = true);
/**
* @brief util method to convert rowid to a column file
*
*/
int convertRidToColumn(RID& rid, uint16_t& dbRoot, uint32_t& partition, uint16_t& segment,
const RID filesPerColumnPartition, const RID extentsPerSegmentFile,
const RID extentRows, uint16_t startDBRoot, unsigned dbrootCnt);
void AddDictToList(const TxnID txnid, std::vector<BRM::LBID_t>& lbids);
void RemoveTxnFromDictMap(const TxnID txnid);
// Bug 4312: We use a hash set to hold the set of starting LBIDS for a given
// txn so that we don't waste time marking the same extent as invalid. This
// list should be trimmed if it gets too big.
int AddLBIDtoList(const TxnID txnid,
std::vector<BRM::LBID_t>& lbids,
std::vector<execplan::CalpontSystemCatalog::ColDataType>& colDataTypes,
const ColStruct& colStruct,
const int fbo);
int RemoveTxnFromLBIDMap(const TxnID txnid);
int op(int compressionType)
{
return (compressionType > 0 ? COMPRESSED_OP : UN_COMPRESSED_OP);
}
// This is a Map of sets of LBIDS for each transaction. A Transaction's list will be removed upon commit or rollback.
std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t> m_txnLBIDMap;
// MCOL-1160: We need to track dictionary LBIDs so we can tell PrimProc
// to flush the blocks after an API bulk-write.
std::tr1::unordered_map<TxnID, dictLBIDRec_t> m_dictLBIDMap;
ColumnOp* m_colOp[TOTAL_COMPRESS_OP]; // column operations
Dctnry* m_dctnry[TOTAL_COMPRESS_OP]; // dictionary operations
OpType m_opType; // operation type
DebugLevel m_debugLevel; // debug level
};
} //end of namespace
#undef EXPORT
#endif // _WRITE_ENGINE_H_