mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
There were numerous memory leaks in the plugin's code and associated code. During a typical run of the MTR tests it leaked around 65 megabytes of objects; as a result, these leaks may severely affect long-lived connections. This patch fixes (almost) all leaks found in the plugin. The exceptions are two leaks associated with SHOW CREATE TABLE columnstore_table and with retrieving column information for a columnstore-handled table. These should be fixed on the server side, and that work is under way.
511 lines
16 KiB
C++
511 lines
16 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*****************************************************************************
|
|
* $Id: we_workers.cpp 4450 2013-01-21 14:13:24Z rdempsey $
|
|
*
|
|
****************************************************************************/
|
|
|
|
#include "we_bulkload.h"
#include "we_bulkstatus.h"
#include "we_stats.h"

#include <sys/time.h>
#include <cerrno>
#include <cmath>

#include "dataconvert.h"
#include <we_convertor.h>

using namespace std;
using namespace dataconvert;
|
|
|
|
namespace WriteEngine
|
|
{
|
|
//------------------------------------------------------------------------------
|
|
// Puts the current thread to sleep for the specified number of milliseconds.
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoad::sleepMS(long ms)
|
|
{
|
|
struct timespec rm_ts;
|
|
|
|
rm_ts.tv_sec = ms / 1000;
|
|
rm_ts.tv_nsec = ms % 1000 * 1000000;
|
|
struct timespec abs_ts;
|
|
|
|
do
|
|
{
|
|
abs_ts.tv_sec = rm_ts.tv_sec;
|
|
abs_ts.tv_nsec = rm_ts.tv_nsec;
|
|
} while (nanosleep(&abs_ts, &rm_ts) < 0);
|
|
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// This is the main entry point method for each Read thread.
|
|
// id is the one-up number (starting at 0) associated with each Read thread.
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoad::read(int id)
|
|
{
|
|
#ifdef PROFILE
|
|
Stats::registerReadProfThread();
|
|
#endif
|
|
// First get a table to work on.
|
|
// Acquire the read mutex
|
|
// Iterate over the table list
|
|
// if the table's status is new, set the locker = id
|
|
// and then exit.
|
|
int tableId = -1;
|
|
|
|
try
|
|
{
|
|
//
|
|
// LOOP to select and read the next table
|
|
//
|
|
while (true)
|
|
{
|
|
tableId = -1;
|
|
#ifdef PROFILE
|
|
Stats::startReadEvent(WE_STATS_WAIT_TO_SELECT_TBL);
|
|
#endif
|
|
|
|
if ((tableId = lockTableForRead(id)) == -1)
|
|
{
|
|
fLog.logMsg(
|
|
"BulkLoad::ReadOperation No more tables "
|
|
"available for processing. Read thread " +
|
|
Convertor::int2Str(id) + " exiting...",
|
|
MSGLVL_INFO2);
|
|
#ifdef PROFILE
|
|
Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL);
|
|
#endif
|
|
return;
|
|
}
|
|
|
|
#ifdef PROFILE
|
|
Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL);
|
|
#endif
|
|
int rc = fTableInfo[tableId]->readTableData();
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
// Error occurred while reading the data, break out of loop.
|
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
|
ostringstream oss;
|
|
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
|
|
<< ". Terminating this job.";
|
|
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
|
|
fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
catch (SecondaryShutdownException& ex)
|
|
{
|
|
// We are bailing out because another thread set bad job status
|
|
ostringstream oss;
|
|
|
|
if (tableId != -1)
|
|
oss << "Bulkload Read (thread " << id << ") Stopped reading Table "
|
|
<< fTableInfo[tableId]->getTableName() << ". " << ex.what();
|
|
else
|
|
oss << "Bulkload Read (thread " << id << ") Stopped reading Tables. " << ex.what();
|
|
|
|
fLog.logMsg(oss.str(), MSGLVL_INFO1);
|
|
}
|
|
catch (exception& ex)
|
|
{
|
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
|
ostringstream oss;
|
|
|
|
if (tableId != -1)
|
|
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
|
|
<< ". " << ex.what() << ". Terminating this job.";
|
|
else
|
|
oss << "Bulkload Read (thread " << id << ") Failed for Table. " << ex.what()
|
|
<< ". Terminating this job.";
|
|
|
|
if (tableId != -1)
|
|
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
|
|
|
|
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
|
|
}
|
|
catch (...)
|
|
{
|
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
|
ostringstream oss;
|
|
|
|
if (tableId != -1)
|
|
oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
|
|
<< ". Terminating this job.";
|
|
else
|
|
oss << "Bulkload Read (thread " << id << ") Failed for Table. Terminating this job.";
|
|
|
|
if (tableId != -1)
|
|
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
|
|
|
|
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Search for an available table to be read.
|
|
// First available table that is found, is locked and assigned to the specified
|
|
// thread id.
|
|
// Return value is -1 if no table is available for reading.
|
|
//------------------------------------------------------------------------------
|
|
int BulkLoad::lockTableForRead(int id)
|
|
{
|
|
boost::mutex::scoped_lock lock(fReadMutex);
|
|
|
|
for (unsigned i = 0; i < fTableInfo.size(); ++i)
|
|
{
|
|
if (fTableInfo[i]->lockForRead(id))
|
|
return i;
|
|
}
|
|
|
|
return -1;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// This is the main entry point method for each parsing thread.
|
|
// id is the one-up number (starting at 0) associated with each parsing thread.
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoad::parse(int id)
|
|
{
|
|
#ifdef PROFILE
|
|
Stats::registerParseProfThread();
|
|
#endif
|
|
// Get a column from a buffer to parse.
|
|
// The currentParseBuffer will be the buffer to be worked on
|
|
int tableId = -1;
|
|
int columnId = -1;
|
|
int myParseBuffer = -1;
|
|
|
|
try
|
|
{
|
|
//
|
|
// LOOP to parse BulkLoadBuffers as they're loaded by Read thread(s)
|
|
//
|
|
while (true)
|
|
{
|
|
#ifdef PROFILE
|
|
Stats::startParseEvent(WE_STATS_WAIT_TO_SELECT_COL);
|
|
#endif
|
|
|
|
// @bug 3271: Conditionally compile the thread deadlock debug logging
|
|
#ifdef DEADLOCK_DEBUG
|
|
// @bug2099+ Temporary hack.
|
|
struct timeval tvStart;
|
|
gettimeofday(&tvStart, 0);
|
|
bool report = false;
|
|
bool reported = false;
|
|
// @bug2099-
|
|
#else
|
|
const bool report = false;
|
|
#endif
|
|
|
|
//
|
|
// LOOP to wait and select table/column/buffers
|
|
// (BulkLoadBuffers) as they are loaded by the Read buffer.
|
|
//
|
|
while (true)
|
|
{
|
|
tableId = -1;
|
|
columnId = -1;
|
|
myParseBuffer = -1;
|
|
|
|
// See if JobStatus has been set to terminate by other thread
|
|
if (BulkStatus::getJobStatus() == EXIT_FAILURE)
|
|
{
|
|
throw SecondaryShutdownException(
|
|
"BulkLoad::"
|
|
"parse() responding to job termination");
|
|
}
|
|
|
|
if (allTablesDone(WriteEngine::PARSE_COMPLETE))
|
|
{
|
|
#ifdef PROFILE
|
|
Stats::stopParseEvent(WE_STATS_WAIT_TO_SELECT_COL);
|
|
#endif
|
|
// no column from any of the tables available for
|
|
// parsing, hence exit.
|
|
return;
|
|
}
|
|
|
|
if (lockColumnForParse(id, tableId, columnId, myParseBuffer, report))
|
|
break;
|
|
|
|
// Sleep and check the condition again.
|
|
sleepMS(1);
|
|
#ifdef DEADLOCK_DEBUG
|
|
|
|
// @bug2099+
|
|
if (report)
|
|
report = false; // report one time.
|
|
|
|
if (!reported)
|
|
{
|
|
struct timeval tvNow;
|
|
gettimeofday(&tvNow, 0);
|
|
|
|
if ((tvNow.tv_sec - tvStart.tv_sec) >= 100)
|
|
{
|
|
time_t t = time(0);
|
|
char timeString[50];
|
|
ctime_r(&t, timeString);
|
|
timeString[strlen(timeString) - 1] = '\0';
|
|
ostringstream oss;
|
|
oss << endl
|
|
<< endl
|
|
<< timeString << ": BulkLoad::parse(" << id << "); "
|
|
<<
|
|
" Worker Thread " << pthread_self() <<
|
|
":" << endl
|
|
<< "---------------------------------------"
|
|
"-------------------"
|
|
<< endl;
|
|
cout << oss.str();
|
|
cout.flush();
|
|
report = true;
|
|
reported = true;
|
|
}
|
|
}
|
|
|
|
// @bug2099-
|
|
#endif
|
|
}
|
|
|
|
#ifdef PROFILE
|
|
Stats::stopParseEvent(WE_STATS_WAIT_TO_SELECT_COL);
|
|
#endif
|
|
// Have obtained the table and column for parsing.
|
|
// Start parsing the column data.
|
|
double processingTime;
|
|
int rc = fTableInfo[tableId]->parseColumn(columnId, myParseBuffer, processingTime);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
// Error occurred while parsing the data, break out of loop.
|
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
|
ostringstream oss;
|
|
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
|
|
<< " during parsing. Terminating this job.";
|
|
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
|
|
fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
|
|
|
|
setParseErrorOnTable(tableId, true);
|
|
return;
|
|
}
|
|
|
|
// Parsing is complete. Acquire the mutex and increment
|
|
// the parsingComplete value for the buffer
|
|
if (fTableInfo[tableId]->getStatusTI() != WriteEngine::ERR)
|
|
{
|
|
#ifdef PROFILE
|
|
Stats::startParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
|
|
#endif
|
|
boost::mutex::scoped_lock lock(fParseMutex);
|
|
#ifdef PROFILE
|
|
Stats::stopParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
|
|
Stats::startParseEvent(WE_STATS_COMPLETING_PARSE);
|
|
#endif
|
|
rc = fTableInfo[tableId]->setParseComplete(columnId, myParseBuffer, processingTime);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
|
ostringstream oss;
|
|
oss << "Bulkload Parse (thread " << id << ") Failed for Table "
|
|
<< fTableInfo[tableId]->getTableName() << " during parse completion. Terminating this job.";
|
|
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
|
|
fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
|
|
|
|
setParseErrorOnTable(tableId, false);
|
|
return;
|
|
}
|
|
|
|
#ifdef PROFILE
|
|
Stats::stopParseEvent(WE_STATS_COMPLETING_PARSE);
|
|
#endif
|
|
}
|
|
}
|
|
}
|
|
catch (SecondaryShutdownException& ex)
|
|
{
|
|
// We are bailing out because another thread set bad job status
|
|
ostringstream oss;
|
|
|
|
if (tableId != -1)
|
|
{
|
|
oss << "Bulkload Parse (thread " << id << ") Stopped parsing Table "
|
|
<< fTableInfo[tableId]->getTableName() << ". " << ex.what();
|
|
|
|
setParseErrorOnTable(tableId, true);
|
|
}
|
|
else
|
|
{
|
|
oss << "Bulkload Parse (thread " << id << ") Stopped parsing Tables. " << ex.what();
|
|
}
|
|
|
|
fLog.logMsg(oss.str(), MSGLVL_INFO1);
|
|
}
|
|
catch (exception& ex)
|
|
{
|
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
|
ostringstream oss;
|
|
|
|
if (tableId != -1)
|
|
{
|
|
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
|
|
<< ". " << ex.what() << ". Terminating this job.";
|
|
|
|
setParseErrorOnTable(tableId, true);
|
|
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
|
|
}
|
|
else
|
|
{
|
|
oss << "Bulkload Parse (thread " << id << ") Failed for Table. " << ex.what()
|
|
<< ". Terminating this job.";
|
|
}
|
|
|
|
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
|
|
}
|
|
catch (...)
|
|
{
|
|
BulkStatus::setJobStatus(EXIT_FAILURE);
|
|
ostringstream oss;
|
|
|
|
if (tableId != -1)
|
|
{
|
|
oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
|
|
<< ". Terminating this job.";
|
|
|
|
setParseErrorOnTable(tableId, true);
|
|
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
|
|
}
|
|
else
|
|
{
|
|
oss << "Bulkload Parse (thread " << id << ") Failed for Table. Terminating this job.";
|
|
}
|
|
|
|
fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Search for an available table/column/buffer to be parsed.
|
|
// First available table/column/buffer that is found, is locked and assigned to
|
|
// the specified thread id.
|
|
// Return value is -1 if no table/column/buffer is available for parsing.
|
|
//------------------------------------------------------------------------------
|
|
// @bug2099 - Temp hack to diagnose deadlock. Added report parm and couts below.
|
|
bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int& myParseBuffer, bool report)
|
|
{
|
|
// Acquire mutex
|
|
// Iterate on the table list
|
|
// Check if the currentParseBuffer is available for parsing
|
|
// If yes, put the locker and fill the tableId and columnId
|
|
// else, go to the next table for checking if a column is available
|
|
boost::mutex::scoped_lock lock(fParseMutex);
|
|
|
|
for (unsigned i = 0; i < fTableInfo.size(); ++i)
|
|
{
|
|
if (fTableInfo[i]->getStatusTI() == WriteEngine::PARSE_COMPLETE)
|
|
continue;
|
|
|
|
int currentParseBuffer = fTableInfo[i]->getCurrentParseBuffer();
|
|
myParseBuffer = currentParseBuffer;
|
|
|
|
do
|
|
{
|
|
// @bug2099+
|
|
if (report)
|
|
{
|
|
ostringstream oss;
|
|
std::string bufStatusStr;
|
|
Status stat = fTableInfo[i]->getStatusTI();
|
|
ColumnInfo::convertStatusToString(stat, bufStatusStr);
|
|
oss << " - " << pthread_self() <<
|
|
":fTableInfo[" << i << "]" << bufStatusStr << " (" << stat << ")";
|
|
|
|
if (stat != WriteEngine::PARSE_COMPLETE)
|
|
{
|
|
oss << "; fCurrentParseBuffer is " << myParseBuffer;
|
|
}
|
|
|
|
oss << endl;
|
|
cout << oss.str();
|
|
cout.flush();
|
|
}
|
|
|
|
// @bug2099-
|
|
|
|
// get a buffer and column to parse if available.
|
|
if ((columnId = fTableInfo[i]->getColumnForParse(thrdId, myParseBuffer, report)) != -1)
|
|
{
|
|
tableId = i;
|
|
return true;
|
|
}
|
|
|
|
myParseBuffer = (myParseBuffer + 1) % fTableInfo[i]->getNumberOfBuffers();
|
|
} while (myParseBuffer != currentParseBuffer);
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Used by the parsing threads to check to see if all the tables match the
|
|
// specified status. The only current use for the function is to check to
|
|
// see if all the tables have a PARSE_COMPLETE status, hence the name of the
|
|
// function.
|
|
// If a table has an ERR status, then all the tables are considered finished,
|
|
// because that basically means we had an import error, and we need to shut
|
|
// down the job.
|
|
// Returns TRUE if all the tables are considered completed, else returns FALSE.
|
|
//------------------------------------------------------------------------------
|
|
bool BulkLoad::allTablesDone(Status status)
|
|
{
|
|
for (unsigned i = 0; i < fTableInfo.size(); ++i)
|
|
{
|
|
if (fTableInfo[i]->getStatusTI() == WriteEngine::ERR)
|
|
return true;
|
|
|
|
if (fTableInfo[i]->getStatusTI() != status)
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Set status for specified table to reflect a parsing error.
|
|
// Optionally lock fParseMutex (if requested).
|
|
// May evaluate later whether we need to employ the fParseMutex for this call.
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoad::setParseErrorOnTable(int tableId, bool lockParseMutex)
|
|
{
|
|
if (lockParseMutex)
|
|
{
|
|
boost::mutex::scoped_lock lock(fParseMutex);
|
|
fTableInfo[tableId]->setParseError();
|
|
}
|
|
else
|
|
{
|
|
fTableInfo[tableId]->setParseError();
|
|
}
|
|
}
|
|
|
|
} // namespace WriteEngine
|