1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

fix(memory leaks): MCOL-5791 - get rid of memory leaks in plugin code

There were numerous memory leaks in plugin's code and associated code.
During a typical run of MTR tests it leaked around 65 megabytes of
objects. As a result, these leaks may severely affect long-lived connections.

This patch fixes (almost) all leaks found in the plugin. The exceptions
are two leaks associated with SHOW CREATE TABLE columnstore_table and
with retrieving column information for a columnstore-handled table. These
should be fixed on the server side; that work is underway.
This commit is contained in:
Serguey Zefirov
2024-09-30 14:50:35 +03:00
committed by Sergey Zefirov
parent 6445f4dff3
commit 38fd96a663
31 changed files with 472 additions and 229 deletions

View File

@ -70,7 +70,7 @@ const std::string ERR_LOG_SUFFIX = ".err"; // Job err log file suffix
// extern WriteEngine::BRMWrapper* brmWrapperPtr;
namespace WriteEngine
{
/* static */ boost::ptr_vector<TableInfo> BulkLoad::fTableInfo;
/* static */ std::vector<std::shared_ptr<TableInfo>> BulkLoad::fTableInfo;
/* static */ boost::mutex* BulkLoad::fDDLMutex = 0;
/* static */ const std::string BulkLoad::DIR_BULK_JOB("job");
@ -519,7 +519,7 @@ void BulkLoad::spawnWorkers()
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo)
int BulkLoad::preProcess(Job& job, int tableNo, std::shared_ptr<TableInfo>& tableInfo)
{
int rc = NO_ERROR, minWidth = 9999; // give a big number
HWM minHWM = 999999; // rp 9/25/07 Bug 473
@ -701,7 +701,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo)
<< "Table-" << job.jobTableList[tableNo].tblName << "...";
fLog.logMsg(oss11.str(), MSGLVL_INFO2);
rc = saveBulkRollbackMetaData(job, tableInfo, segFileInfo, dbRootHWMInfoColVec);
rc = saveBulkRollbackMetaData(job, tableInfo.get(), segFileInfo, dbRootHWMInfoColVec);
if (rc != NO_ERROR)
{
@ -733,10 +733,10 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo)
if (job.jobTableList[tableNo].colList[i].compressionType)
info = new ColumnInfoCompressed(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker,
tableInfo);
tableInfo.get());
// tableInfo->rbMetaWriter());
else
info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, tableInfo);
info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, tableInfo.get());
if (pwd)
info->setUIDGID(pwd->pw_uid, pwd->pw_gid);
@ -840,7 +840,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo)
if (rc)
return rc;
fTableInfo.push_back(tableInfo);
fTableInfo.push_back(std::shared_ptr<TableInfo>(tableInfo));
return NO_ERROR;
}
@ -1039,19 +1039,19 @@ int BulkLoad::processJob()
//--------------------------------------------------------------------------
// Validate the existence of the import data files
//--------------------------------------------------------------------------
std::vector<TableInfo*> tables;
std::vector<std::shared_ptr<TableInfo>> tables;
for (i = 0; i < curJob.jobTableList.size(); i++)
{
TableInfo* tableInfo = new TableInfo(&fLog, fTxnID, fProcessName, curJob.jobTableList[i].mapOid,
curJob.jobTableList[i].tblName, fKeepRbMetaFiles);
std::shared_ptr<TableInfo> tableInfo(new TableInfo(&fLog, fTxnID, fProcessName, curJob.jobTableList[i].mapOid,
curJob.jobTableList[i].tblName, fKeepRbMetaFiles));
if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
tableInfo->setBulkLoadMode(fBulkMode, fBRMRptFileName);
tableInfo->setErrorDir(string(getErrorDir()));
tableInfo->setTruncationAsError(getTruncationAsError());
rc = manageImportDataFileList(curJob, i, tableInfo);
rc = manageImportDataFileList(curJob, i, tableInfo.get());
if (rc != NO_ERROR)
{
@ -1515,7 +1515,7 @@ int BulkLoad::rollbackLockedTables()
for (unsigned i = 0; i < fTableInfo.size(); i++)
{
if (fTableInfo[i].isTableLocked())
if (fTableInfo[i]->isTableLocked())
{
lockedTableFound = true;
break;
@ -1529,10 +1529,10 @@ int BulkLoad::rollbackLockedTables()
// Report the tables that were successfully loaded
for (unsigned i = 0; i < fTableInfo.size(); i++)
{
if (!fTableInfo[i].isTableLocked())
if (!fTableInfo[i]->isTableLocked())
{
ostringstream oss;
oss << "Table " << fTableInfo[i].getTableName() << " was successfully loaded. ";
oss << "Table " << fTableInfo[i]->getTableName() << " was successfully loaded. ";
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
}
@ -1540,24 +1540,24 @@ int BulkLoad::rollbackLockedTables()
// Report the tables that were not successfully loaded
for (unsigned i = 0; i < fTableInfo.size(); i++)
{
if (fTableInfo[i].isTableLocked())
if (fTableInfo[i]->isTableLocked())
{
if (fTableInfo[i].hasProcessingBegun())
if (fTableInfo[i]->hasProcessingBegun())
{
ostringstream oss;
oss << "Table " << fTableInfo[i].getTableName() << " (OID-" << fTableInfo[i].getTableOID() << ")"
oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")"
<< " was not successfully loaded. Rolling back.";
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
else
{
ostringstream oss;
oss << "Table " << fTableInfo[i].getTableName() << " (OID-" << fTableInfo[i].getTableOID() << ")"
oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")"
<< " did not start loading. No rollback necessary.";
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
rc = rollbackLockedTable(fTableInfo[i]);
rc = rollbackLockedTable(*fTableInfo[i]);
if (rc != NO_ERROR)
{
@ -1623,9 +1623,9 @@ bool BulkLoad::addErrorMsg2BrmUpdater(const std::string& tablename, const ostrin
for (int tableId = 0; tableId < size; tableId++)
{
if (fTableInfo[tableId].getTableName() == tablename)
if (fTableInfo[tableId]->getTableName() == tablename)
{
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
return true;
}
}

View File

@ -77,7 +77,7 @@ class BulkLoad : public FileOp
/**
* @brief Pre process jobs to validate and assign values to the job structure
*/
int preProcess(Job& job, int tableNo, TableInfo* tableInfo);
int preProcess(Job& job, int tableNo, std::shared_ptr<TableInfo>& tableInfo);
/**
* @brief Print job information
@ -194,7 +194,7 @@ class BulkLoad : public FileOp
std::string fAlternateImportDir; // Alternate bulk import directory
std::string fErrorDir; // Opt. where error records record
std::string fProcessName; // Application process name
static boost::ptr_vector<TableInfo> fTableInfo; // Vector of Table information
static std::vector<std::shared_ptr<TableInfo>> fTableInfo; // Vector of Table information
int fNoOfParseThreads; // Number of parse threads
int fNoOfReadThreads; // Number of read threads
boost::thread_group fReadThreads; // Read thread group

View File

@ -1670,7 +1670,7 @@ int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo)
// not aux column
if (isNonAuxColumn)
{
columnData = fParquetBatch->column(columnId);
columnData = fParquetBatchParser->column(columnId);
}
else // aux column
{
@ -1789,6 +1789,13 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
int width = column.width;
int bufIndex = 1;
if (columnData->data()->buffers.size() < 2)
{
bufIndex = 0;
}
//--------------------------------------------------------------------------
// Parse based on column data type
//--------------------------------------------------------------------------
@ -1799,7 +1806,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
//----------------------------------------------------------------------
case WriteEngine::WR_FLOAT:
{
const float* dataPtr = columnData->data()->GetValues<float>(1);
const float* dataPtr = columnData->data()->GetValues<float>(bufIndex);
for (uint32_t i = 0; i < fTotalReadRowsParser; i++)
{
@ -1855,7 +1862,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
//----------------------------------------------------------------------
case WriteEngine::WR_DOUBLE:
{
const double* dataPtr = columnData->data()->GetValues<double>(1);
const double* dataPtr = columnData->data()->GetValues<double>(bufIndex);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
@ -2005,7 +2012,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
case WriteEngine::WR_SHORT:
{
long long origVal;
const short* dataPtr = columnData->data()->GetValues<short>(1);
const short* dataPtr = columnData->data()->GetValues<short>(bufIndex);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
@ -2085,7 +2092,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
case WriteEngine::WR_USHORT:
{
int64_t origVal = 0;
const uint16_t* dataPtr = columnData->data()->GetValues<uint16_t>(1);
const uint16_t* dataPtr = columnData->data()->GetValues<uint16_t>(bufIndex);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
@ -2161,7 +2168,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
// if use int8_t here, it will take 8 bool value of parquet array
std::shared_ptr<arrow::BooleanArray> boolArray =
std::static_pointer_cast<arrow::BooleanArray>(columnData);
const int8_t* dataPtr = columnData->data()->GetValues<int8_t>(1);
const int8_t* dataPtr = columnData->data()->GetValues<int8_t>(bufIndex);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
@ -2250,7 +2257,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
// special handling for aux column to fix segmentation error
if (columnData->type_id() != arrow::Type::type::NA)
{
const uint8_t* dataPtr = columnData->data()->GetValues<uint8_t>(1);
const uint8_t* dataPtr = columnData->data()->GetValues<uint8_t>(bufIndex);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
@ -2382,7 +2389,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
if (column.dataType != CalpontSystemCatalog::DATETIME &&
column.dataType != CalpontSystemCatalog::TIMESTAMP && column.dataType != CalpontSystemCatalog::TIME)
{
const long long* dataPtr = columnData->data()->GetValues<long long>(1);
const long long* dataPtr = columnData->data()->GetValues<long long>(bufIndex);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
@ -2740,7 +2747,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
std::static_pointer_cast<arrow::Decimal128Array>(columnData);
std::shared_ptr<arrow::DecimalType> fType =
std::static_pointer_cast<arrow::DecimalType>(decimalArray->type());
const int128_t* dataPtr = decimalArray->data()->GetValues<int128_t>(1);
const int128_t* dataPtr = decimalArray->data()->GetValues<int128_t>(bufIndex);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
@ -2803,7 +2810,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
//----------------------------------------------------------------------
case WriteEngine::WR_ULONGLONG:
{
const uint64_t* dataPtr = columnData->data()->GetValues<uint64_t>(1);
const uint64_t* dataPtr = columnData->data()->GetValues<uint64_t>(bufIndex);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
@ -2869,7 +2876,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
case WriteEngine::WR_UINT:
{
int64_t origVal;
const uint32_t* dataPtr = columnData->data()->GetValues<uint32_t>(1);
const uint32_t* dataPtr = columnData->data()->GetValues<uint32_t>(bufIndex);
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
{
@ -2947,7 +2954,7 @@ void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, un
{
if (column.dataType != CalpontSystemCatalog::DATE)
{
const int* dataPtr = columnData->data()->GetValues<int>(1);
const int* dataPtr = columnData->data()->GetValues<int>(bufIndex);
long long origVal;
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
@ -3724,6 +3731,7 @@ int BulkLoadBuffer::fillFromFileParquet(RID& totalReadRows, RID& correctTotalRow
try
{
fParquetBatch.reset();
PARQUET_THROW_NOT_OK(fParquetReader->ReadNext(&fParquetBatch));
fStartRow = correctTotalRows;
fStartRowForLogging = totalReadRows;

View File

@ -140,7 +140,7 @@ int ExtentStripeAlloc::allocateExtent(OID oid, uint16_t dbRoot,
startLbid = extentEntryIter->second.fStartLbid;
allocSize = extentEntryIter->second.fAllocSize;
hwm = extentEntryIter->second.fHwm;
errMsg = extentEntryIter->second.fStatusMsg;
errMsg = *extentEntryIter->second.fStatusMsg;
retStatus = extentEntryIter->second.fStatus;
fMap.erase(extentEntryIter);
@ -274,7 +274,7 @@ void ExtentStripeAlloc::print()
<< "; seg: " << iter->second.fSegNum << "; lbid: " << iter->second.fStartLbid
<< "; size: " << iter->second.fAllocSize << "; hwm: " << iter->second.fHwm
<< "; stripe: " << iter->second.fStripeKey << "; stat: " << iter->second.fStatus
<< "; msg: " << iter->second.fStatusMsg;
<< "; msg: " << *iter->second.fStatusMsg;
}
}
else

View File

@ -46,21 +46,6 @@ class Log;
class AllocExtEntry
{
public:
// Default constructor
AllocExtEntry()
: fOid(0)
, fColWidth(0)
, fDbRoot(0)
, fPartNum(0)
, fSegNum(0)
, fStartLbid(0)
, fAllocSize(0)
, fHwm(0)
, fStatus(NO_ERROR)
, fStripeKey(0)
{
}
// Used to create entry for an existing extent we are going to add data to.
AllocExtEntry(OID& oid, int colWidth, uint16_t dbRoot, uint32_t partNum, uint16_t segNum,
BRM::LBID_t startLbid, int allocSize, HWM hwm, int status, const std::string& statusMsg,
@ -74,22 +59,22 @@ class AllocExtEntry
, fAllocSize(allocSize)
, fHwm(hwm)
, fStatus(status)
, fStatusMsg(statusMsg)
, fStatusMsg(new std::string(statusMsg))
, fStripeKey(stripeKey)
{
}
OID fOid; // column OID
int fColWidth; // colum width (in bytes)
uint16_t fDbRoot; // DBRoot of allocated extent
uint32_t fPartNum; // Partition number of allocated extent
uint16_t fSegNum; // Segment number of allocated extent
BRM::LBID_t fStartLbid; // Starting LBID of allocated extent
int fAllocSize; // Number of allocated LBIDS
HWM fHwm; // Starting fbo or hwm of allocated extent
int fStatus; // Status of extent allocation
std::string fStatusMsg; // Status msg of extent allocation
unsigned int fStripeKey; // "Stripe" identifier for this extent
OID fOid = 0; // column OID
int fColWidth = 0; // colum width (in bytes)
uint16_t fDbRoot = 0; // DBRoot of allocated extent
uint32_t fPartNum = 0; // Partition number of allocated extent
uint16_t fSegNum = 0; // Segment number of allocated extent
BRM::LBID_t fStartLbid = 0; // Starting LBID of allocated extent
int fAllocSize = 0; // Number of allocated LBIDS
HWM fHwm = 0; // Starting fbo or hwm of allocated extent
int fStatus = NO_ERROR; // Status of extent allocation
std::shared_ptr<std::string> fStatusMsg{new std::string()}; // Status msg of extent allocation
unsigned int fStripeKey = 0; // "Stripe" identifier for this extent
};
//------------------------------------------------------------------------------

View File

@ -182,7 +182,7 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processN
TableInfo::~TableInfo()
{
fBRMReporter.sendErrMsgToFile(fBRMRptFileName);
freeProcessingBuffers();
//freeProcessingBuffers();
}
//------------------------------------------------------------------------------

View File

@ -95,16 +95,16 @@ void BulkLoad::read(int id)
#ifdef PROFILE
Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL);
#endif
int rc = fTableInfo[tableId].readTableData();
int rc = fTableInfo[tableId]->readTableData();
if (rc != NO_ERROR)
{
// Error occurred while reading the data, break out of loop.
BulkStatus::setJobStatus(EXIT_FAILURE);
ostringstream oss;
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". Terminating this job.";
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
break;
}
@ -117,7 +117,7 @@ void BulkLoad::read(int id)
if (tableId != -1)
oss << "Bulkload Read (thread " << id << ") Stopped reading Table "
<< fTableInfo[tableId].getTableName() << ". " << ex.what();
<< fTableInfo[tableId]->getTableName() << ". " << ex.what();
else
oss << "Bulkload Read (thread " << id << ") Stopped reading Tables. " << ex.what();
@ -129,14 +129,14 @@ void BulkLoad::read(int id)
ostringstream oss;
if (tableId != -1)
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". " << ex.what() << ". Terminating this job.";
else
oss << "Bulkload Read (thread " << id << ") Failed for Table. " << ex.what()
<< ". Terminating this job.";
if (tableId != -1)
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
}
@ -146,13 +146,13 @@ void BulkLoad::read(int id)
ostringstream oss;
if (tableId != -1)
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". Terminating this job.";
else
oss << "Bulkload Read (thread " << id << ") Failed for Table. Terminating this job.";
if (tableId != -1)
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
}
@ -170,7 +170,7 @@ int BulkLoad::lockTableForRead(int id)
for (unsigned i = 0; i < fTableInfo.size(); ++i)
{
if (fTableInfo[i].lockForRead(id))
if (fTableInfo[i]->lockForRead(id))
return i;
}
@ -292,16 +292,16 @@ void BulkLoad::parse(int id)
// Have obtained the table and column for parsing.
// Start parsing the column data.
double processingTime;
int rc = fTableInfo[tableId].parseColumn(columnId, myParseBuffer, processingTime);
int rc = fTableInfo[tableId]->parseColumn(columnId, myParseBuffer, processingTime);
if (rc != NO_ERROR)
{
// Error occurred while parsing the data, break out of loop.
BulkStatus::setJobStatus(EXIT_FAILURE);
ostringstream oss;
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< " during parsing. Terminating this job.";
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
setParseErrorOnTable(tableId, true);
@ -310,7 +310,7 @@ void BulkLoad::parse(int id)
// Parsing is complete. Acquire the mutex and increment
// the parsingComplete value for the buffer
if (fTableInfo[tableId].getStatusTI() != WriteEngine::ERR)
if (fTableInfo[tableId]->getStatusTI() != WriteEngine::ERR)
{
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
@ -320,15 +320,15 @@ void BulkLoad::parse(int id)
Stats::stopParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
Stats::startParseEvent(WE_STATS_COMPLETING_PARSE);
#endif
rc = fTableInfo[tableId].setParseComplete(columnId, myParseBuffer, processingTime);
rc = fTableInfo[tableId]->setParseComplete(columnId, myParseBuffer, processingTime);
if (rc != NO_ERROR)
{
BulkStatus::setJobStatus(EXIT_FAILURE);
ostringstream oss;
oss << "Bulkload Parse (thread " << id << ") Failed for Table "
<< fTableInfo[tableId].getTableName() << " during parse completion. Terminating this job.";
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
<< fTableInfo[tableId]->getTableName() << " during parse completion. Terminating this job.";
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
setParseErrorOnTable(tableId, false);
@ -349,7 +349,7 @@ void BulkLoad::parse(int id)
if (tableId != -1)
{
oss << "Bulkload Parse (thread " << id << ") Stopped parsing Table "
<< fTableInfo[tableId].getTableName() << ". " << ex.what();
<< fTableInfo[tableId]->getTableName() << ". " << ex.what();
setParseErrorOnTable(tableId, true);
}
@ -367,11 +367,11 @@ void BulkLoad::parse(int id)
if (tableId != -1)
{
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". " << ex.what() << ". Terminating this job.";
setParseErrorOnTable(tableId, true);
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
}
else
{
@ -388,11 +388,11 @@ void BulkLoad::parse(int id)
if (tableId != -1)
{
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". Terminating this job.";
setParseErrorOnTable(tableId, true);
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
}
else
{
@ -421,10 +421,10 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
for (unsigned i = 0; i < fTableInfo.size(); ++i)
{
if (fTableInfo[i].getStatusTI() == WriteEngine::PARSE_COMPLETE)
if (fTableInfo[i]->getStatusTI() == WriteEngine::PARSE_COMPLETE)
continue;
int currentParseBuffer = fTableInfo[i].getCurrentParseBuffer();
int currentParseBuffer = fTableInfo[i]->getCurrentParseBuffer();
myParseBuffer = currentParseBuffer;
do
@ -434,7 +434,7 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
{
ostringstream oss;
std::string bufStatusStr;
Status stat = fTableInfo[i].getStatusTI();
Status stat = fTableInfo[i]->getStatusTI();
ColumnInfo::convertStatusToString(stat, bufStatusStr);
oss << " - " << pthread_self() <<
":fTableInfo[" << i << "]" << bufStatusStr << " (" << stat << ")";
@ -452,13 +452,13 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
// @bug2099-
// get a buffer and column to parse if available.
if ((columnId = fTableInfo[i].getColumnForParse(thrdId, myParseBuffer, report)) != -1)
if ((columnId = fTableInfo[i]->getColumnForParse(thrdId, myParseBuffer, report)) != -1)
{
tableId = i;
return true;
}
myParseBuffer = (myParseBuffer + 1) % fTableInfo[i].getNumberOfBuffers();
myParseBuffer = (myParseBuffer + 1) % fTableInfo[i]->getNumberOfBuffers();
} while (myParseBuffer != currentParseBuffer);
}
@ -479,10 +479,10 @@ bool BulkLoad::allTablesDone(Status status)
{
for (unsigned i = 0; i < fTableInfo.size(); ++i)
{
if (fTableInfo[i].getStatusTI() == WriteEngine::ERR)
if (fTableInfo[i]->getStatusTI() == WriteEngine::ERR)
return true;
if (fTableInfo[i].getStatusTI() != status)
if (fTableInfo[i]->getStatusTI() != status)
return false;
}
@ -499,11 +499,11 @@ void BulkLoad::setParseErrorOnTable(int tableId, bool lockParseMutex)
if (lockParseMutex)
{
boost::mutex::scoped_lock lock(fParseMutex);
fTableInfo[tableId].setParseError();
fTableInfo[tableId]->setParseError();
}
else
{
fTableInfo[tableId].setParseError();
fTableInfo[tableId]->setParseError();
}
}

View File

@ -57,6 +57,7 @@ using namespace execplan;
namespace WriteEngine
{
BRMWrapper* volatile BRMWrapper::m_instance = NULL;
std::atomic<bool> BRMWrapper::finishReported(false);
boost::thread_specific_ptr<int> BRMWrapper::m_ThreadDataPtr;
boost::mutex BRMWrapper::m_instanceCreateMutex;
@ -750,6 +751,10 @@ uint8_t BRMWrapper::newCpimportJob(uint32_t &jobId)
void BRMWrapper::finishCpimportJob(uint32_t jobId)
{
if (finishReported.exchange(true)) // get old and set to true; if old is true, do nothing.
{
return;
}
blockRsltnMgrPtr->finishCpimportJob(jobId);
}

View File

@ -23,6 +23,7 @@
#pragma once
#include <atomic>
#include <iostream>
#include <memory>
#include <vector>
@ -474,6 +475,8 @@ class BRMWrapper : public WEObj
static IDBDataFile* m_curVBFile;
BRM::DBRM* blockRsltnMgrPtr;
EXPORT static std::atomic<bool> finishReported;
};
//------------------------------------------------------------------------------