1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00
Files
mariadb-columnstore-engine/writeengine/bulk/we_tableinfo.h
Gagan Goel 1fc399451a MCOL-4957 Fix performance slowdown for processing TIMESTAMP columns.
Part 1:
 As part of MCOL-3776, to address a synchronization issue while accessing
 the fTimeZone member of the Func class, mutex locks were added to the
 accessor and mutator methods. However, this slows down processing
 of TIMESTAMP columns in PrimProc significantly as all threads across
 all concurrently running queries would serialize on the mutex. This
 is because PrimProc only has a single global object for the functor
 class (class derived from Func in utils/funcexp/functor.h) for a given
 function name. To fix this problem:

   (1) We remove the fTimeZone as a member of the Func derived classes
   (hence removing the mutexes) and instead use the fOperationType
   member of the FunctionColumn class to propagate the timezone values
   down to the individual functor processing functions such as
   FunctionColumn::getStrVal(), FunctionColumn::getIntVal(), etc.

   (2) To achieve (1), a timezone member is added to the
   execplan::CalpontSystemCatalog::ColType class.

Part 2:
 Several functors in the Funcexp code call dataconvert::gmtSecToMySQLTime()
 and dataconvert::mySQLTimeToGmtSec() functions for conversion between seconds
 since unix epoch and broken-down representation. These functions in turn call
 the C library function localtime_r() which currently has a known bug of holding
 a global lock via a call to __tz_convert. This significantly reduces performance
 in multi-threaded applications where multiple threads concurrently call
 localtime_r(). More details on the bug:
   https://sourceware.org/bugzilla/show_bug.cgi?id=16145

 This bug in localtime_r() caused processing of the Functors in PrimProc to
 slow down significantly, since query execution causes the Functors code to be
 processed in a multi-threaded manner.

 As a fix, we remove the calls to localtime_r() from gmtSecToMySQLTime()
 and mySQLTimeToGmtSec() by performing the timezone-to-offset conversion
 (done in dataconvert::timeZoneToOffset()) during the execution plan
 creation in the plugin. Note that localtime_r() is only called when the
 time_zone system variable is set to "SYSTEM".

 This fix also required changing the timezone type from a std::string to
 a long across the system.
2022-02-11 19:03:32 -05:00

659 lines
21 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*******************************************************************************
* $Id: we_tableinfo.h 4648 2013-05-29 21:42:40Z rdempsey $
*
*******************************************************************************/
#ifndef _WE_TABLEINFO_H
#define _WE_TABLEINFO_H
#include <sys/time.h>
#include <fstream>
#include <utility>
#include <vector>
#include <boost/thread/mutex.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/uuid/uuid.hpp>
#include <libmarias3/marias3.h>
#include "we_type.h"
#include "we_colop.h"
#include "we_fileop.h"
#include "we_blockop.h"
#include "we_brm.h"
#include "we_colbufmgr.h"
#include "we_columninfo.h"
#include "we_bulkloadbuffer.h"
#include "we_rbmetawriter.h"
#include "we_log.h"
#include "we_brmreporter.h"
#include "we_extentstripealloc.h"
#include "messagelog.h"
#include "brmtypes.h"
#include "querytele.h"
#include "oamcache.h"
namespace WriteEngine
{
/* @brief Class which maintains the information for one table being imported
* by a bulk load job. The information includes:
* - its columns (fColumns);
* - the read buffers used to pass data from the read thread to the write
*   thread(s) (fBuffers);
* - the list of input load files (local files, STDIN, or S3);
* - the reject/error output files, the DB table lock, and the bulk rollback
*   meta data writer.
*
* Instances are not copyable: the copy constructor and assignment operator
* are declared private. Updates shared between threads are coordinated
* through fSyncUpdatesTI; see the comment on that member.
*/
class TableInfo : public WeUIDGID
{
private:
//--------------------------------------------------------------------------
// Private Data Members
//--------------------------------------------------------------------------
int fTableId; // Table id
int fBufferSize; // Size of buffer used by BulkLoadBuffer
size_t fFileBufSize; // Size of fFileBuffer passed to setvbuf
// to read import files. Comes from
// writeBufferSize tag in job xml file
char fColDelim; // Used to delimit col values in a row
volatile Status fStatusTI; // Status of table. Made volatile to
// ensure BulkLoad methods can access
// (thru getStatusTI()) correctly w/o
// having to go through a mutex lock.
// NOTE(review): in C++, volatile provides
// neither atomicity nor inter-thread
// ordering; std::atomic would be needed
// for that guarantee.
int fReadBufCount; // Number of read buffers
// (size of fBuffers vector)
unsigned fNumberOfColumns; // Number of ColumnInfo objs in this tbl
// (size of fColumns vector)
FILE* fHandle; // Handle to the input load file
int fCurrentReadBuffer; // Id of current buffer being popu-
// lated by the read thread
RID fTotalReadRows; // Total number of rows read
volatile unsigned fTotalErrRows; // Total error rows among all input
// for this table. Is volatile to
// ensure parser & reader threads
// see the latest value.
// NOTE(review): volatile does not make
// concurrent updates atomic; confirm
// all writers hold a lock.
unsigned fMaxErrorRows; // Maximum error rows
int fLastBufferId; // Id of the last buffer
char* fFileBuffer; // File buffer passed to setvbuf()
int fCurrentParseBuffer; // Id of leading current buffer being
// parsed. There can be more than 1
// buffer being parsed concurrently.
unsigned fNumberOfColsParsed; // Number of columns completed parsing
boost::ptr_vector<ColumnInfo> fColumns; // Columns of the table
boost::ptr_vector<BulkLoadBuffer> fBuffers; // Array of read buffers. Used
// to pass data from the read
// thread to the write thread(s)
/* fSyncUpdatesTI is the mutex used to synchronize updates to TableInfo
* (excluding fErrorRows and fErrDataRows).
* NOTE(review): fErrorRows and fErrDataRows are not members of this
* class; presumably this refers to error-row data held elsewhere
* (e.g. in BulkLoadBuffer) -- confirm.
*
* This mutex is also used to coordinate access to fColumnLocks and
* fParseComplete (in BulkLoadBuffer) for the buffers within a table.
* See we_bulkloadbuffer.h for more information.
*
* As the controlling class, TableInfo is the one that is always
* getting/setting the status of the BulkLoadBuffer objects, so
* fSyncUpdatesTI is also used to set/get the BulkLoadBuffer status.
* markTableComplete() takes this lock before setting fStatusTI.
*/
boost::mutex fSyncUpdatesTI;
boost::mutex fErrorRptInfoMutex; // Used to synchronize access to
// fRejectDataFile & fRejectErrFile
int fLocker; // Read thread id reading this table
std::vector<std::string> fLoadFileList; // Load files
std::string fFileName; // Current load file
std::string fTableName; // Name of the table
OID fTableOID; // OID of the table
std::string fjobFileName; // Job file name
int fJobId; // Job ID number
Log* fLog; // Object used for logging
timeval fStartTime; // Time when reading and processing began for this table
const BRM::TxnID fTxnID; // Transaction id for the bulk load
RBMetaWriter fRBMetaWriter; // Manages the writing of bulk roll-
// back meta data for this table
std::string fProcessName; // Name of application process used
// in db table locks
bool fKeepRbMetaFile; // Keep or delete bulk rollback meta
// data file
bool fbTruncationAsError; // Treat string truncation as error
ImportDataMode fImportDataMode; // Import data in text or binary mode
long fTimeZone; // Timezone offset (in seconds) relative to UTC,
// to use for TIMESTAMP data type. For example,
// for EST which is UTC-5:00, offset will be -18000s.
volatile bool fTableLocked; // Do we have db table lock
bool fReadFromStdin; // Read import file from STDIN
bool fReadFromS3; // Read import file from S3
std::string fS3Host; // S3 hostname
std::string fS3Key; // S3 key
std::string fS3Secret; // S3 secret key
std::string fS3Bucket; // S3 bucket
std::string fS3Region; // S3 region
ms3_st* ms3; // S3 object
size_t fS3ReadLength;
size_t fS3ParseLength;
bool fNullStringMode; // Treat "NULL" as a null value
char fEnclosedByChar; // Character to enclose col values
char fEscapeChar; // Escape character used in conjunc-
// tion with fEnclosedByChar
bool fProcessingBegun; // Has processing begun on this tbl
BulkModeType fBulkMode; // Distributed bulk mode (1,2, or 3)
std::string fBRMRptFileName; // Name of distributed mode rpt file
BRMReporter fBRMReporter; // Object used to report BRM updates
uint64_t fTableLockID; // Unique table lock ID
std::vector<uint16_t> fOrigDbRootIds; // List of DBRoots at start of job
std::string fErrorDir; // Opt dir for *.err and *.bad files
std::vector<std::string> fErrFiles; // List of *.err files for this table
std::vector<std::string> fBadFiles; // List of *.bad files for this table
std::ofstream fRejectDataFile; // File containing rejected rows
std::ofstream fRejectErrFile; // File containing errmsgs for bad rows
std::string fRejectDataFileName; // Filename for current fRejectDataFile
std::string fRejectErrFileName; // Filename for current fRejectErrFile
unsigned int fRejectDataCnt; // Running row count in current bad file
unsigned int fRejectErrCnt; // Running count in current err msg file
ExtentStripeAlloc fExtentStrAlloc; // Extent stripe allocator for this tbl
querytele::QueryTeleClient fQtc; // Query Tele client
oam::OamCache* fOamCachePtr; // OamCache: ptr is copyable
boost::uuids::uuid fJobUUID; // Job UUID
std::vector<BRM::LBID_t> fDictFlushBlks; // dict blks to be flushed from cache
//--------------------------------------------------------------------------
// Private Functions
//--------------------------------------------------------------------------
int changeTableLockState(); // Change state of table lock to cleanup
void closeTableFile(); // Close current tbl file; free buffer
void closeOpenDbFiles(); // Close DB files left open at job's end
int confirmDBFileChanges(); // Confirm DB file changes (on HDFS)
void deleteTempDBFileChanges(); // Delete DB temp swap files (on HDFS)
int finishBRM(); // Finish reporting updates for BRM
void freeProcessingBuffers(); // Free up Processing Buffers
bool isBufferAvailable(bool report); // Is tbl buffer available for reading
int openTableFile(); // Open data file and set the buffer
void reportTotals(double elapsedSec); // Report summary totals
void sleepMS(long int ms); // Sleep method
// Compare column HWM with the exemplar HWM.
int compareHWMs(const int smallestColumnId, const int widerColumnId, const uint32_t smallerColumnWidth,
const uint32_t widerColumnWidth, const std::vector<DBRootExtentInfo>& segFileInfo,
int& colIdx);
int synchronizeAutoInc(); // Synchronize AutoInc in BRM with syscat
// Write the list of errors for this table
void writeErrorList(const std::vector<std::pair<RID, std::string> >* errorRows,
const std::vector<std::string>* errorDatRows, bool bCloseFile);
// Write out rejected rows, and corresponding error messages
void writeBadRows(const std::vector<std::string>* errorDatRows, bool bCloseFile);
void writeErrReason(const std::vector<std::pair<RID, std::string> >* errorRows, bool bCloseFile);
// Disable copy constructor and assignment operator (declared private so
// that copying a TableInfo outside the class does not compile).
TableInfo(const TableInfo& tableInfo); //
TableInfo& operator=(const TableInfo& info);
public:
//-------------------------------------------------------------------------
// Public Functions
//-------------------------------------------------------------------------
/** @brief Constructor (takes arguments; this is not a default constructor).
* @param logger Log object used for logging
* @param (unnamed BRM::TxnID) Transaction id for the bulk load
* @param processName Name of application process used in db table locks
* @param tableOID OID of the table
* @param tableName Name of the table
* @param bKeepRbMetaFile Whether to keep the bulk rollback meta data file
*/
TableInfo(Log* logger, const BRM::TxnID, const std::string& processName, OID tableOID,
const std::string& tableName, bool bKeepRbMetaFile);
/** @brief Destructor
*/
~TableInfo();
/** @brief Acquire the DB table lock for this table
* @param disableTimeOut NOTE(review): semantics not visible in this
* header; presumably disables the lock-acquisition timeout -- confirm.
*/
int acquireTableLock(bool disableTimeOut = false);
/** @brief Get current table lock ID for this table
*/
uint64_t getTableLockID() const;
/** @brief Release the DB table lock for this table
*/
int releaseTableLock();
/** @brief Allocate an extent for the specified OID and DBRoot, using the
* internal "stripe" allocator.
* @param columnOID Allocate next extent for this column
* @param dbRoot Allocate extent on this DBRoot
* @param partition (in/out) If DBRoot is empty, this is an input arg,
* else it is assigned by BRM and returned as output
* @param segment (out) Segment number of extent created by BRM
* @param startLbid (out) Starting LBID for extent created by BRM
* @param allocSize (out) Num blocks allocated to extent by BRM
* @param hwm (out) FBO for extent created by BRM
* @param errMsg (out) Error message
*/
int allocateBRMColumnExtent(OID columnOID, uint16_t dbRoot, uint32_t& partition, uint16_t& segment,
BRM::LBID_t& startLbid, int& allocSize, HWM& hwm, std::string& errMsg);
/** @brief Delete the bulk rollback metadata file.
*/
void deleteMetaDataRollbackFile();
/** @brief Get import data mode (text or binary).
*/
ImportDataMode getImportDataMode() const;
/** @brief Get the timezone offset, in seconds relative to UTC, used for
* the TIMESTAMP data type.
*/
long getTimeZone() const;
/** @brief Get number of read buffers
*/
int getNumberOfBuffers() const;
/** @brief Set the buffer size used by BulkLoadBuffer
* @param bufSize Buffer size
*/
void setBufferSize(const int bufSize);
/** @brief Set the size of the file buffer passed to setvbuf.
* @param fileBufSize Buffer size
*/
void setFileBufferSize(const int fileBufSize);
/** @brief Set the delimiter used to delimit column values within a row
*/
void setColDelimiter(const char delim);
/** @brief Get table status
*/
Status getStatusTI() const;
/** @brief Get current parse buffer
*/
int getCurrentParseBuffer() const;
/** @brief Get the number of columns
*/
int getNumberOfColumns() const;
/** @brief Get the name of the current load file
*/
std::string getFileName() const;
/** @brief Get the number of maximum allowed error rows
*/
unsigned getMaxErrorRows() const;
/** @brief retrieve the truncation as error setting for this
* import. When set, this causes char and varchar strings
* that are longer than the column definition to be treated
* as errors instead of warnings.
*/
bool getTruncationAsError() const;
/** @brief set the maximum number of error rows allowed
*/
void setMaxErrorRows(const unsigned int maxErrorRows);
/** @brief Set mode to treat "NULL" string as NULL value or not.
*/
void setNullStringMode(bool bMode);
/** @brief Set binary import data mode (text or binary).
*/
void setImportDataMode(ImportDataMode importMode);
/** @brief Set the timezone offset, in seconds relative to UTC, used for
* the TIMESTAMP data type (e.g. -18000 for UTC-5:00).
*/
void setTimeZone(long timeZone);
/** @brief Enable distributed mode, saving BRM updates in rptFileName
*/
void setBulkLoadMode(BulkModeType bulkMode, const std::string& rptFileName);
/** @brief Set character optionally used to enclose input column values.
*/
void setEnclosedByChar(char enChar);
/** @brief Set escape char to use in conjunction with enclosed by char.
*/
void setEscapeChar(char esChar);
/** @brief Has processing begun for this table.
*/
bool hasProcessingBegun();
/** @brief set the table id
*/
void setTableId(const int& id);
/** @brief get the table name
*/
std::string getTableName() const;
/** @brief get the table OID
*/
OID getTableOID();
/** @brief Set the directory for *.err and *.bad files. May be
* empty string, in which case we use current dir.
*/
void setErrorDir(const std::string& errorDir);
/** @brief get the bulk rollback meta data writer object for this table
*/
RBMetaWriter* rbMetaWriter();
/** @brief Add column information to the table
*/
void addColumn(ColumnInfo* info);
/** @brief Initialize the buffer list
* @param noOfBuffers Number of buffers to create for this table
* @param jobFieldRefList List of fields in this import
* @param fixedBinaryRecLen In binary mode, this is the fixed record length
* used to read the buffer; in text mode, this value is not used.
*/
int initializeBuffers(int noOfBuffers, const JobFieldRefList& jobFieldRefList,
unsigned int fixedBinaryRecLen);
/** @brief Read the table data into the read buffer
*/
int readTableData();
/** @brief parse method
*/
int parseColumn(const int& columnId, const int& bufferId, double& processingTime);
/** @brief update the buffer status for column
*/
int setParseComplete(const int& columnId, const int& bufferId, double processingTime);
/** @brief update the status to reflect a parsing error
*/
void setParseError();
/** @brief Check if buffer ready for parsing.
*/
bool bufferReadyForParse(const int& bufferId, bool report) const;
/** @brief Check if a column is available for parsing in the buffer
* and return the column id if available
*/
int getColumnForParse(const int& id, const int& bufferId, bool report);
/** @brief Do we have a db lock with the session manager for this table.
*/
bool isTableLocked();
/** @brief Lock the table for reading
*/
bool lockForRead(const int& locker);
/** @brief Rollback changes made to "this" table by the current import job
*/
int rollbackWork();
/** @brief set list of import files, the STDIN and S3 usage flags, and
* the S3 connection settings (host, key, secret, bucket, region)
*/
void setLoadFilesInput(bool bReadFromStdin, bool bReadFromS3, const std::vector<std::string>& files,
const std::string& s3host, const std::string& s3key, const std::string& s3secret,
const std::string& s3bucket, const std::string& s3region);
/** @brief set job file name under process.
*/
void setJobFileName(const std::string& jobFileName);
/** @brief set job ID for this import.
*/
void setJobId(int jobId);
/** @brief set truncation as error for this import.
* When set, this causes char and varchar strings that are
* longer than the column definition to be treated as errors
* instead of warnings.
*/
void setTruncationAsError(bool bTruncationAsError);
/** @brief log message to data_mods.log file.
*/
void logToDataMods(const std::string& jobFile, const std::string& messageText);
/** @brief Validate consistency of current HWMs for this table's columns.
* If jobTable argument is provided, then it will be used to get additional
* column info, else this table's fColumns vector is used.
* "stage" indicates validation stage ("Starting" or "Ending" HWMs).
*/
int validateColumnHWMs(const JobTable* jobTable, const std::vector<DBRootExtentInfo>& segFileInfo,
const char* stage);
/** @brief Initialize the bulk rollback meta data writer for this table.
*/
int initBulkRollbackMetaData();
/** @brief Save meta data information for bulk rollback.
* This is the Shared-Nothing version of this function.
* @param job Input Job information
* @param segFileInfo vector of starting segment files for each column
* @param dbRootHWMInfoColVec Vector of last local HWMs for each DBRoot
* on this PM.
*/
int saveBulkRollbackMetaData(Job& job, const std::vector<DBRootExtentInfo>& segFileInfo,
const std::vector<BRM::EmDbRootHWMInfo_v>& dbRootHWMInfoColVec);
/** @brief Mark table as complete (sets status to PARSE_COMPLETE under
* fSyncUpdatesTI)
*/
void markTableComplete();
/** @brief Set the job UUID
*/
void setJobUUID(const boost::uuids::uuid& jobUUID);
public:
// Classes granted access to TableInfo's private members.
friend class BulkLoad;
friend class ColumnInfo;
friend class ColumnInfoCompressed;
};
//------------------------------------------------------------------------------
// Inline functions
//------------------------------------------------------------------------------
inline int TableInfo::getCurrentParseBuffer() const
{
return fCurrentParseBuffer;
}
inline std::string TableInfo::getFileName() const
{
return fFileName;
}
inline ImportDataMode TableInfo::getImportDataMode() const
{
return fImportDataMode;
}
inline long TableInfo::getTimeZone() const
{
return fTimeZone;
}
inline int TableInfo::getNumberOfBuffers() const
{
return fReadBufCount;
}
inline int TableInfo::getNumberOfColumns() const
{
return fNumberOfColumns;
}
inline Status TableInfo::getStatusTI() const
{
return fStatusTI;
}
inline unsigned TableInfo::getMaxErrorRows() const
{
return fMaxErrorRows;
}
inline uint64_t TableInfo::getTableLockID() const
{
return fTableLockID;
}
inline std::string TableInfo::getTableName() const
{
return fTableName;
}
inline OID TableInfo::getTableOID()
{
return fTableOID;
}
inline bool TableInfo::getTruncationAsError() const
{
return fbTruncationAsError;
}
inline bool TableInfo::hasProcessingBegun()
{
return fProcessingBegun;
}
inline bool TableInfo::isTableLocked()
{
return fTableLocked;
}
inline void TableInfo::markTableComplete()
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
fStatusTI = WriteEngine::PARSE_COMPLETE;
}
inline RBMetaWriter* TableInfo::rbMetaWriter()
{
return &fRBMetaWriter;
}
inline void TableInfo::setBufferSize(const int bufSize)
{
fBufferSize = bufSize;
}
inline void TableInfo::setColDelimiter(const char delim)
{
fColDelim = delim;
}
/* Enable distributed bulk mode and record the name of the report file in
 * which BRM updates are saved.
 * @param bulkMode    Distributed bulk mode (1, 2, or 3)
 * @param rptFileName Name of the distributed mode report file
 */
inline void TableInfo::setBulkLoadMode(BulkModeType bulkMode, const std::string& rptFileName)
{
  // Two independent assignments, written as separate statements rather than
  // chained with the comma operator.
  fBulkMode = bulkMode;
  fBRMRptFileName = rptFileName;
}
inline void TableInfo::setEnclosedByChar(char enChar)
{
fEnclosedByChar = enChar;
}
inline void TableInfo::setEscapeChar(char esChar)
{
fEscapeChar = esChar;
}
inline void TableInfo::setFileBufferSize(const int fileBufSize)
{
fFileBufSize = fileBufSize;
}
inline void TableInfo::setImportDataMode(ImportDataMode importMode)
{
fImportDataMode = importMode;
}
inline void TableInfo::setTimeZone(long timeZone)
{
fTimeZone = timeZone;
}
inline void TableInfo::setJobFileName(const std::string& jobFileName)
{
fjobFileName = jobFileName;
}
inline void TableInfo::setJobId(int jobId)
{
fJobId = jobId;
}
inline void TableInfo::setLoadFilesInput(bool bReadFromStdin, bool bReadFromS3,
const std::vector<std::string>& files, const std::string& s3host,
const std::string& s3key, const std::string& s3secret,
const std::string& s3bucket, const std::string& s3region)
{
fReadFromStdin = bReadFromStdin;
fReadFromS3 = bReadFromS3;
fLoadFileList = files;
fS3Host = s3host;
fS3Key = s3key;
fS3Secret = s3secret;
fS3Bucket = s3bucket;
fS3Region = s3region;
}
inline void TableInfo::setMaxErrorRows(const unsigned int maxErrorRows)
{
fMaxErrorRows = maxErrorRows;
}
inline void TableInfo::setNullStringMode(bool bMode)
{
fNullStringMode = bMode;
}
inline void TableInfo::setTableId(const int& id)
{
fTableId = id;
}
inline void TableInfo::setTruncationAsError(bool bTruncationAsError)
{
fbTruncationAsError = bTruncationAsError;
}
inline void TableInfo::setJobUUID(const boost::uuids::uuid& jobUUID)
{
fJobUUID = jobUUID;
}
/* Set the directory used for *.err and *.bad files.
 * An empty string is stored as-is, meaning the current directory is used.
 * A non-empty directory is normalized to end with a path separator:
 *  - on MSVC builds, '\\' is appended unless it already ends with '/' or '\\';
 *  - otherwise, '/' is appended unless it already ends with '/'.
 * @param errorDir Directory for error/reject files (may be empty)
 */
inline void TableInfo::setErrorDir(const std::string& errorDir)
{
  fErrorDir = errorDir;

  // Nothing to normalize for an empty directory (current dir is used).
  if (fErrorDir.empty())
    return;

  // Read the last character by index. This avoids pre-decrementing the
  // temporary returned by end(), which is ill-formed when the string
  // iterator is a plain pointer.
  const char lastChar = fErrorDir[fErrorDir.length() - 1];

#ifdef _MSC_VER
  if (lastChar != '/' && lastChar != '\\')
    fErrorDir.push_back('\\');
#else
  if (lastChar != '/')
    fErrorDir.push_back('/');
#endif
}
} // namespace WriteEngine
#endif