1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00

536 lines
18 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/***********************************************************************
* $Id: dmlpackageprocessor.h 9673 2013-07-09 15:59:49Z chao $
*
*
***********************************************************************/
/** @file */
#pragma once
#include <stdexcept>
#include <string>
#include <sstream>
#include <iostream>
#include <iomanip>
#include <boost/any.hpp>
#include "calpontdmlpackage.h"
#include "calpontsystemcatalog.h"
#include "we_type.h"
#include "writeengine.h"
#include "messageobj.h"
#include "sessionmanager.h"
#include "distributedenginecomm.h"
#include "brmtypes.h"
#include "../../writeengine/client/we_clients.h"
#include "liboamcpp.h"
#include "oamcache.h"
#include "querystats.h"
#include "clientrotator.h"
#define EXPORT
//#define IDB_DML_DEBUG
namespace dmlpackageprocessor
{
/** @brief A list of string values (presumably the values destined for one
 * dictionary column -- confirm against updateIndexes() callers).
 */
using dicStrValues = std::vector<std::string>;
/** @brief Debug tracing helpers.
 *
 * Each macro streams @p message to std::cerr (followed by std::endl) when the
 * corresponding debug level is enabled according to isDebug(), which must be
 * visible at the point of use together with the SUMMARY / DETAIL / VERBOSE
 * names.
 *
 * @p message is deliberately NOT parenthesised so that callers can pass a
 * stream chain such as  "rows=" << n .
 *
 * The bodies are wrapped in do { } while (0) so that an invocation followed by
 * a semicolon is exactly one statement and can be used safely as the branch of
 * an unbraced if / else.
 */
#define SUMMARY_INFO(message)            \
  do                                     \
  {                                      \
    if (isDebug(SUMMARY))                \
    {                                    \
      std::cerr << message << std::endl; \
    }                                    \
  } while (0)
#define DETAIL_INFO(message)             \
  do                                     \
  {                                      \
    if (isDebug(DETAIL))                 \
    {                                    \
      std::cerr << message << std::endl; \
    }                                    \
  } while (0)
#define VERBOSE_INFO(message)            \
  do                                     \
  {                                      \
    if (isDebug(VERBOSE))                \
    {                                    \
      std::cerr << message << std::endl; \
    }                                    \
  } while (0)
/** @brief A list of 64-bit identifiers (presumably row IDs -- confirm against users).
 */
using rids = std::vector<uint64_t>;
/** @brief abstract class that the defines the general interface and
* implemetation of a DMLPackageProcessor
*/
class DMLPackageProcessor
{
public:
/** @brief Result code
*/
enum ResultCode
{
NO_ERROR,
INSERT_ERROR,
NETWORK_ERROR,
NOTNULL_VIOLATION,
CHECK_VIOLATION,
DELETE_ERROR,
UPDATE_ERROR,
INDEX_UPDATE_ERROR,
COMMAND_ERROR,
TOKEN_ERROR,
NOT_ACCEPTING_PACKAGES,
DEAD_LOCK_ERROR,
REFERENCE_VIOLATION,
IDBRANGE_WARNING,
VB_OVERFLOW_ERROR,
ACTIVE_TRANSACTION_ERROR,
TABLE_LOCK_ERROR,
JOB_ERROR,
JOB_CANCELED,
DBRM_READ_ONLY,
PP_LOST_CONNECTION
};
enum DebugLevel /** @brief Debug level type enumeration */
{
NONE = 0, /** @brief No debug info */
SUMMARY = 1, /** @brief Summary level debug info */
DETAIL = 2, /** @brief A little detail debug info */
VERBOSE = 3, /** @brief Detailed debug info */
};
/** @brief the result of dml operations
*/
struct DMLResult
{
/** @brief the result code
*/
ResultCode result;
/** @brief the error message if result != NO_ERROR
*/
logging::Message message;
/** @brief the rowCount
*/
long long rowCount;
std::string tableLockInfo;
// query stats;
std::string queryStats;
std::string extendedStats;
std::string miniStats;
querystats::QueryStats stats;
DMLResult() : result(NO_ERROR), rowCount(0){};
};
/** @brief a structure to hold a date
*/
struct Date
{
unsigned spare : 6;
unsigned day : 6;
unsigned month : 4;
unsigned year : 16;
// NULL column value = 0xFFFFFFFE
Date()
{
year = 0xFFFF;
month = 0xF;
day = 0x3F;
spare = 0x3E;
}
};
/** @brief ctor
*/
DMLPackageProcessor(BRM::DBRM* aDbrm, uint32_t sid)
: fEC(0), DMLLoggingId(21), fRollbackPending(false), fDebugLevel(NONE)
{
try
{
fWEClient = new WriteEngine::WEClients(WriteEngine::WEClients::DMLPROC);
// std::cout << "In DMLPackageProcessor constructor " << this << std::endl;
fPMCount = fWEClient->getPmCount();
}
catch (...)
{
std::cout << "Cannot make connection to WES" << std::endl;
}
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
fDbRootPMMap = oamCache->getDBRootToPMMap();
fDbrm = aDbrm;
fSessionID = sid;
fExeMgr = new execplan::ClientRotator(fSessionID, "ExeMgr");
// std::cout << " fSessionID is " << fSessionID << std::endl;
fExeMgr->connect(0.005);
}
/** @brief destructor
*/
EXPORT virtual ~DMLPackageProcessor();
/** @brief Is it required to debug
*/
inline bool isDebug(const DebugLevel level) const
{
return level <= fDebugLevel;
}
/**
* @brief Get debug level
*/
inline DebugLevel getDebugLevel() const
{
return fDebugLevel;
}
// int rollBackTransaction(uint64_t uniqueId, uint32_t txnID, uint32_t sessionID, std::string & errorMsg);
/**
* @brief Set debug level
*/
inline void setDebugLevel(const DebugLevel level)
{
fDebugLevel = level;
}
/**
* @brief Set the Distributed Engine Comm object
*/
inline void setEngineComm(joblist::DistributedEngineComm* ec)
{
fEC = ec;
}
/** @brief process the dml package
*
* @param cpackage the CalpontDMLPackage to process
*/
DMLResult processPackage(dmlpackage::CalpontDMLPackage& cpackage);
/** @brief Check that give exception is related to PP lost connection.
*/
bool checkPPLostConnection(std::exception& ex);
inline void setRM(joblist::ResourceManager* frm)
{
fRM = frm;
};
EXPORT int rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
std::string& errorMsg);
EXPORT int32_t tryToRollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
std::string& errorMsg);
EXPORT int rollBackBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
const uint32_t tableOid, std::string& errorMsg);
/**
* @brief convert a columns data, represnted as a string, to it's native
* data type
*
* @param type the columns database type
* @param data the columns string representation of it's data
*/
// static boost::any convertColumnData( execplan::CalpontSystemCatalog::ColType colType,
// const std::string& dataOrig );
/**
* @brief convert a columns data from native format to a string
*
* @param type the columns database type
* @param data the columns string representation of it's data
*/
// static std::string dateToString( int datevalue );
/** @brief validate numerical string
*
* @param result the result structure
* @param data
*/
// static bool numer_value( std::string data );
/** @brief check whether the given year is leap year
*
* @param year
*/
// static bool isLeapYear ( int year);
/** @brief check whether the given date valid
*
* @param year, month, day
*/
// static bool isDateValid ( int day, int month, int year);
/** @brief check whether the given datetime valid
*
* @param hour, minute, second, microSecond
*/
// static bool isDateTimeValid ( int hour, int minute, int second, int microSecond);
/** @brief convert month from string to integer
*
* @param month
*/
// static int convertMonth (std::string month);
/** @brief tokenize date or datetime string
*
* @param data
*/
// static void tokenTime (std::string data, std::vector<std::string>& dataList);
/** @brief Access the rollback pending flag
*/
bool getRollbackPending()
{
return fRollbackPending;
}
/** @brief Set the rollback pending flag
*/
void setRollbackPending(bool rollback)
{
fRollbackPending = rollback;
}
protected:
/** @brief update the indexes on the target table
*
* @param schema the schema name
* @param table the table name
* @param rows the list of rows
* @param result the result structure
* @param updateFlag 0: delete, 1: update, 2 insert
* @param colNameList the updated column names, only valid for update SQL statement
*/
bool updateIndexes(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, const std::string& schema,
const std::string& table, const dmlpackage::RowList& rows, DMLResult& result,
std::vector<WriteEngine::RID> ridList, WriteEngine::ColValueList& colValuesList,
std::vector<std::string>& colNameList, const char updateFlag,
std::vector<dicStrValues>& dicStrValCols);
/** @brief delete the indexes on the target table
*
* @param schema the schema name
* @param table the table name
* @param rows the list of rows
* @param result the result structure
*/
bool deleteIndexes(const std::string& schema, const std::string& table, const dmlpackage::RowList& rows,
DMLResult& result);
/** @brief validate that none of the columns in the supplied row(s) violate
* any defined constraints
*
* @param schema the schema name
* @param table the table name
* @param rows the lists of rows to check column constraints on
* @param result the result structure
*/
bool violatesConstraints(uint32_t sessionID, const std::string& schema, const std::string& table,
const dmlpackage::RowList& rows, DMLResult& result);
/** @brief validate that the column does not violate a unique constraint
*
* @param column the column to validate
* @param result the result structure
*/
bool violatesUniqueConstraint(const dmlpackage::RowList& rows,
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
unsigned int sessionID, DMLResult& result);
/** @brief validate that the column does not violate a check constraint
*
* @param column the column to validate
* @param result the result structure
*/
bool violatesCheckConstraint(const dmlpackage::RowList& rows,
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
unsigned int sessionID, unsigned int colOffset, DMLResult& result);
/** @brief validate that the column does not violate a not null constraint
*
* @param type the columns database type
* @param column the column to validate
* @param result the result structure
*/
bool violatesNotNullConstraint(const dmlpackage::RowList& rows, unsigned int colOffset, DMLResult& result);
/** @brief validate that the column does not violate a reference (foreign key) constraint
*
* @param rows the row set to validate
* @param colOffset the offset of this column in the row
* @param constraintInfo the column constraint infomation
* @param result the result structure
*/
bool violatesReferenceConstraint(const dmlpackage::RowList& rows, unsigned int colOffset,
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
DMLResult& result);
/** @brief validate that non of the rows deleted from this table violate a reference
* (foreign key) constraint of the reference table.
*
* @param schema the schema name
* @param table the table name
* @param rows the lists of rows to check column constraints on
* @param result the result structure
*/
bool violatesPKRefConnstraint(uint32_t sessionID, const std::string& schema, const std::string& table,
const dmlpackage::RowList& rows,
const WriteEngine::ColValueList& oldValueList, DMLResult& result);
bool violatesPKRefConnstraint(uint32_t sessionID, const std::string& schema, const std::string& table,
std::vector<WriteEngine::RID>& rowIDList, std::vector<void*>& oldValueList,
DMLResult& result);
/** @brief validate that the rows deleted does not violate a reference (foreign key) constraint
*
* @param rows the row set to validate
* @param colOffset the offset of this column in the row
* @param constraintInfo the column constraint infomation
* @param result the result structure
*/
bool violatesReferenceConstraint_PK(const WriteEngine::ColValueList& oldValueList,
const execplan::CalpontSystemCatalog::ColType& colType,
unsigned int colOffset,
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
DMLResult& result);
bool violatesReferenceConstraint_PK(std::vector<void*>& oldValueList, const int totalRows,
const execplan::CalpontSystemCatalog::ColType& colType,
unsigned int colOffset,
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
DMLResult& result);
/** @brief validate that none of the columns in the update row(s) violate
* any reference constraints of the foreign key table
*
* @param schema the schema name
* @param table the table name
* @param rows the lists of rows to check column constraints on
* @param result the result structure
*/
bool violatesUpdtRefConstraints(uint32_t sessionID, const std::string& schema, const std::string& table,
const dmlpackage::RowList& rows, DMLResult& result);
/** @brief validate that the column does not violate a reference (foreign key) constraint
*
* @param rows the row set to validate
* @param colOffset the offset of this column in the row
* @param constraintInfo the column constraint infomation
* @param result the result structure
*/
bool violatesReferenceConstraint_updt(const dmlpackage::RowList& rows, unsigned int colOffset,
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
DMLResult& result);
/** @brief get the column list for the supplied table
*
* @param schema the schema name
* @param table the table name
* @param colList ColumnList to fill with the columns for the supplied table
*/
void getColumnsForTable(uint32_t sessionID, std::string schema, std::string table,
dmlpackage::ColumnList& colList);
/** @brief convert absolute rid to relative rid in a segement file
*
* @param rid in:the absolute rid out: relative rid in a segement file
* @param dbRoot,partition, segment the extent information obtained from rid
* @param filesPerColumnPartition,extentRows, extentsPerSegmentFile the extent map parameters
* @param startDBRoot the dbroot this table starts
* @param dbrootCnt the number of dbroot in db
*/
void convertRidToColumn(uint64_t& rid, unsigned& dbRoot, unsigned& partition, unsigned& segment,
unsigned filesPerColumnPartition, unsigned extentsPerSegmentFile,
unsigned extentRows, unsigned startDBRoot, unsigned dbrootCnt,
const unsigned startPartitionNum);
inline bool isDictCol(execplan::CalpontSystemCatalog::ColType colType)
{
if (((colType.colDataType == execplan::CalpontSystemCatalog::CHAR) && (colType.colWidth > 8)) ||
((colType.colDataType == execplan::CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7)) ||
((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 38)) ||
(colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY) ||
(colType.colDataType == execplan::CalpontSystemCatalog::BLOB) ||
(colType.colDataType == execplan::CalpontSystemCatalog::TEXT))
{
return true;
}
else
return false;
}
/** @brief convert an error code to a string
*
* @param ec in:the error code received
* @returns error string
*/
std::string projectTableErrCodeToMsg(uint32_t ec);
// bool validateNextValue(execplan::CalpontSystemCatalog::ColType colType, int64_t value, bool &
// offByOne);
bool validateVarbinaryVal(std::string& inStr);
int commitTransaction(uint64_t uniqueId, BRM::TxnID txnID);
int commitBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID, const uint32_t tableOid,
std::string& errorMsg);
int commitBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID, const uint32_t tableOid,
std::string& errorMsg);
int rollBackBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
const uint32_t tableOid, std::string& errorMsg);
int flushDataFiles(int rc, std::map<uint32_t, uint32_t>& columnOids, uint64_t uniqueId, BRM::TxnID txnID,
uint32_t tableOid);
int endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success);
/** @brief the Session Manager interface
*/
execplan::SessionManager fSessionManager;
joblist::DistributedEngineComm* fEC;
joblist::ResourceManager* fRM;
char* strlower(char* in);
uint32_t fSessionID;
const unsigned DMLLoggingId;
uint32_t fPMCount;
WriteEngine::WEClients* fWEClient;
BRM::DBRM* fDbrm;
boost::shared_ptr<std::map<int, int> > fDbRootPMMap;
oam::Oam fOam;
bool fRollbackPending; // When set, any derived object should stop what it's doing and cleanup in
// preparation for a Rollback
execplan::ClientRotator* fExeMgr;
private:
virtual DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) = 0;
/** @brief clean beginning and ending glitches and spaces from string
*
* @param s string to be cleaned
*/
void cleanString(std::string& s);
DebugLevel fDebugLevel; // internal use debug level
const std::string PPLostConnectionErrorCode = "MCS-2045";
};
/** @brief Parse a value of type T out of a string through a stream extraction.
 *
 * @param t  receives the extracted value
 * @param s  the text to parse
 * @param f  a stream manipulator (e.g. std::dec, std::hex) applied before the
 *           extraction to select the expected format
 * @returns true when the extraction did not fail. Trailing characters after a
 *          successfully extracted value are not treated as an error.
 */
template <class T>
bool from_string(T& t, const std::string& s, std::ios_base& (*f)(std::ios_base&))
{
  std::istringstream input(s);
  input >> f;
  input >> t;
  return !input.fail();
}
} // namespace dmlpackageprocessor
#undef EXPORT