You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	There were numerous memory leaks in the plugin's code and associated code. During a typical run of the MTR tests, it leaked around 65 megabytes of objects; as a result, these leaks may severely affect long-lived connections. This patch fixes (almost) all leaks found in the plugin. The exceptions are two leaks associated with SHOW CREATE TABLE columnstore_table and with getting column information for a ColumnStore-handled table. These should be fixed on the server side, and that work is under way.
		
			
				
	
	
		
			511 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			511 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /*****************************************************************************
 | |
|  * $Id: we_workers.cpp 4450 2013-01-21 14:13:24Z rdempsey $
 | |
|  *
 | |
|  ****************************************************************************/
 | |
| 
 | |
#include "we_bulkload.h"
#include "we_bulkstatus.h"
#include "we_stats.h"
#include <sys/time.h>
#include <time.h>
#include <cerrno>
#include <cmath>
#include "dataconvert.h"
#include <we_convertor.h>
 | |
| using namespace std;
 | |
| using namespace dataconvert;
 | |
| 
 | |
| namespace WriteEngine
 | |
| {
 | |
| //------------------------------------------------------------------------------
 | |
| // Puts the current thread to sleep for the specified number of milliseconds.
 | |
| //------------------------------------------------------------------------------
 | |
| void BulkLoad::sleepMS(long ms)
 | |
| {
 | |
|   struct timespec rm_ts;
 | |
| 
 | |
|   rm_ts.tv_sec = ms / 1000;
 | |
|   rm_ts.tv_nsec = ms % 1000 * 1000000;
 | |
|   struct timespec abs_ts;
 | |
| 
 | |
|   do
 | |
|   {
 | |
|     abs_ts.tv_sec = rm_ts.tv_sec;
 | |
|     abs_ts.tv_nsec = rm_ts.tv_nsec;
 | |
|   } while (nanosleep(&abs_ts, &rm_ts) < 0);
 | |
| 
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // This is the main entry point method for each Read thread.
 | |
| // id is the one-up number (starting at 0) associated with each Read thread.
 | |
| //------------------------------------------------------------------------------
 | |
| void BulkLoad::read(int id)
 | |
| {
 | |
| #ifdef PROFILE
 | |
|   Stats::registerReadProfThread();
 | |
| #endif
 | |
|   // First get a table to work on.
 | |
|   // Acquire the read mutex
 | |
|   // Iterate over the table list
 | |
|   // if the table's status is new, set the locker = id
 | |
|   // and then exit.
 | |
|   int tableId = -1;
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     //
 | |
|     // LOOP to select and read the next table
 | |
|     //
 | |
|     while (true)
 | |
|     {
 | |
|       tableId = -1;
 | |
| #ifdef PROFILE
 | |
|       Stats::startReadEvent(WE_STATS_WAIT_TO_SELECT_TBL);
 | |
| #endif
 | |
| 
 | |
|       if ((tableId = lockTableForRead(id)) == -1)
 | |
|       {
 | |
|         fLog.logMsg(
 | |
|             "BulkLoad::ReadOperation No more tables "
 | |
|             "available for processing. Read thread " +
 | |
|                 Convertor::int2Str(id) + " exiting...",
 | |
|             MSGLVL_INFO2);
 | |
| #ifdef PROFILE
 | |
|         Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL);
 | |
| #endif
 | |
|         return;
 | |
|       }
 | |
| 
 | |
| #ifdef PROFILE
 | |
|       Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL);
 | |
| #endif
 | |
|       int rc = fTableInfo[tableId]->readTableData();
 | |
| 
 | |
|       if (rc != NO_ERROR)
 | |
|       {
 | |
|         // Error occurred while reading the data, break out of loop.
 | |
|         BulkStatus::setJobStatus(EXIT_FAILURE);
 | |
|         ostringstream oss;
 | |
|         oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
 | |
|             << ".  Terminating this job.";
 | |
|         fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 | |
|         fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);
 | |
|         break;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   catch (SecondaryShutdownException& ex)
 | |
|   {
 | |
|     // We are bailing out because another thread set bad job status
 | |
|     ostringstream oss;
 | |
| 
 | |
|     if (tableId != -1)
 | |
|       oss << "Bulkload Read (thread " << id << ") Stopped reading Table "
 | |
|           << fTableInfo[tableId]->getTableName() << ".  " << ex.what();
 | |
|     else
 | |
|       oss << "Bulkload Read (thread " << id << ") Stopped reading Tables. " << ex.what();
 | |
| 
 | |
|     fLog.logMsg(oss.str(), MSGLVL_INFO1);
 | |
|   }
 | |
|   catch (exception& ex)
 | |
|   {
 | |
|     BulkStatus::setJobStatus(EXIT_FAILURE);
 | |
|     ostringstream oss;
 | |
| 
 | |
|     if (tableId != -1)
 | |
|       oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
 | |
|           << ".  " << ex.what() << ".  Terminating this job.";
 | |
|     else
 | |
|       oss << "Bulkload Read (thread " << id << ") Failed for Table. " << ex.what()
 | |
|           << ".  Terminating this job.";
 | |
| 
 | |
|     if (tableId != -1)
 | |
|       fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 | |
| 
 | |
|     fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     BulkStatus::setJobStatus(EXIT_FAILURE);
 | |
|     ostringstream oss;
 | |
| 
 | |
|     if (tableId != -1)
 | |
|       oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
 | |
|           << ".  Terminating this job.";
 | |
|     else
 | |
|       oss << "Bulkload Read (thread " << id << ") Failed for Table.  Terminating this job.";
 | |
| 
 | |
|     if (tableId != -1)
 | |
|       fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
 | |
| 
 | |
|     fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
 | |
|   }
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Search for an available table to be read.
 | |
| // First available table that is found, is locked and assigned to the specified
 | |
| // thread id.
 | |
| // Return value is -1 if no table is available for reading.
 | |
| //------------------------------------------------------------------------------
 | |
| int BulkLoad::lockTableForRead(int id)
 | |
| {
 | |
|   boost::mutex::scoped_lock lock(fReadMutex);
 | |
| 
 | |
|   for (unsigned i = 0; i < fTableInfo.size(); ++i)
 | |
|   {
 | |
|     if (fTableInfo[i]->lockForRead(id))
 | |
|       return i;
 | |
|   }
 | |
| 
 | |
|   return -1;
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // This is the main entry point method for each parsing thread.
 | |
| // id is the one-up number (starting at 0) associated with each parsing thread.
 | |
| //------------------------------------------------------------------------------
 | |
void BulkLoad::parse(int id)
{
#ifdef PROFILE
  Stats::registerParseProfThread();
#endif
  // Get a column from a buffer to parse.
  // The currentParseBuffer will be the buffer to be worked on
  //
  // These are declared outside the try block so the catch handlers can report
  // (and flag) the table this thread was working on when an exception escaped.
  // The inner selection loop resets them to -1 on every attempt.
  int tableId = -1;
  int columnId = -1;
  int myParseBuffer = -1;

  try
  {
    //
    // LOOP to parse BulkLoadBuffers as they're loaded by Read thread(s)
    //
    while (true)
    {
#ifdef PROFILE
      Stats::startParseEvent(WE_STATS_WAIT_TO_SELECT_COL);
#endif

// @bug 3271: Conditionally compile the thread deadlock debug logging
#ifdef DEADLOCK_DEBUG
      // @bug2099+ Temporary hack.
      struct timeval tvStart;
      gettimeofday(&tvStart, 0);
      bool report = false;
      bool reported = false;
      // @bug2099-
#else
      // Without DEADLOCK_DEBUG the diagnostic output in lockColumnForParse()
      // is always disabled.
      const bool report = false;
#endif

      //
      // LOOP to wait and select table/column/buffers
      // (BulkLoadBuffers) as they are loaded by the Read buffer.
      //
      while (true)
      {
        tableId = -1;
        columnId = -1;
        myParseBuffer = -1;

        // See if JobStatus has been set to terminate by other thread
        if (BulkStatus::getJobStatus() == EXIT_FAILURE)
        {
          throw SecondaryShutdownException(
              "BulkLoad::"
              "parse() responding to job termination");
        }

        // allTablesDone() also returns true when any table is in ERR status,
        // so a failed table ends the parse threads here as well.
        if (allTablesDone(WriteEngine::PARSE_COMPLETE))
        {
#ifdef PROFILE
          Stats::stopParseEvent(WE_STATS_WAIT_TO_SELECT_COL);
#endif
          // no column from any of the tables available for
          // parsing, hence exit.
          return;
        }

        // On success, tableId/columnId/myParseBuffer identify the work item
        // now locked for this thread.
        if (lockColumnForParse(id, tableId, columnId, myParseBuffer, report))
          break;

        // Sleep and check the condition again.
        sleepMS(1);
#ifdef DEADLOCK_DEBUG

        // @bug2099+
        // If this thread has waited 100+ seconds for work, print a one-time
        // banner and enable lockColumnForParse()'s per-table report for the
        // next attempt only.
        if (report)
          report = false;  // report one time.

        if (!reported)
        {
          struct timeval tvNow;
          gettimeofday(&tvNow, 0);

          if ((tvNow.tv_sec - tvStart.tv_sec) >= 100)
          {
            time_t t = time(0);
            char timeString[50];
            ctime_r(&t, timeString);
            // Strip the trailing newline that ctime_r() appends.
            timeString[strlen(timeString) - 1] = '\0';
            ostringstream oss;
            oss << endl
                << endl
                << timeString << ": BulkLoad::parse(" << id << "); "
                <<
                " Worker Thread " << pthread_self() <<
                ":" << endl
                << "---------------------------------------"
                   "-------------------"
                << endl;
            cout << oss.str();
            cout.flush();
            report = true;
            reported = true;
          }
        }

        // @bug2099-
#endif
      }

#ifdef PROFILE
      Stats::stopParseEvent(WE_STATS_WAIT_TO_SELECT_COL);
#endif
      // Have obtained the table and column for parsing.
      // Start parsing the column data.
      // processingTime is an out-parameter of parseColumn() and is passed on to
      // setParseComplete() below.
      double processingTime;
      int rc = fTableInfo[tableId]->parseColumn(columnId, myParseBuffer, processingTime);

      if (rc != NO_ERROR)
      {
        // Error occurred while parsing the data, break out of loop.
        BulkStatus::setJobStatus(EXIT_FAILURE);
        ostringstream oss;
        oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
            << " during parsing.  Terminating this job.";
        fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
        fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);

        // fParseMutex is not held here, so ask setParseErrorOnTable to take it.
        setParseErrorOnTable(tableId, true);
        return;
      }

      // Parsing is complete. Acquire the mutex and increment
      // the parsingComplete value for the buffer
      // (Skipped when the table has already been put into ERR status.)
      if (fTableInfo[tableId]->getStatusTI() != WriteEngine::ERR)
      {
#ifdef PROFILE
        Stats::startParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
#endif
        boost::mutex::scoped_lock lock(fParseMutex);
#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE);
        Stats::startParseEvent(WE_STATS_COMPLETING_PARSE);
#endif
        rc = fTableInfo[tableId]->setParseComplete(columnId, myParseBuffer, processingTime);

        if (rc != NO_ERROR)
        {
          BulkStatus::setJobStatus(EXIT_FAILURE);
          ostringstream oss;
          oss << "Bulkload Parse (thread " << id << ") Failed for Table "
              << fTableInfo[tableId]->getTableName() << " during parse completion.  Terminating this job.";
          fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
          fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL);

          // `lock` above already holds fParseMutex.  Passing false avoids
          // re-locking it, which would deadlock (boost::mutex is not recursive).
          // NOTE(review): on this path the PROFILE event
          // WE_STATS_COMPLETING_PARSE is started but never stopped.
          setParseErrorOnTable(tableId, false);
          return;
        }

#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_COMPLETING_PARSE);
#endif
      }
    }
  }
  catch (SecondaryShutdownException& ex)
  {
    // We are bailing out because another thread set bad job status
    // By the time this handler runs, stack unwinding has already released any
    // fParseMutex lock taken inside the try block, so locking again is safe.
    ostringstream oss;

    if (tableId != -1)
    {
      oss << "Bulkload Parse (thread " << id << ") Stopped parsing Table "
          << fTableInfo[tableId]->getTableName() << ".  " << ex.what();

      setParseErrorOnTable(tableId, true);
    }
    else
    {
      oss << "Bulkload Parse (thread " << id << ") Stopped parsing Tables. " << ex.what();
    }

    fLog.logMsg(oss.str(), MSGLVL_INFO1);
  }
  catch (exception& ex)
  {
    // Unexpected failure: fail the whole job and, if a table was in progress,
    // flag it and record the reason in its BRM error report.
    BulkStatus::setJobStatus(EXIT_FAILURE);
    ostringstream oss;

    if (tableId != -1)
    {
      oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
          << ".  " << ex.what() << ".  Terminating this job.";

      setParseErrorOnTable(tableId, true);
      fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
    }
    else
    {
      oss << "Bulkload Parse (thread " << id << ") Failed for Table. " << ex.what()
          << ".  Terminating this job.";
    }

    fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
  }
  catch (...)
  {
    // Same handling as above, for exceptions not derived from std::exception.
    BulkStatus::setJobStatus(EXIT_FAILURE);
    ostringstream oss;

    if (tableId != -1)
    {
      oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName()
          << ".  Terminating this job.";

      setParseErrorOnTable(tableId, true);
      fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
    }
    else
    {
      oss << "Bulkload Parse (thread " << id << ") Failed for Table.  Terminating this job.";
    }

    fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL);
  }
}
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Search for an available table/column/buffer to be parsed.
 | |
| // First available table/column/buffer that is found, is locked and assigned to
 | |
| // the specified thread id.
 | |
| // Returns true (with tableId, columnId and myParseBuffer filled in) on success, else false.
 | |
| //------------------------------------------------------------------------------
 | |
| // @bug2099 - Temp hack to diagnose deadlock. Added report parm and couts below.
 | |
| bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int& myParseBuffer, bool report)
 | |
| {
 | |
|   // Acquire mutex
 | |
|   // Iterate on the table list
 | |
|   // Check if the currentParseBuffer is available for parsing
 | |
|   // If yes, put the locker and fill the tableId and columnId
 | |
|   // else, go to the next table for checking if a column is available
 | |
|   boost::mutex::scoped_lock lock(fParseMutex);
 | |
| 
 | |
|   for (unsigned i = 0; i < fTableInfo.size(); ++i)
 | |
|   {
 | |
|     if (fTableInfo[i]->getStatusTI() == WriteEngine::PARSE_COMPLETE)
 | |
|       continue;
 | |
| 
 | |
|     int currentParseBuffer = fTableInfo[i]->getCurrentParseBuffer();
 | |
|     myParseBuffer = currentParseBuffer;
 | |
| 
 | |
|     do
 | |
|     {
 | |
|       // @bug2099+
 | |
|       if (report)
 | |
|       {
 | |
|         ostringstream oss;
 | |
|         std::string bufStatusStr;
 | |
|         Status stat = fTableInfo[i]->getStatusTI();
 | |
|         ColumnInfo::convertStatusToString(stat, bufStatusStr);
 | |
|         oss << " - " << pthread_self() <<
 | |
|             ":fTableInfo[" << i << "]" << bufStatusStr << " (" << stat << ")";
 | |
| 
 | |
|         if (stat != WriteEngine::PARSE_COMPLETE)
 | |
|         {
 | |
|           oss << "; fCurrentParseBuffer is " << myParseBuffer;
 | |
|         }
 | |
| 
 | |
|         oss << endl;
 | |
|         cout << oss.str();
 | |
|         cout.flush();
 | |
|       }
 | |
| 
 | |
|       // @bug2099-
 | |
| 
 | |
|       // get a buffer and column to parse if available.
 | |
|       if ((columnId = fTableInfo[i]->getColumnForParse(thrdId, myParseBuffer, report)) != -1)
 | |
|       {
 | |
|         tableId = i;
 | |
|         return true;
 | |
|       }
 | |
| 
 | |
|       myParseBuffer = (myParseBuffer + 1) % fTableInfo[i]->getNumberOfBuffers();
 | |
|     } while (myParseBuffer != currentParseBuffer);
 | |
|   }
 | |
| 
 | |
|   return false;
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Used by the parsing threads to check to see if all the tables match the
 | |
| // specified status.  The only current use for the function is to check to
 | |
| // see if all the tables have a PARSE_COMPLETE status, hence the name of the
 | |
| // function.
 | |
| // If a table has an ERR status, then all the tables are considered finished,
 | |
| // because that basically means we had an import error, and we need to shut
 | |
| // down the job.
 | |
| // Returns TRUE if all the tables are considered completed, else returns FALSE.
 | |
| //------------------------------------------------------------------------------
 | |
| bool BulkLoad::allTablesDone(Status status)
 | |
| {
 | |
|   for (unsigned i = 0; i < fTableInfo.size(); ++i)
 | |
|   {
 | |
|     if (fTableInfo[i]->getStatusTI() == WriteEngine::ERR)
 | |
|       return true;
 | |
| 
 | |
|     if (fTableInfo[i]->getStatusTI() != status)
 | |
|       return false;
 | |
|   }
 | |
| 
 | |
|   return true;
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Set status for specified table to reflect a parsing error.
 | |
| // Optionally lock fParseMutex (if requested).
 | |
| // May evaluate later whether we need to employ the fParseMutex for this call.
 | |
| //------------------------------------------------------------------------------
 | |
| void BulkLoad::setParseErrorOnTable(int tableId, bool lockParseMutex)
 | |
| {
 | |
|   if (lockParseMutex)
 | |
|   {
 | |
|     boost::mutex::scoped_lock lock(fParseMutex);
 | |
|     fTableInfo[tableId]->setParseError();
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     fTableInfo[tableId]->setParseError();
 | |
|   }
 | |
| }
 | |
| 
 | |
| }  // namespace WriteEngine
 |