1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

fix(memory leaks): MCOL-5791 - get rid of memory leaks in plugin code (#3365)

There were numerous memory leaks in plugin's code and associated code.
During typical run of MTR tests it leaked around 65 megabytes of
objects. As a result they may severely affect long-lived connections.

This patch fixes (almost) all leaks found in the plugin. The exceptions
are two leaks associated with SHOW CREATE TABLE columnstore_table and
getting information of columns of columnstore-handled table. These
should be fixed on the server side and work is on the way.
This commit is contained in:
Sergey Zefirov
2024-12-06 12:04:55 +03:00
committed by GitHub
parent aa4bbc0152
commit 3bcc2e2fda
30 changed files with 450 additions and 217 deletions

View File

@ -70,7 +70,7 @@ const std::string ERR_LOG_SUFFIX = ".err"; // Job err log file suffix
// extern WriteEngine::BRMWrapper* brmWrapperPtr;
namespace WriteEngine
{
/* static */ boost::ptr_vector<TableInfo> BulkLoad::fTableInfo;
/* static */ std::vector<std::shared_ptr<TableInfo>> BulkLoad::fTableInfo;
/* static */ boost::mutex* BulkLoad::fDDLMutex = 0;
/* static */ const std::string BulkLoad::DIR_BULK_JOB("job");
@ -519,7 +519,7 @@ void BulkLoad::spawnWorkers()
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo)
int BulkLoad::preProcess(Job& job, int tableNo, std::shared_ptr<TableInfo>& tableInfo)
{
int rc = NO_ERROR, minWidth = 9999; // give a big number
HWM minHWM = 999999; // rp 9/25/07 Bug 473
@ -701,7 +701,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo)
<< "Table-" << job.jobTableList[tableNo].tblName << "...";
fLog.logMsg(oss11.str(), MSGLVL_INFO2);
rc = saveBulkRollbackMetaData(job, tableInfo, segFileInfo, dbRootHWMInfoColVec);
rc = saveBulkRollbackMetaData(job, tableInfo.get(), segFileInfo, dbRootHWMInfoColVec);
if (rc != NO_ERROR)
{
@ -733,10 +733,10 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo)
if (job.jobTableList[tableNo].colList[i].compressionType)
info = new ColumnInfoCompressed(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker,
tableInfo);
tableInfo.get());
// tableInfo->rbMetaWriter());
else
info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, tableInfo);
info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, tableInfo.get());
if (pwd)
info->setUIDGID(pwd->pw_uid, pwd->pw_gid);
@ -840,7 +840,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, TableInfo* tableInfo)
if (rc)
return rc;
fTableInfo.push_back(tableInfo);
fTableInfo.push_back(std::shared_ptr<TableInfo>(tableInfo));
return NO_ERROR;
}
@ -1039,19 +1039,19 @@ int BulkLoad::processJob()
//--------------------------------------------------------------------------
// Validate the existence of the import data files
//--------------------------------------------------------------------------
std::vector<TableInfo*> tables;
std::vector<std::shared_ptr<TableInfo>> tables;
for (i = 0; i < curJob.jobTableList.size(); i++)
{
TableInfo* tableInfo = new TableInfo(&fLog, fTxnID, fProcessName, curJob.jobTableList[i].mapOid,
curJob.jobTableList[i].tblName, fKeepRbMetaFiles);
std::shared_ptr<TableInfo> tableInfo(new TableInfo(&fLog, fTxnID, fProcessName, curJob.jobTableList[i].mapOid,
curJob.jobTableList[i].tblName, fKeepRbMetaFiles));
if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
tableInfo->setBulkLoadMode(fBulkMode, fBRMRptFileName);
tableInfo->setErrorDir(string(getErrorDir()));
tableInfo->setTruncationAsError(getTruncationAsError());
rc = manageImportDataFileList(curJob, i, tableInfo);
rc = manageImportDataFileList(curJob, i, tableInfo.get());
if (rc != NO_ERROR)
{
@ -1495,7 +1495,7 @@ int BulkLoad::rollbackLockedTables()
for (unsigned i = 0; i < fTableInfo.size(); i++)
{
if (fTableInfo[i].isTableLocked())
if (fTableInfo[i]->isTableLocked())
{
lockedTableFound = true;
break;
@ -1509,10 +1509,10 @@ int BulkLoad::rollbackLockedTables()
// Report the tables that were successfully loaded
for (unsigned i = 0; i < fTableInfo.size(); i++)
{
if (!fTableInfo[i].isTableLocked())
if (!fTableInfo[i]->isTableLocked())
{
ostringstream oss;
oss << "Table " << fTableInfo[i].getTableName() << " was successfully loaded. ";
oss << "Table " << fTableInfo[i]->getTableName() << " was successfully loaded. ";
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
}
@ -1520,24 +1520,24 @@ int BulkLoad::rollbackLockedTables()
// Report the tables that were not successfully loaded
for (unsigned i = 0; i < fTableInfo.size(); i++)
{
if (fTableInfo[i].isTableLocked())
if (fTableInfo[i]->isTableLocked())
{
if (fTableInfo[i].hasProcessingBegun())
if (fTableInfo[i]->hasProcessingBegun())
{
ostringstream oss;
oss << "Table " << fTableInfo[i].getTableName() << " (OID-" << fTableInfo[i].getTableOID() << ")"
oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")"
<< " was not successfully loaded. Rolling back.";
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
else
{
ostringstream oss;
oss << "Table " << fTableInfo[i].getTableName() << " (OID-" << fTableInfo[i].getTableOID() << ")"
oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")"
<< " did not start loading. No rollback necessary.";
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
rc = rollbackLockedTable(fTableInfo[i]);
rc = rollbackLockedTable(*fTableInfo[i]);
if (rc != NO_ERROR)
{
@ -1603,9 +1603,9 @@ bool BulkLoad::addErrorMsg2BrmUpdater(const std::string& tablename, const ostrin
for (int tableId = 0; tableId < size; tableId++)
{
if (fTableInfo[tableId].getTableName() == tablename)
if (fTableInfo[tableId]->getTableName() == tablename)
{
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
return true;
}
}

View File

@ -77,7 +77,7 @@ class BulkLoad : public FileOp
/**
* @brief Pre process jobs to validate and assign values to the job structure
*/
int preProcess(Job& job, int tableNo, TableInfo* tableInfo);
int preProcess(Job& job, int tableNo, std::shared_ptr<TableInfo>& tableInfo);
/**
* @brief Print job information
@ -194,7 +194,7 @@ class BulkLoad : public FileOp
std::string fAlternateImportDir; // Alternate bulk import directory
std::string fErrorDir; // Opt. where error records record
std::string fProcessName; // Application process name
static boost::ptr_vector<TableInfo> fTableInfo; // Vector of Table information
static std::vector<std::shared_ptr<TableInfo>> fTableInfo; // Vector of Table information
int fNoOfParseThreads; // Number of parse threads
int fNoOfReadThreads; // Number of read threads
boost::thread_group fReadThreads; // Read thread group

View File

@ -140,7 +140,7 @@ int ExtentStripeAlloc::allocateExtent(OID oid, uint16_t dbRoot,
startLbid = extentEntryIter->second.fStartLbid;
allocSize = extentEntryIter->second.fAllocSize;
hwm = extentEntryIter->second.fHwm;
errMsg = extentEntryIter->second.fStatusMsg;
errMsg = *extentEntryIter->second.fStatusMsg;
retStatus = extentEntryIter->second.fStatus;
fMap.erase(extentEntryIter);
@ -274,7 +274,7 @@ void ExtentStripeAlloc::print()
<< "; seg: " << iter->second.fSegNum << "; lbid: " << iter->second.fStartLbid
<< "; size: " << iter->second.fAllocSize << "; hwm: " << iter->second.fHwm
<< "; stripe: " << iter->second.fStripeKey << "; stat: " << iter->second.fStatus
<< "; msg: " << iter->second.fStatusMsg;
<< "; msg: " << *iter->second.fStatusMsg;
}
}
else

View File

@ -46,21 +46,6 @@ class Log;
class AllocExtEntry
{
public:
// Default constructor
AllocExtEntry()
: fOid(0)
, fColWidth(0)
, fDbRoot(0)
, fPartNum(0)
, fSegNum(0)
, fStartLbid(0)
, fAllocSize(0)
, fHwm(0)
, fStatus(NO_ERROR)
, fStripeKey(0)
{
}
// Used to create entry for an existing extent we are going to add data to.
AllocExtEntry(OID& oid, int colWidth, uint16_t dbRoot, uint32_t partNum, uint16_t segNum,
BRM::LBID_t startLbid, int allocSize, HWM hwm, int status, const std::string& statusMsg,
@ -74,22 +59,22 @@ class AllocExtEntry
, fAllocSize(allocSize)
, fHwm(hwm)
, fStatus(status)
, fStatusMsg(statusMsg)
, fStatusMsg(new std::string(statusMsg))
, fStripeKey(stripeKey)
{
}
OID fOid; // column OID
int fColWidth; // colum width (in bytes)
uint16_t fDbRoot; // DBRoot of allocated extent
uint32_t fPartNum; // Partition number of allocated extent
uint16_t fSegNum; // Segment number of allocated extent
BRM::LBID_t fStartLbid; // Starting LBID of allocated extent
int fAllocSize; // Number of allocated LBIDS
HWM fHwm; // Starting fbo or hwm of allocated extent
int fStatus; // Status of extent allocation
std::string fStatusMsg; // Status msg of extent allocation
unsigned int fStripeKey; // "Stripe" identifier for this extent
OID fOid = 0; // column OID
int fColWidth = 0; // colum width (in bytes)
uint16_t fDbRoot = 0; // DBRoot of allocated extent
uint32_t fPartNum = 0; // Partition number of allocated extent
uint16_t fSegNum = 0; // Segment number of allocated extent
BRM::LBID_t fStartLbid = 0; // Starting LBID of allocated extent
int fAllocSize = 0; // Number of allocated LBIDS
HWM fHwm = 0; // Starting fbo or hwm of allocated extent
int fStatus = NO_ERROR; // Status of extent allocation
std::shared_ptr<std::string> fStatusMsg{new std::string()}; // Status msg of extent allocation
unsigned int fStripeKey = 0; // "Stripe" identifier for this extent
};
//------------------------------------------------------------------------------

View File

@ -178,7 +178,6 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processN
TableInfo::~TableInfo()
{
fBRMReporter.sendErrMsgToFile(fBRMRptFileName);
freeProcessingBuffers();
}
//------------------------------------------------------------------------------

View File

@ -95,16 +95,16 @@ void BulkLoad::read(int id)
#ifdef PROFILE
Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL);
#endif
int rc = fTableInfo[tableId].readTableData();
int rc = fTableInfo[tableId]->readTableData();
if (rc != NO_ERROR)
{
// Error occurred while reading the data, break out of loop.
BulkStatus::setJobStatus(EXIT_FAILURE);
ostringstream oss;
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". Terminating this job.";
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
break;
}
@ -117,7 +117,7 @@ void BulkLoad::read(int id)
if (tableId != -1)
oss << "Bulkload Read (thread " << id << ") Stopped reading Table "
<< fTableInfo[tableId].getTableName() << ". " << ex.what();
<< fTableInfo[tableId]->getTableName() << ". " << ex.what();
else
oss << "Bulkload Read (thread " << id << ") Stopped reading Tables. " << ex.what();
@ -129,14 +129,14 @@ void BulkLoad::read(int id)
ostringstream oss;
if (tableId != -1)
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". " << ex.what() << ". Terminating this job.";
else
oss << "Bulkload Read (thread " << id << ") Failed for Table. " << ex.what()
<< ". Terminating this job.";
if (tableId != -1)
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
}
@ -146,13 +146,13 @@ void BulkLoad::read(int id)
ostringstream oss;
if (tableId != -1)
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". Terminating this job.";
else
oss << "Bulkload Read (thread " << id << ") Failed for Table. Terminating this job.";
if (tableId != -1)
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
}
@ -170,7 +170,7 @@ int BulkLoad::lockTableForRead(int id)
for (unsigned i = 0; i < fTableInfo.size(); ++i)
{
if (fTableInfo[i].lockForRead(id))
if (fTableInfo[i]->lockForRead(id))
return i;
}
@ -292,16 +292,16 @@ void BulkLoad::parse(int id)
// Have obtained the table and column for parsing.
// Start parsing the column data.
double processingTime;
int rc = fTableInfo[tableId].parseColumn(columnId, myParseBuffer, processingTime);
int rc = fTableInfo[tableId]->parseColumn(columnId, myParseBuffer, processingTime);
if (rc != NO_ERROR)
{
// Error occurred while parsing the data, break out of loop.
BulkStatus::setJobStatus(EXIT_FAILURE);
ostringstream oss;
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< " during parsing. Terminating this job.";
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
setParseErrorOnTable(tableId, true);
@ -310,7 +310,7 @@ void BulkLoad::parse(int id)
// Parsing is complete. Acquire the mutex and increment
// the parsingComplete value for the buffer
if (fTableInfo[tableId].getStatusTI() != WriteEngine::ERR)
if (fTableInfo[tableId]->getStatusTI() != WriteEngine::ERR)
{
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
@ -320,15 +320,15 @@ void BulkLoad::parse(int id)
Stats::stopParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
Stats::startParseEvent(WE_STATS_COMPLETING_PARSE);
#endif
rc = fTableInfo[tableId].setParseComplete(columnId, myParseBuffer, processingTime);
rc = fTableInfo[tableId]->setParseComplete(columnId, myParseBuffer, processingTime);
if (rc != NO_ERROR)
{
BulkStatus::setJobStatus(EXIT_FAILURE);
ostringstream oss;
oss << "Bulkload Parse (thread " << id << ") Failed for Table "
<< fTableInfo[tableId].getTableName() << " during parse completion. Terminating this job.";
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
<< fTableInfo[tableId]->getTableName() << " during parse completion. Terminating this job.";
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
setParseErrorOnTable(tableId, false);
@ -349,7 +349,7 @@ void BulkLoad::parse(int id)
if (tableId != -1)
{
oss << "Bulkload Parse (thread " << id << ") Stopped parsing Table "
<< fTableInfo[tableId].getTableName() << ". " << ex.what();
<< fTableInfo[tableId]->getTableName() << ". " << ex.what();
setParseErrorOnTable(tableId, true);
}
@ -367,11 +367,11 @@ void BulkLoad::parse(int id)
if (tableId != -1)
{
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". " << ex.what() << ". Terminating this job.";
setParseErrorOnTable(tableId, true);
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
}
else
{
@ -388,11 +388,11 @@ void BulkLoad::parse(int id)
if (tableId != -1)
{
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId].getTableName()
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
<< ". Terminating this job.";
setParseErrorOnTable(tableId, true);
fTableInfo[tableId].fBRMReporter.addToErrMsgEntry(oss.str());
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
}
else
{
@ -421,10 +421,10 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
for (unsigned i = 0; i < fTableInfo.size(); ++i)
{
if (fTableInfo[i].getStatusTI() == WriteEngine::PARSE_COMPLETE)
if (fTableInfo[i]->getStatusTI() == WriteEngine::PARSE_COMPLETE)
continue;
int currentParseBuffer = fTableInfo[i].getCurrentParseBuffer();
int currentParseBuffer = fTableInfo[i]->getCurrentParseBuffer();
myParseBuffer = currentParseBuffer;
do
@ -434,7 +434,7 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
{
ostringstream oss;
std::string bufStatusStr;
Status stat = fTableInfo[i].getStatusTI();
Status stat = fTableInfo[i]->getStatusTI();
ColumnInfo::convertStatusToString(stat, bufStatusStr);
oss << " - " << pthread_self() <<
":fTableInfo[" << i << "]" << bufStatusStr << " (" << stat << ")";
@ -452,13 +452,13 @@ bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int&
// @bug2099-
// get a buffer and column to parse if available.
if ((columnId = fTableInfo[i].getColumnForParse(thrdId, myParseBuffer, report)) != -1)
if ((columnId = fTableInfo[i]->getColumnForParse(thrdId, myParseBuffer, report)) != -1)
{
tableId = i;
return true;
}
myParseBuffer = (myParseBuffer + 1) % fTableInfo[i].getNumberOfBuffers();
myParseBuffer = (myParseBuffer + 1) % fTableInfo[i]->getNumberOfBuffers();
} while (myParseBuffer != currentParseBuffer);
}
@ -479,10 +479,10 @@ bool BulkLoad::allTablesDone(Status status)
{
for (unsigned i = 0; i < fTableInfo.size(); ++i)
{
if (fTableInfo[i].getStatusTI() == WriteEngine::ERR)
if (fTableInfo[i]->getStatusTI() == WriteEngine::ERR)
return true;
if (fTableInfo[i].getStatusTI() != status)
if (fTableInfo[i]->getStatusTI() != status)
return false;
}
@ -499,11 +499,11 @@ void BulkLoad::setParseErrorOnTable(int tableId, bool lockParseMutex)
if (lockParseMutex)
{
boost::mutex::scoped_lock lock(fParseMutex);
fTableInfo[tableId].setParseError();
fTableInfo[tableId]->setParseError();
}
else
{
fTableInfo[tableId].setParseError();
fTableInfo[tableId]->setParseError();
}
}

View File

@ -57,6 +57,7 @@ using namespace execplan;
namespace WriteEngine
{
BRMWrapper* volatile BRMWrapper::m_instance = NULL;
std::atomic<bool> BRMWrapper::finishReported(false);
boost::thread_specific_ptr<int> BRMWrapper::m_ThreadDataPtr;
boost::mutex BRMWrapper::m_instanceCreateMutex;
@ -750,6 +751,10 @@ uint8_t BRMWrapper::newCpimportJob(uint32_t &jobId)
void BRMWrapper::finishCpimportJob(uint32_t jobId)
{
if (finishReported.exchange(true)) // get old and set to true; if old is true, do nothing.
{
return;
}
blockRsltnMgrPtr->finishCpimportJob(jobId);
}

View File

@ -23,6 +23,7 @@
#pragma once
#include <atomic>
#include <iostream>
#include <memory>
#include <vector>
@ -474,6 +475,8 @@ class BRMWrapper : public WEObj
static IDBDataFile* m_curVBFile;
BRM::DBRM* blockRsltnMgrPtr;
EXPORT static std::atomic<bool> finishReported;
};
//------------------------------------------------------------------------------