1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2026-01-06 08:21:10 +03:00

No boost condition (#2822)

This patch replaces boost primitives with stdlib counterparts.
This commit is contained in:
Leonid Fedorov
2023-04-22 00:42:45 +03:00
committed by GitHub
parent 3ce19abdae
commit f916e64927
245 changed files with 1261 additions and 2007 deletions

View File

@@ -71,7 +71,7 @@ const std::string ERR_LOG_SUFFIX = ".err"; // Job err log file suffix
namespace WriteEngine
{
/* static */ boost::ptr_vector<TableInfo> BulkLoad::fTableInfo;
/* static */ boost::mutex* BulkLoad::fDDLMutex = 0;
/* static */ std::mutex* BulkLoad::fDDLMutex = 0;
/* static */ const std::string BulkLoad::DIR_BULK_JOB("job");
/* static */ const std::string BulkLoad::DIR_BULK_TEMP_JOB("tmpjob");
@@ -165,7 +165,7 @@ BulkLoad::BulkLoad()
fTableInfo.clear();
setDebugLevel(DEBUG_0);
fDDLMutex = new boost::mutex();
fDDLMutex = new std::mutex();
memset(&fStartTime, 0, sizeof(timeval));
memset(&fEndTime, 0, sizeof(timeval));
}
@@ -1584,7 +1584,7 @@ int BulkLoad::updateNextValue(OID columnOid, uint64_t nextAutoIncVal)
// job for 2 tables; so we put a mutex here just in case the DDLClient code
// won't work well with 2 competing WE_DDLCommandClient objects in the same
// process (ex: if there is any static data in WE_DDLCommandClient).
boost::mutex::scoped_lock lock(*fDDLMutex);
std::unique_lock lock(*fDDLMutex);
WE_DDLCommandClient ddlCommandClt;
unsigned int rc = ddlCommandClt.UpdateSyscolumnNextval(columnOid, nextAutoIncVal);

View File

@@ -40,7 +40,8 @@
#include "brmtypes.h"
#include "boost/ptr_container/ptr_vector.hpp"
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <map>
#include <mutex>
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/uuid/uuid.hpp>
@@ -199,9 +200,9 @@ class BulkLoad : public FileOp
int fNoOfReadThreads; // Number of read threads
boost::thread_group fReadThreads; // Read thread group
boost::thread_group fParseThreads; // Parse thread group
boost::mutex fReadMutex; // Manages table selection by each
std::mutex fReadMutex; // Manages table selection by each
// read thread
boost::mutex fParseMutex; // Manages table/buffer/column
std::mutex fParseMutex; // Manages table/buffer/column
// selection by each parsing thread
BRM::TxnID fTxnID; // TransID acquired from SessionMgr
bool fKeepRbMetaFiles; // Keep/delete bulkRB metadata files
@@ -218,7 +219,7 @@ class BulkLoad : public FileOp
ImportDataMode fImportDataMode; // Importing text or binary data
bool fbContinue; // true when read and parse r running
//
static boost::mutex* fDDLMutex; // Insure only 1 DDL op at a time
static std::mutex* fDDLMutex; // Insure only 1 DDL op at a time
EXPORT static const std::string DIR_BULK_JOB; // Bulk job directory
EXPORT static const std::string DIR_BULK_TEMP_JOB; // Dir for tmp job files

View File

@@ -1536,7 +1536,7 @@ int BulkLoadBuffer::parse(ColumnInfo& columnInfo)
// variables). It should be okay to reference a copy of these variables
// as no other thread should be changing them while we are in parse().
{
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
std::unique_lock lock(fSyncUpdatesBLB);
fTotalReadRowsParser = fTotalReadRows;
fStartRowParser = fStartRow;
fDataParser = fData;
@@ -2035,7 +2035,7 @@ int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const ch
const boost::ptr_vector<ColumnInfo>& columnsInfo,
unsigned int allowedErrCntThisCall)
{
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
std::unique_lock lock(fSyncUpdatesBLB);
reset();
copyOverflow(overFlowBufIn);
size_t readSize = 0;
@@ -2138,7 +2138,7 @@ int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* hand
RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
unsigned int allowedErrCntThisCall)
{
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
std::unique_lock lock(fSyncUpdatesBLB);
reset();
copyOverflow(overFlowBufIn);
size_t readSize = 0;

View File

@@ -137,7 +137,7 @@ class BulkLoadBuffer
char fColDelim; // Character to delimit columns in a row
unsigned fBufferSize; // Size of input read buffer (fData)
unsigned fReadSize; // Number of bytes in read buffer(fData)
boost::mutex fSyncUpdatesBLB; // Mutex to synchronize updates
std::mutex fSyncUpdatesBLB; // Mutex to synchronize updates
Log* fLog; // Logger object
bool fNullStringMode; // Indicates if "NULL" string is to be
// treated as a NULL value or not

View File

@@ -31,6 +31,8 @@
* ColumnBufferCompressed.
*/
#include "we_colbufmgr.h"
#include "we_colbuf.h"
#include "we_colbufcompressed.h"
@@ -38,8 +40,10 @@
#include "we_bulkstatus.h"
#include "we_log.h"
#include "blocksize.h"
#include <chrono>
#include <sstream>
#include <boost/date_time/posix_time/posix_time_types.hpp>
namespace
{
@@ -123,9 +127,9 @@ int ColumnBufferManager::reserveSection(RID startRowId, uint32_t nRowsIn, uint32
Stats::startParseEvent(WE_STATS_WAIT_TO_RESERVE_OUT_BUF);
#endif
*cbs = 0;
boost::posix_time::seconds wait_seconds(COND_WAIT_SECONDS);
std::chrono::seconds wait_seconds(COND_WAIT_SECONDS);
boost::mutex::scoped_lock lock(fColInfo->colMutex());
std::unique_lock lock(fColInfo->colMutex());
//..Ensure that ColumnBufferSection allocations are made in input row order
bool bWaitedForInSequence = false;
@@ -145,7 +149,7 @@ int ColumnBufferManager::reserveSection(RID startRowId, uint32_t nRowsIn, uint32
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
fOutOfSequence.timed_wait(lock, wait_seconds);
fOutOfSequence.wait_for(lock, wait_seconds);
// See if JobStatus has been set to terminate by another thread
if (BulkStatus::getJobStatus() == EXIT_FAILURE)
@@ -179,7 +183,7 @@ int ColumnBufferManager::reserveSection(RID startRowId, uint32_t nRowsIn, uint32
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
fResizeInProgress.timed_wait(lock, wait_seconds);
fResizeInProgress.wait_for(lock, wait_seconds);
// See if JobStatus has been set to terminate by another thread
if (BulkStatus::getJobStatus() == EXIT_FAILURE)
@@ -253,7 +257,7 @@ int ColumnBufferManager::reserveSection(RID startRowId, uint32_t nRowsIn, uint32
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
fBufInUse.timed_wait(lock, wait_seconds);
fBufInUse.wait_for(lock, wait_seconds);
// See if JobStatus has been set to quit by another thread
if (BulkStatus::getJobStatus() == EXIT_FAILURE)
@@ -330,7 +334,7 @@ int ColumnBufferManager::releaseSection(ColumnBufferSection* cbs)
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_WAIT_TO_RELEASE_OUT_BUF);
#endif
boost::mutex::scoped_lock lock(fColInfo->colMutex());
std::unique_lock lock(fColInfo->colMutex());
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_WAIT_TO_RELEASE_OUT_BUF);
#endif
@@ -675,8 +679,8 @@ int ColumnBufferManager::flush()
//------------------------------------------------------------------------------
int ColumnBufferManager::intermediateFlush()
{
boost::posix_time::seconds wait_seconds(COND_WAIT_SECONDS);
boost::mutex::scoped_lock lock(fColInfo->colMutex());
std::chrono::seconds wait_seconds(COND_WAIT_SECONDS);
std::unique_lock lock(fColInfo->colMutex());
// Wait for all other threads which are currently parsing rows,
// to finish parsing the data in those sections.
@@ -686,7 +690,7 @@ int ColumnBufferManager::intermediateFlush()
while (fSectionsInUse.size() > 0)
{
fBufInUse.timed_wait(lock, wait_seconds);
fBufInUse.wait_for(lock, wait_seconds);
// See if JobStatus has been set to terminate by another thread
if (BulkStatus::getJobStatus() == EXIT_FAILURE)
@@ -729,7 +733,7 @@ int ColumnBufferManager::rowsExtentCheck(int nRows, int& nRows2)
//------------------------------------------------------------------------------
int ColumnBufferManager::extendTokenColumn()
{
boost::mutex::scoped_lock lock(fColInfo->colMutex());
std::unique_lock lock(fColInfo->colMutex());
return fColInfo->extendColumn(false);
}

View File

@@ -26,8 +26,9 @@
#pragma once
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <map>
#include <mutex>
#include <condition_variable>
#include <list>
#include <vector>
@@ -223,17 +224,17 @@ class ColumnBufferManager
/** @brief Condition variable for threads waiting for resize to complete
*/
boost::condition fResizeInProgress;
std::condition_variable fResizeInProgress;
/** @brief Condition variable for threads waiting for all buffer sections
* to be released
*/
boost::condition fBufInUse;
std::condition_variable fBufInUse;
/** @brief Condition variable for threads who have arrived out-of-
* sequence with respect to their row-id
*/
boost::condition fOutOfSequence;
std::condition_variable fOutOfSequence;
/** @brief Width of the column
*/

View File

@@ -53,7 +53,7 @@ namespace WriteEngine
// @bug 4806: Added bIsNewExtent; Set CP min/max for very first extent on a PM
void ColExtInf::addFirstEntry(RID lastInputRow, BRM::LBID_t lbid, bool bIsNewExtent)
{
boost::mutex::scoped_lock lock(fMapMutex);
std::unique_lock lock(fMapMutex);
ColExtInfEntry entry(lbid, bIsNewExtent);
fMap[lastInputRow] = entry;
@@ -70,7 +70,7 @@ template <typename T>
void ColExtInf::addOrUpdateEntryTemplate(RID lastInputRow, T minVal, T maxVal, ColDataType colDataType,
int width)
{
boost::mutex::scoped_lock lock(fMapMutex);
std::unique_lock lock(fMapMutex);
RowExtMap::iterator iter = fMap.find(lastInputRow);
@@ -152,7 +152,7 @@ void ColExtInf::addOrUpdateEntryTemplate(RID lastInputRow, T minVal, T maxVal, C
//------------------------------------------------------------------------------
int ColExtInf::updateEntryLbid(BRM::LBID_t startLbid)
{
boost::mutex::scoped_lock lock(fMapMutex);
std::unique_lock lock(fMapMutex);
// fPendingExtentRows is a Set carrying a sorted list of the last Row
// number in each extent. We should be allocating/assigning LBIDs in
@@ -190,7 +190,7 @@ void ColExtInf::getCPInfoForBRM(JobColumn column, BRMReporter& brmReporter)
{
bool bIsChar = ((column.weType == WriteEngine::WR_CHAR) && (column.colType != COL_TYPE_DICT));
boost::mutex::scoped_lock lock(fMapMutex);
std::unique_lock lock(fMapMutex);
RowExtMap::const_iterator iter = fMap.begin();
@@ -280,7 +280,7 @@ void ColExtInf::getCPInfoForBRM(JobColumn column, BRMReporter& brmReporter)
//------------------------------------------------------------------------------
void ColExtInf::print(const JobColumn& column)
{
boost::mutex::scoped_lock lock(fMapMutex);
std::unique_lock lock(fMapMutex);
bool bIsChar = ((column.weType == WriteEngine::WR_CHAR) && (column.colType != COL_TYPE_DICT));
std::ostringstream oss;
oss << "ColExtInf Map for OID: " << fColOid;

View File

@@ -31,7 +31,8 @@
#include <stdint.h>
#include <set>
#include <tr1/unordered_map>
#include <boost/thread/mutex.hpp>
#include <map>
#include <mutex>
#include "brmtypes.h"
#include "we_type.h"
@@ -242,7 +243,7 @@ class ColExtInf : public ColExtInfBase
private:
OID fColOid; // Column OID for the relevant extents
Log* fLog; // Log used for debug logging
boost::mutex fMapMutex; // protects unordered map access
std::mutex fMapMutex; // protects unordered map access
std::set<RID> fPendingExtentRows; // list of lastInputRow entries that
// are awaiting an LBID assignment.

View File

@@ -118,7 +118,7 @@ int ColumnAutoInc::init(const std::string& fullTableName, ColumnInfo* colInfo)
//------------------------------------------------------------------------------
void ColumnAutoInc::initNextAutoInc(uint64_t nextValue)
{
boost::mutex::scoped_lock lock(fAutoIncMutex);
std::unique_lock lock(fAutoIncMutex);
// nextValue is unusable if < 1; probably means we already reached max value
if (nextValue < 1)
@@ -227,7 +227,7 @@ uint64_t ColumnAutoInc::getNextAutoIncToSave()
{
uint64_t nextValue = AUTOINCR_SATURATED;
boost::mutex::scoped_lock lock(fAutoIncMutex);
std::unique_lock lock(fAutoIncMutex);
// nextValue is returned as -1 if we reached max value
if (fAutoIncLastValue < fMaxIntSat)
@@ -314,7 +314,7 @@ ColumnAutoIncJob::~ColumnAutoIncJob()
/* virtual */
int ColumnAutoIncJob::reserveNextRange(uint32_t autoIncCount, uint64_t& nextValue)
{
boost::mutex::scoped_lock lock(fAutoIncMutex);
std::unique_lock lock(fAutoIncMutex);
if ((fMaxIntSat - autoIncCount) < fAutoIncLastValue)
{
@@ -376,7 +376,7 @@ int ColumnAutoIncIncremental::reserveNextRange(uint32_t autoIncCount, uint64_t&
// processing AI ranges out of order, so we don't arbitrarily
// update fAutoIncLastValue. We only update it if the range in question
// exceeds the current value for fAutoIncLastValue.
boost::mutex::scoped_lock lock(fAutoIncMutex);
std::unique_lock lock(fAutoIncMutex);
if (autoIncLastValue > fAutoIncLastValue)
fAutoIncLastValue = autoIncLastValue;

View File

@@ -26,7 +26,8 @@
#pragma once
#include <string>
#include <boost/thread/mutex.hpp>
#include <map>
#include <mutex>
#include <boost/scoped_ptr.hpp>
#include "we_type.h"
@@ -74,7 +75,7 @@ class ColumnAutoInc
int getNextValueFromSysCat(uint64_t& nextValue);
Log* fLog; // import log file
boost::mutex fAutoIncMutex; // Mutex to manage fAutoIncLastValue
std::mutex fAutoIncMutex; // Mutex to manage fAutoIncLastValue
uint64_t fAutoIncLastValue; // Tracks latest autoincrement value used
uint64_t fMaxIntSat; // Maximum saturation value
std::string fTableName; // Full table name (schema.table) for AI column

View File

@@ -301,7 +301,7 @@ int ColumnInfo::createDelayedFileIfNeeded(const std::string& tableName)
// No sense in waiting for a fColMutex lock, when 99.99% of the time,
// all we need to do is check fDelayedFileCreation, see that it's value
// is INITIAL_DBFILE_STAT_FILE_EXISTS, and exit the function.
boost::mutex::scoped_lock lock(fDelayedFileCreateMutex);
std::unique_lock lock(fDelayedFileCreateMutex);
if (fDelayedFileCreation == INITIAL_DBFILE_STAT_FILE_EXISTS)
return NO_ERROR;
@@ -323,7 +323,7 @@ int ColumnInfo::createDelayedFileIfNeeded(const std::string& tableName)
// fDelayedFileCreateMutex lock might suffice, but better to explicitly
// lock fColMutex since we are modifying attributes that we typically
// change within the scope of a fColMutex lock.
boost::mutex::scoped_lock lock2(fColMutex);
std::unique_lock lock2(fColMutex);
uint16_t dbRoot = curCol.dataFile.fDbRoot;
uint32_t partition = curCol.dataFile.fPartition;
@@ -1116,7 +1116,7 @@ int ColumnInfo::finishParsing()
// thread working on this column. But, we use the mutex to insure that
// we see the latest state that may have been set by another parsing thread
// working with the same column.
boost::mutex::scoped_lock lock(fColMutex);
std::unique_lock lock(fColMutex);
// Force the flushing of remaining data in the output buffer
if (fColBufferMgr)
@@ -1165,7 +1165,7 @@ int ColumnInfo::finishParsing()
//------------------------------------------------------------------------------
void ColumnInfo::getBRMUpdateInfo(BRMReporter& brmReporter)
{
boost::mutex::scoped_lock lock(fColMutex);
std::unique_lock lock(fColMutex);
// Useful for debugging
// printCPInfo(column);
@@ -1495,7 +1495,7 @@ int ColumnInfo::finishAutoInc()
//------------------------------------------------------------------------------
void ColumnInfo::getSegFileInfo(DBRootExtentInfo& fileInfo)
{
boost::mutex::scoped_lock lock(fColMutex);
std::unique_lock lock(fColMutex);
fileInfo.fDbRoot = curCol.dataFile.fDbRoot;
fileInfo.fPartition = curCol.dataFile.fPartition;
fileInfo.fSegment = curCol.dataFile.fSegment;
@@ -1692,7 +1692,7 @@ int ColumnInfo::updateDctnryStore(char* buf, ColPosPair** pos, const int totalRo
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_WAIT_TO_PARSE_DCT);
#endif
boost::mutex::scoped_lock lock(fDictionaryMutex);
std::unique_lock lock(fDictionaryMutex);
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_WAIT_TO_PARSE_DCT);
#endif

View File

@@ -33,7 +33,8 @@
#include "we_colextinf.h"
#include "we_dctnrycompress.h"
#include <boost/thread/mutex.hpp>
#include <map>
#include <mutex>
#include <boost/scoped_ptr.hpp>
#include <sys/time.h>
#include <vector>
@@ -377,7 +378,7 @@ class ColumnInfo : public WeUIDGID
* This was formerly the fMgrMutex in ColumnBufferManager. See comments
* that precede this class definition for more information.
*/
boost::mutex& colMutex();
std::mutex& colMutex();
/** @brief Get number of rows per extent
*/
@@ -424,10 +425,10 @@ class ColumnInfo : public WeUIDGID
// Protected Data Members
//--------------------------------------------------------------------------
boost::mutex fDictionaryMutex; // Mutex for dicionary updates
boost::mutex fColMutex; // Mutex for column changes
boost::mutex fAutoIncMutex; // Mutex to manage fAutoIncLastValue
boost::mutex fDelayedFileCreateMutex; // Manage delayed file check/create
std::mutex fDictionaryMutex; // Mutex for dicionary updates
std::mutex fColMutex; // Mutex for column changes
std::mutex fAutoIncMutex; // Mutex to manage fAutoIncLastValue
std::mutex fDelayedFileCreateMutex; // Manage delayed file check/create
Log* fLog; // Object used for logging
// Blocks to skip at start of bulk load, if starting file has to be created
@@ -500,7 +501,7 @@ inline void ColumnInfo::setUIDGID(const uid_t p_uid, const gid_t p_gid)
colOp->setUIDGID(this);
}
inline boost::mutex& ColumnInfo::colMutex()
inline std::mutex& ColumnInfo::colMutex()
{
return fColMutex;
}

View File

@@ -67,7 +67,7 @@ ExtentStripeAlloc::~ExtentStripeAlloc()
//------------------------------------------------------------------------------
void ExtentStripeAlloc::addColumn(OID colOID, int colWidth)
{
boost::mutex::scoped_lock lock(fMapMutex);
std::unique_lock lock(fMapMutex);
fColOIDs.push_back(colOID);
fColWidths.push_back(colWidth);
@@ -108,7 +108,7 @@ int ExtentStripeAlloc::allocateExtent(OID oid, uint16_t dbRoot,
std::pair<AllocExtMapIter, AllocExtMapIter> iters;
boost::mutex::scoped_lock lock(fMapMutex);
std::unique_lock lock(fMapMutex);
// Search for an extent matching the requested OID and DBRoot.
// We also filter by selecting the lowest stripe number. See
@@ -257,7 +257,7 @@ int ExtentStripeAlloc::allocateExtent(OID oid, uint16_t dbRoot,
//------------------------------------------------------------------------------
void ExtentStripeAlloc::print()
{
boost::mutex::scoped_lock lock(fMapMutex);
std::unique_lock lock(fMapMutex);
std::ostringstream oss;
oss << "Current Pending Extents for table " << fTableOID << ":";

View File

@@ -30,7 +30,8 @@
#include <vector>
#include <tr1/unordered_map>
#include <boost/thread/mutex.hpp>
#include <map>
#include <mutex>
#include "we_type.h"
#include "brmtypes.h"
@@ -156,7 +157,7 @@ class ExtentStripeAlloc
OID fTableOID; // Table extents to be allocated
Log* fLog; // Log used for debug logging
unsigned int fStripeCount; // Extent "stripe" counter
boost::mutex fMapMutex; // protects unordered map access
std::mutex fMapMutex; // protects unordered map access
std::vector<OID> fColOIDs; // Vector of column OIDs
std::vector<int> fColWidths; // Widths associated with fColOIDs

View File

@@ -245,7 +245,7 @@ void TableInfo::closeOpenDbFiles()
//------------------------------------------------------------------------------
bool TableInfo::lockForRead(const int& locker)
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
if (fLocker == -1)
{
@@ -278,7 +278,7 @@ int TableInfo::readTableData()
if (rc != NO_ERROR)
{
// Mark the table status as error and exit.
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
fStatusTI = WriteEngine::ERR;
return rc;
}
@@ -314,7 +314,7 @@ int TableInfo::readTableData()
// See if JobStatus has been set to terminate by another thread
if (BulkStatus::getJobStatus() == EXIT_FAILURE)
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
fStartTime = readStart;
fStatusTI = WriteEngine::ERR;
its.msg_type = ImportTeleStats::IT_TERM;
@@ -350,7 +350,7 @@ int TableInfo::readTableData()
// See if JobStatus has been set to terminate by another thread
if (BulkStatus::getJobStatus() == EXIT_FAILURE)
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
fStartTime = readStart;
fStatusTI = WriteEngine::ERR;
its.msg_type = ImportTeleStats::IT_TERM;
@@ -437,7 +437,7 @@ int TableInfo::readTableData()
// need to exit.
// mark the table status as error and exit.
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
fStartTime = readStart;
fStatusTI = WriteEngine::ERR;
fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
@@ -491,7 +491,7 @@ int TableInfo::readTableData()
// number of errors > maximum allowed. hence return error.
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
fStartTime = readStart;
fStatusTI = WriteEngine::ERR;
fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
@@ -518,7 +518,7 @@ int TableInfo::readTableData()
#ifdef PROFILE
Stats::startReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
#endif
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
#ifdef PROFILE
Stats::stopReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
Stats::startReadEvent(WE_STATS_COMPLETING_READ);
@@ -629,7 +629,7 @@ void TableInfo::writeErrorList(const std::vector<std::pair<RID, std::string> >*
if ((errorRowsCount > 0) || (errorDatRowsCount > 0) || (bCloseFile))
{
boost::mutex::scoped_lock lock(fErrorRptInfoMutex);
std::unique_lock lock(fErrorRptInfoMutex);
if ((errorRowsCount > 0) || (bCloseFile))
writeErrReason(errorRows, bCloseFile);
@@ -674,7 +674,7 @@ int TableInfo::parseColumn(const int& columnId, const int& bufferId, double& pro
//------------------------------------------------------------------------------
int TableInfo::setParseComplete(const int& columnId, const int& bufferId, double processingTime)
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
// Check table status in case race condition results in this function
// being called after fStatusTI was set to ERR by another thread.
@@ -1051,7 +1051,7 @@ int TableInfo::finishBRM()
std::vector<std::string>* errFiles = 0;
std::vector<std::string>* badFiles = 0;
{
boost::mutex::scoped_lock lock(fErrorRptInfoMutex);
std::unique_lock lock(fErrorRptInfoMutex);
errFiles = &fErrFiles;
badFiles = &fBadFiles;
}
@@ -1071,7 +1071,7 @@ int TableInfo::finishBRM()
//------------------------------------------------------------------------------
void TableInfo::setParseError()
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
fStatusTI = WriteEngine::ERR;
}
@@ -1084,7 +1084,7 @@ void TableInfo::setParseError()
// Added report parm and couts below.
int TableInfo::getColumnForParse(const int& id, const int& bufferId, bool report)
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
double maxTime = 0;
int columnId = -1;
@@ -1360,7 +1360,7 @@ void TableInfo::closeTableFile()
// Added report parm and couts below.
bool TableInfo::isBufferAvailable(bool report)
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
Status bufferStatus = fBuffers[fCurrentReadBuffer].getStatusBLB();
if ((bufferStatus == WriteEngine::PARSE_COMPLETE) || (bufferStatus == WriteEngine::NEW))

View File

@@ -26,7 +26,8 @@
#include <utility>
#include <vector>
#include <boost/thread/mutex.hpp>
#include <map>
#include <mutex>
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/uuid/uuid.hpp>
@@ -105,9 +106,9 @@ class TableInfo : public WeUIDGID
* getting/setting the status of the BulkLoadBuffer objects, so
* fSyncUpdatesTI is also used to set/get the BulkLoadBuffer status.
*/
boost::mutex fSyncUpdatesTI;
std::mutex fSyncUpdatesTI;
boost::mutex fErrorRptInfoMutex; // Used to synhronize access to
std::mutex fErrorRptInfoMutex; // Used to synhronize access to
// fRejectDataFile & fRejectErrFile
int fLocker; // Read thread id reading this table
std::vector<std::string> fLoadFileList; // Load files
@@ -540,7 +541,7 @@ inline bool TableInfo::isTableLocked()
inline void TableInfo::markTableComplete()
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
std::unique_lock lock(fSyncUpdatesTI);
fStatusTI = WriteEngine::PARSE_COMPLETE;
}

View File

@@ -166,7 +166,7 @@ void BulkLoad::read(int id)
//------------------------------------------------------------------------------
int BulkLoad::lockTableForRead(int id)
{
boost::mutex::scoped_lock lock(fReadMutex);
std::unique_lock lock(fReadMutex);
for (unsigned i = 0; i < fTableInfo.size(); ++i)
{
@@ -315,7 +315,7 @@ void BulkLoad::parse(int id)
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
#endif
boost::mutex::scoped_lock lock(fParseMutex);
std::unique_lock lock(fParseMutex);
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
Stats::startParseEvent(WE_STATS_COMPLETING_PARSE);
@@ -417,7 +417,7 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
// Check if the currentParseBuffer is available for parsing
// If yes, put the locker and fill the tableId and columnId
// else, go to the next table for checking if a column is available
boost::mutex::scoped_lock lock(fParseMutex);
std::unique_lock lock(fParseMutex);
for (unsigned i = 0; i < fTableInfo.size(); ++i)
{
@@ -498,7 +498,7 @@ void BulkLoad::setParseErrorOnTable(int tableId, bool lockParseMutex)
{
if (lockParseMutex)
{
boost::mutex::scoped_lock lock(fParseMutex);
std::unique_lock lock(fParseMutex);
fTableInfo[tableId].setParseError();
}
else