mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
536 lines
18 KiB
C++
536 lines
18 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/***********************************************************************
|
|
* $Id: dmlpackageprocessor.h 9673 2013-07-09 15:59:49Z chao $
|
|
*
|
|
*
|
|
***********************************************************************/
|
|
/** @file */
|
|
|
|
#pragma once
|
|
#include <stdexcept>
|
|
#include <string>
|
|
#include <sstream>
|
|
#include <iostream>
|
|
#include <iomanip>
|
|
#include <boost/any.hpp>
|
|
#include "calpontdmlpackage.h"
|
|
#include "calpontsystemcatalog.h"
|
|
#include "we_type.h"
|
|
#include "writeengine.h"
|
|
#include "messageobj.h"
|
|
#include "sessionmanager.h"
|
|
#include "distributedenginecomm.h"
|
|
#include "brmtypes.h"
|
|
#include "../../writeengine/client/we_clients.h"
|
|
#include "liboamcpp.h"
|
|
#include "oamcache.h"
|
|
#include "querystats.h"
|
|
#include "clientrotator.h"
|
|
|
|
#define EXPORT
|
|
|
|
//#define IDB_DML_DEBUG
|
|
namespace dmlpackageprocessor
|
|
{
|
|
typedef std::vector<std::string> dicStrValues;
|
|
#define SUMMARY_INFO(message) \
|
|
if (isDebug(SUMMARY)) \
|
|
{ \
|
|
std::cerr << message << std::endl; \
|
|
}
|
|
|
|
#define DETAIL_INFO(message) \
|
|
if (isDebug(DETAIL)) \
|
|
{ \
|
|
std::cerr << message << std::endl; \
|
|
}
|
|
|
|
#define VERBOSE_INFO(message) \
|
|
if (isDebug(VERBOSE)) \
|
|
{ \
|
|
std::cerr << message << std::endl; \
|
|
}
|
|
|
|
typedef std::vector<uint64_t> rids;
|
|
|
|
/** @brief abstract class that the defines the general interface and
|
|
* implemetation of a DMLPackageProcessor
|
|
*/
|
|
class DMLPackageProcessor
|
|
{
|
|
public:
|
|
/** @brief Result code
|
|
*/
|
|
enum ResultCode
|
|
{
|
|
NO_ERROR,
|
|
INSERT_ERROR,
|
|
NETWORK_ERROR,
|
|
NOTNULL_VIOLATION,
|
|
CHECK_VIOLATION,
|
|
DELETE_ERROR,
|
|
UPDATE_ERROR,
|
|
INDEX_UPDATE_ERROR,
|
|
COMMAND_ERROR,
|
|
TOKEN_ERROR,
|
|
NOT_ACCEPTING_PACKAGES,
|
|
DEAD_LOCK_ERROR,
|
|
REFERENCE_VIOLATION,
|
|
IDBRANGE_WARNING,
|
|
VB_OVERFLOW_ERROR,
|
|
ACTIVE_TRANSACTION_ERROR,
|
|
TABLE_LOCK_ERROR,
|
|
JOB_ERROR,
|
|
JOB_CANCELED,
|
|
DBRM_READ_ONLY,
|
|
PP_LOST_CONNECTION
|
|
};
|
|
|
|
enum DebugLevel /** @brief Debug level type enumeration */
|
|
{
|
|
NONE = 0, /** @brief No debug info */
|
|
SUMMARY = 1, /** @brief Summary level debug info */
|
|
DETAIL = 2, /** @brief A little detail debug info */
|
|
VERBOSE = 3, /** @brief Detailed debug info */
|
|
};
|
|
|
|
/** @brief the result of dml operations
|
|
*/
|
|
struct DMLResult
|
|
{
|
|
/** @brief the result code
|
|
*/
|
|
ResultCode result;
|
|
/** @brief the error message if result != NO_ERROR
|
|
*/
|
|
logging::Message message;
|
|
/** @brief the rowCount
|
|
*/
|
|
long long rowCount;
|
|
std::string tableLockInfo;
|
|
// query stats;
|
|
std::string queryStats;
|
|
std::string extendedStats;
|
|
std::string miniStats;
|
|
querystats::QueryStats stats;
|
|
|
|
DMLResult() : result(NO_ERROR), rowCount(0){};
|
|
};
|
|
/** @brief a structure to hold a date
|
|
*/
|
|
struct Date
|
|
{
|
|
unsigned spare : 6;
|
|
unsigned day : 6;
|
|
unsigned month : 4;
|
|
unsigned year : 16;
|
|
// NULL column value = 0xFFFFFFFE
|
|
Date()
|
|
{
|
|
year = 0xFFFF;
|
|
month = 0xF;
|
|
day = 0x3F;
|
|
spare = 0x3E;
|
|
}
|
|
};
|
|
/** @brief ctor
|
|
*/
|
|
DMLPackageProcessor(BRM::DBRM* aDbrm, uint32_t sid)
|
|
: fEC(0), DMLLoggingId(21), fRollbackPending(false), fDebugLevel(NONE)
|
|
{
|
|
try
|
|
{
|
|
fWEClient = new WriteEngine::WEClients(WriteEngine::WEClients::DMLPROC);
|
|
// std::cout << "In DMLPackageProcessor constructor " << this << std::endl;
|
|
fPMCount = fWEClient->getPmCount();
|
|
}
|
|
catch (...)
|
|
{
|
|
std::cout << "Cannot make connection to WES" << std::endl;
|
|
}
|
|
|
|
oam::OamCache* oamCache = oam::OamCache::makeOamCache();
|
|
fDbRootPMMap = oamCache->getDBRootToPMMap();
|
|
fDbrm = aDbrm;
|
|
fSessionID = sid;
|
|
fExeMgr = new execplan::ClientRotator(fSessionID, "ExeMgr");
|
|
// std::cout << " fSessionID is " << fSessionID << std::endl;
|
|
fExeMgr->connect(0.005);
|
|
}
|
|
|
|
/** @brief destructor
|
|
*/
|
|
EXPORT virtual ~DMLPackageProcessor();
|
|
|
|
/** @brief Is it required to debug
|
|
*/
|
|
inline bool isDebug(const DebugLevel level) const
|
|
{
|
|
return level <= fDebugLevel;
|
|
}
|
|
|
|
/**
|
|
* @brief Get debug level
|
|
*/
|
|
inline DebugLevel getDebugLevel() const
|
|
{
|
|
return fDebugLevel;
|
|
}
|
|
// int rollBackTransaction(uint64_t uniqueId, uint32_t txnID, uint32_t sessionID, std::string & errorMsg);
|
|
/**
|
|
* @brief Set debug level
|
|
*/
|
|
inline void setDebugLevel(const DebugLevel level)
|
|
{
|
|
fDebugLevel = level;
|
|
}
|
|
|
|
/**
|
|
* @brief Set the Distributed Engine Comm object
|
|
*/
|
|
inline void setEngineComm(joblist::DistributedEngineComm* ec)
|
|
{
|
|
fEC = ec;
|
|
}
|
|
|
|
/** @brief process the dml package
|
|
*
|
|
* @param cpackage the CalpontDMLPackage to process
|
|
*/
|
|
DMLResult processPackage(dmlpackage::CalpontDMLPackage& cpackage);
|
|
|
|
/** @brief Check that give exception is related to PP lost connection.
|
|
*/
|
|
bool checkPPLostConnection(std::exception& ex);
|
|
|
|
inline void setRM(joblist::ResourceManager* frm)
|
|
{
|
|
fRM = frm;
|
|
};
|
|
|
|
EXPORT int rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
|
|
std::string& errorMsg);
|
|
|
|
EXPORT int32_t tryToRollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
|
|
std::string& errorMsg);
|
|
|
|
EXPORT int rollBackBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
|
|
const uint32_t tableOid, std::string& errorMsg);
|
|
/**
|
|
* @brief convert a columns data, represnted as a string, to it's native
|
|
* data type
|
|
*
|
|
* @param type the columns database type
|
|
* @param data the columns string representation of it's data
|
|
*/
|
|
// static boost::any convertColumnData( execplan::CalpontSystemCatalog::ColType colType,
|
|
// const std::string& dataOrig );
|
|
/**
|
|
* @brief convert a columns data from native format to a string
|
|
*
|
|
* @param type the columns database type
|
|
* @param data the columns string representation of it's data
|
|
*/
|
|
// static std::string dateToString( int datevalue );
|
|
/** @brief validate numerical string
|
|
*
|
|
* @param result the result structure
|
|
* @param data
|
|
*/
|
|
|
|
// static bool numer_value( std::string data );
|
|
/** @brief check whether the given year is leap year
|
|
*
|
|
* @param year
|
|
*/
|
|
// static bool isLeapYear ( int year);
|
|
|
|
/** @brief check whether the given date valid
|
|
*
|
|
* @param year, month, day
|
|
*/
|
|
// static bool isDateValid ( int day, int month, int year);
|
|
|
|
/** @brief check whether the given datetime valid
|
|
*
|
|
* @param hour, minute, second, microSecond
|
|
*/
|
|
// static bool isDateTimeValid ( int hour, int minute, int second, int microSecond);
|
|
|
|
/** @brief convert month from string to integer
|
|
*
|
|
* @param month
|
|
*/
|
|
// static int convertMonth (std::string month);
|
|
|
|
/** @brief tokenize date or datetime string
|
|
*
|
|
* @param data
|
|
*/
|
|
// static void tokenTime (std::string data, std::vector<std::string>& dataList);
|
|
|
|
/** @brief Access the rollback pending flag
|
|
*/
|
|
bool getRollbackPending()
|
|
{
|
|
return fRollbackPending;
|
|
}
|
|
|
|
/** @brief Set the rollback pending flag
|
|
*/
|
|
void setRollbackPending(bool rollback)
|
|
{
|
|
fRollbackPending = rollback;
|
|
}
|
|
|
|
protected:
|
|
/** @brief update the indexes on the target table
|
|
*
|
|
* @param schema the schema name
|
|
* @param table the table name
|
|
* @param rows the list of rows
|
|
* @param result the result structure
|
|
* @param updateFlag 0: delete, 1: update, 2 insert
|
|
* @param colNameList the updated column names, only valid for update SQL statement
|
|
*/
|
|
bool updateIndexes(uint32_t sessionID, execplan::CalpontSystemCatalog::SCN txnID, const std::string& schema,
|
|
const std::string& table, const dmlpackage::RowList& rows, DMLResult& result,
|
|
std::vector<WriteEngine::RID> ridList, WriteEngine::ColValueList& colValuesList,
|
|
std::vector<std::string>& colNameList, const char updateFlag,
|
|
std::vector<dicStrValues>& dicStrValCols);
|
|
|
|
/** @brief delete the indexes on the target table
|
|
*
|
|
* @param schema the schema name
|
|
* @param table the table name
|
|
* @param rows the list of rows
|
|
* @param result the result structure
|
|
*/
|
|
bool deleteIndexes(const std::string& schema, const std::string& table, const dmlpackage::RowList& rows,
|
|
DMLResult& result);
|
|
|
|
/** @brief validate that none of the columns in the supplied row(s) violate
|
|
* any defined constraints
|
|
*
|
|
* @param schema the schema name
|
|
* @param table the table name
|
|
* @param rows the lists of rows to check column constraints on
|
|
* @param result the result structure
|
|
*/
|
|
bool violatesConstraints(uint32_t sessionID, const std::string& schema, const std::string& table,
|
|
const dmlpackage::RowList& rows, DMLResult& result);
|
|
|
|
/** @brief validate that the column does not violate a unique constraint
|
|
*
|
|
* @param column the column to validate
|
|
* @param result the result structure
|
|
*/
|
|
bool violatesUniqueConstraint(const dmlpackage::RowList& rows,
|
|
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
|
|
unsigned int sessionID, DMLResult& result);
|
|
|
|
/** @brief validate that the column does not violate a check constraint
|
|
*
|
|
* @param column the column to validate
|
|
* @param result the result structure
|
|
*/
|
|
bool violatesCheckConstraint(const dmlpackage::RowList& rows,
|
|
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
|
|
unsigned int sessionID, unsigned int colOffset, DMLResult& result);
|
|
|
|
/** @brief validate that the column does not violate a not null constraint
|
|
*
|
|
* @param type the columns database type
|
|
* @param column the column to validate
|
|
* @param result the result structure
|
|
*/
|
|
bool violatesNotNullConstraint(const dmlpackage::RowList& rows, unsigned int colOffset, DMLResult& result);
|
|
|
|
/** @brief validate that the column does not violate a reference (foreign key) constraint
|
|
*
|
|
* @param rows the row set to validate
|
|
* @param colOffset the offset of this column in the row
|
|
* @param constraintInfo the column constraint infomation
|
|
* @param result the result structure
|
|
*/
|
|
bool violatesReferenceConstraint(const dmlpackage::RowList& rows, unsigned int colOffset,
|
|
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
|
|
DMLResult& result);
|
|
|
|
/** @brief validate that non of the rows deleted from this table violate a reference
|
|
* (foreign key) constraint of the reference table.
|
|
*
|
|
* @param schema the schema name
|
|
* @param table the table name
|
|
* @param rows the lists of rows to check column constraints on
|
|
* @param result the result structure
|
|
*/
|
|
bool violatesPKRefConnstraint(uint32_t sessionID, const std::string& schema, const std::string& table,
|
|
const dmlpackage::RowList& rows,
|
|
const WriteEngine::ColValueList& oldValueList, DMLResult& result);
|
|
|
|
bool violatesPKRefConnstraint(uint32_t sessionID, const std::string& schema, const std::string& table,
|
|
std::vector<WriteEngine::RID>& rowIDList, std::vector<void*>& oldValueList,
|
|
DMLResult& result);
|
|
|
|
/** @brief validate that the rows deleted does not violate a reference (foreign key) constraint
|
|
*
|
|
* @param rows the row set to validate
|
|
* @param colOffset the offset of this column in the row
|
|
* @param constraintInfo the column constraint infomation
|
|
* @param result the result structure
|
|
*/
|
|
bool violatesReferenceConstraint_PK(const WriteEngine::ColValueList& oldValueList,
|
|
const execplan::CalpontSystemCatalog::ColType& colType,
|
|
unsigned int colOffset,
|
|
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
|
|
DMLResult& result);
|
|
|
|
bool violatesReferenceConstraint_PK(std::vector<void*>& oldValueList, const int totalRows,
|
|
const execplan::CalpontSystemCatalog::ColType& colType,
|
|
unsigned int colOffset,
|
|
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
|
|
DMLResult& result);
|
|
|
|
/** @brief validate that none of the columns in the update row(s) violate
|
|
* any reference constraints of the foreign key table
|
|
*
|
|
* @param schema the schema name
|
|
* @param table the table name
|
|
* @param rows the lists of rows to check column constraints on
|
|
* @param result the result structure
|
|
*/
|
|
bool violatesUpdtRefConstraints(uint32_t sessionID, const std::string& schema, const std::string& table,
|
|
const dmlpackage::RowList& rows, DMLResult& result);
|
|
|
|
/** @brief validate that the column does not violate a reference (foreign key) constraint
|
|
*
|
|
* @param rows the row set to validate
|
|
* @param colOffset the offset of this column in the row
|
|
* @param constraintInfo the column constraint infomation
|
|
* @param result the result structure
|
|
*/
|
|
bool violatesReferenceConstraint_updt(const dmlpackage::RowList& rows, unsigned int colOffset,
|
|
const execplan::CalpontSystemCatalog::ConstraintInfo& constraintInfo,
|
|
DMLResult& result);
|
|
|
|
/** @brief get the column list for the supplied table
|
|
*
|
|
* @param schema the schema name
|
|
* @param table the table name
|
|
* @param colList ColumnList to fill with the columns for the supplied table
|
|
*/
|
|
void getColumnsForTable(uint32_t sessionID, std::string schema, std::string table,
|
|
dmlpackage::ColumnList& colList);
|
|
|
|
/** @brief convert absolute rid to relative rid in a segement file
|
|
*
|
|
* @param rid in:the absolute rid out: relative rid in a segement file
|
|
* @param dbRoot,partition, segment the extent information obtained from rid
|
|
* @param filesPerColumnPartition,extentRows, extentsPerSegmentFile the extent map parameters
|
|
* @param startDBRoot the dbroot this table starts
|
|
* @param dbrootCnt the number of dbroot in db
|
|
*/
|
|
void convertRidToColumn(uint64_t& rid, unsigned& dbRoot, unsigned& partition, unsigned& segment,
|
|
unsigned filesPerColumnPartition, unsigned extentsPerSegmentFile,
|
|
unsigned extentRows, unsigned startDBRoot, unsigned dbrootCnt,
|
|
const unsigned startPartitionNum);
|
|
|
|
inline bool isDictCol(execplan::CalpontSystemCatalog::ColType colType)
|
|
{
|
|
if (((colType.colDataType == execplan::CalpontSystemCatalog::CHAR) && (colType.colWidth > 8)) ||
|
|
((colType.colDataType == execplan::CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7)) ||
|
|
((colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) && (colType.precision > 38)) ||
|
|
(colType.colDataType == execplan::CalpontSystemCatalog::VARBINARY) ||
|
|
(colType.colDataType == execplan::CalpontSystemCatalog::BLOB) ||
|
|
(colType.colDataType == execplan::CalpontSystemCatalog::TEXT))
|
|
{
|
|
return true;
|
|
}
|
|
else
|
|
return false;
|
|
}
|
|
|
|
/** @brief convert an error code to a string
|
|
*
|
|
* @param ec in:the error code received
|
|
* @returns error string
|
|
*/
|
|
std::string projectTableErrCodeToMsg(uint32_t ec);
|
|
|
|
// bool validateNextValue(execplan::CalpontSystemCatalog::ColType colType, int64_t value, bool &
|
|
// offByOne);
|
|
|
|
bool validateVarbinaryVal(std::string& inStr);
|
|
int commitTransaction(uint64_t uniqueId, BRM::TxnID txnID);
|
|
int commitBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID, const uint32_t tableOid,
|
|
std::string& errorMsg);
|
|
int commitBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID, const uint32_t tableOid,
|
|
std::string& errorMsg);
|
|
int rollBackBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
|
|
const uint32_t tableOid, std::string& errorMsg);
|
|
int flushDataFiles(int rc, std::map<uint32_t, uint32_t>& columnOids, uint64_t uniqueId, BRM::TxnID txnID,
|
|
uint32_t tableOid);
|
|
int endTransaction(uint64_t uniqueId, BRM::TxnID txnID, bool success);
|
|
|
|
/** @brief the Session Manager interface
|
|
*/
|
|
execplan::SessionManager fSessionManager;
|
|
joblist::DistributedEngineComm* fEC;
|
|
joblist::ResourceManager* fRM;
|
|
char* strlower(char* in);
|
|
uint32_t fSessionID;
|
|
const unsigned DMLLoggingId;
|
|
uint32_t fPMCount;
|
|
WriteEngine::WEClients* fWEClient;
|
|
BRM::DBRM* fDbrm;
|
|
boost::shared_ptr<std::map<int, int> > fDbRootPMMap;
|
|
oam::Oam fOam;
|
|
bool fRollbackPending; // When set, any derived object should stop what it's doing and cleanup in
|
|
// preparation for a Rollback
|
|
execplan::ClientRotator* fExeMgr;
|
|
|
|
private:
|
|
virtual DMLResult processPackageInternal(dmlpackage::CalpontDMLPackage& cpackage) = 0;
|
|
|
|
/** @brief clean beginning and ending glitches and spaces from string
|
|
*
|
|
* @param s string to be cleaned
|
|
*/
|
|
void cleanString(std::string& s);
|
|
|
|
DebugLevel fDebugLevel; // internal use debug level
|
|
|
|
const std::string PPLostConnectionErrorCode = "MCS-2045";
|
|
};
|
|
|
|
/** @brief helper template function to do safe from string to type conversions
|
|
*
|
|
*/
|
|
template <class T>
|
|
bool from_string(T& t, const std::string& s, std::ios_base& (*f)(std::ios_base&))
|
|
{
|
|
std::istringstream iss(s);
|
|
return !(iss >> f >> t).fail();
|
|
}
|
|
|
|
} // namespace dmlpackageprocessor
|
|
|
|
#undef EXPORT
|