/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*****************************************************************************
 * $Id: we_workers.cpp 4450 2013-01-21 14:13:24Z rdempsey $
 *
 ****************************************************************************/

#include "we_bulkload.h"
#include "we_bulkstatus.h"
#include "we_stats.h"
// NOTE(review): the names of the original angle-bracket includes were lost
// (the directives appeared as bare "#include"). The system headers below
// cover what this file uses directly: nanosleep/ctime_r/time, gettimeofday,
// strlen, errno, pthread_self and iostreams. Confirm against version control.
#include <sys/time.h>
#include <iostream>
#include "dataconvert.h"
#include <cerrno>
#include <cstring>
#include <ctime>
#include <sstream>
#include <pthread.h>

using namespace std;
using namespace dataconvert;

namespace WriteEngine
{
//------------------------------------------------------------------------------
// Puts the current thread to sleep for the specified number of milliseconds.
// If nanosleep() is interrupted by a signal (EINTR), the sleep is resumed for
// the time that remains. A negative duration is treated as 0.
//------------------------------------------------------------------------------
void BulkLoad::sleepMS(long ms)
{
  // A negative ms would produce a negative tv_nsec, which nanosleep() rejects
  // with EINVAL.
  if (ms < 0)
    ms = 0;

  struct timespec rm_ts;
  rm_ts.tv_sec = ms / 1000;
  rm_ts.tv_nsec = (ms % 1000) * 1000000L;

  struct timespec req_ts;

  // Only EINTR is retryable. POSIX updates the "remaining" timespec only in
  // that case, so retrying on any other error would re-issue the same failing
  // request forever.
  do
  {
    req_ts = rm_ts;
  } while (nanosleep(&req_ts, &rm_ts) < 0 && errno == EINTR);
}

//------------------------------------------------------------------------------
// This is the main entry point method for each Read thread.
// id is the one-up number (starting at 0) associated with each Read thread.
//------------------------------------------------------------------------------ void BulkLoad::read(int id) { #ifdef PROFILE Stats::registerReadProfThread(); #endif // First get a table to work on. // Acquire the read mutex // Iterate over the table list // if the table's status is new, set the locker = id // and then exit. int tableId = -1; try { // // LOOP to select and read the next table // while (true) { tableId = -1; #ifdef PROFILE Stats::startReadEvent(WE_STATS_WAIT_TO_SELECT_TBL); #endif if ((tableId = lockTableForRead(id)) == -1) { fLog.logMsg( "BulkLoad::ReadOperation No more tables " "available for processing. Read thread " + Convertor::int2Str(id) + " exiting...", MSGLVL_INFO2); #ifdef PROFILE Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL); #endif return; } #ifdef PROFILE Stats::stopReadEvent(WE_STATS_WAIT_TO_SELECT_TBL); #endif int rc = fTableInfo[tableId]->readTableData(); if (rc != NO_ERROR) { // Error occurred while reading the data, break out of loop. BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". Terminating this job."; fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL); break; } } } catch (SecondaryShutdownException& ex) { // We are bailing out because another thread set bad job status ostringstream oss; if (tableId != -1) oss << "Bulkload Read (thread " << id << ") Stopped reading Table " << fTableInfo[tableId]->getTableName() << ". " << ex.what(); else oss << "Bulkload Read (thread " << id << ") Stopped reading Tables. " << ex.what(); fLog.logMsg(oss.str(), MSGLVL_INFO1); } catch (exception& ex) { BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; if (tableId != -1) oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". " << ex.what() << ". 
Terminating this job."; else oss << "Bulkload Read (thread " << id << ") Failed for Table. " << ex.what() << ". Terminating this job."; if (tableId != -1) fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL); } catch (...) { BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; if (tableId != -1) oss << "Bulkload Read (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". Terminating this job."; else oss << "Bulkload Read (thread " << id << ") Failed for Table. Terminating this job."; if (tableId != -1) fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL); } } //------------------------------------------------------------------------------ // Search for an available table to be read. // First available table that is found, is locked and assigned to the specified // thread id. // Return value is -1 if no table is available for reading. //------------------------------------------------------------------------------ int BulkLoad::lockTableForRead(int id) { boost::mutex::scoped_lock lock(fReadMutex); for (unsigned i = 0; i < fTableInfo.size(); ++i) { if (fTableInfo[i]->lockForRead(id)) return i; } return -1; } //------------------------------------------------------------------------------ // This is the main entry point method for each parsing thread. // id is the one-up number (starting at 0) associated with each parsing thread. //------------------------------------------------------------------------------ void BulkLoad::parse(int id) { #ifdef PROFILE Stats::registerParseProfThread(); #endif // Get a column from a buffer to parse. 
// The currentParseBuffer will be the buffer to be worked on int tableId = -1; int columnId = -1; int myParseBuffer = -1; try { // // LOOP to parse BulkLoadBuffers as they're loaded by Read thread(s) // while (true) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_WAIT_TO_SELECT_COL); #endif // @bug 3271: Conditionally compile the thread deadlock debug logging #ifdef DEADLOCK_DEBUG // @bug2099+ Temporary hack. struct timeval tvStart; gettimeofday(&tvStart, 0); bool report = false; bool reported = false; // @bug2099- #else const bool report = false; #endif // // LOOP to wait and select table/column/buffers // (BulkLoadBuffers) as they are loaded by the Read buffer. // while (true) { tableId = -1; columnId = -1; myParseBuffer = -1; // See if JobStatus has been set to terminate by other thread if (BulkStatus::getJobStatus() == EXIT_FAILURE) { throw SecondaryShutdownException( "BulkLoad::" "parse() responding to job termination"); } if (allTablesDone(WriteEngine::PARSE_COMPLETE)) { #ifdef PROFILE Stats::stopParseEvent(WE_STATS_WAIT_TO_SELECT_COL); #endif // no column from any of the tables available for // parsing, hence exit. return; } if (lockColumnForParse(id, tableId, columnId, myParseBuffer, report)) break; // Sleep and check the condition again. sleepMS(1); #ifdef DEADLOCK_DEBUG // @bug2099+ if (report) report = false; // report one time. 
if (!reported) { struct timeval tvNow; gettimeofday(&tvNow, 0); if ((tvNow.tv_sec - tvStart.tv_sec) >= 100) { time_t t = time(0); char timeString[50]; ctime_r(&t, timeString); timeString[strlen(timeString) - 1] = '\0'; ostringstream oss; oss << endl << endl << timeString << ": BulkLoad::parse(" << id << "); " << " Worker Thread " << pthread_self() << ":" << endl << "---------------------------------------" "-------------------" << endl; cout << oss.str(); cout.flush(); report = true; reported = true; } } // @bug2099- #endif } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_WAIT_TO_SELECT_COL); #endif // Have obtained the table and column for parsing. // Start parsing the column data. double processingTime; int rc = fTableInfo[tableId]->parseColumn(columnId, myParseBuffer, processingTime); if (rc != NO_ERROR) { // Error occurred while parsing the data, break out of loop. BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << " during parsing. Terminating this job."; fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL); setParseErrorOnTable(tableId, true); return; } // Parsing is complete. Acquire the mutex and increment // the parsingComplete value for the buffer if (fTableInfo[tableId]->getStatusTI() != WriteEngine::ERR) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE); #endif boost::mutex::scoped_lock lock(fParseMutex); #ifdef PROFILE Stats::stopParseEvent(WE_STATS_WAIT_TO_COMPLETE_PARSE); Stats::startParseEvent(WE_STATS_COMPLETING_PARSE); #endif rc = fTableInfo[tableId]->setParseComplete(columnId, myParseBuffer, processingTime); if (rc != NO_ERROR) { BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << " during parse completion. 
Terminating this job."; fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); fLog.logMsg(oss.str(), rc, MSGLVL_CRITICAL); setParseErrorOnTable(tableId, false); return; } #ifdef PROFILE Stats::stopParseEvent(WE_STATS_COMPLETING_PARSE); #endif } } } catch (SecondaryShutdownException& ex) { // We are bailing out because another thread set bad job status ostringstream oss; if (tableId != -1) { oss << "Bulkload Parse (thread " << id << ") Stopped parsing Table " << fTableInfo[tableId]->getTableName() << ". " << ex.what(); setParseErrorOnTable(tableId, true); } else { oss << "Bulkload Parse (thread " << id << ") Stopped parsing Tables. " << ex.what(); } fLog.logMsg(oss.str(), MSGLVL_INFO1); } catch (exception& ex) { BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; if (tableId != -1) { oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". " << ex.what() << ". Terminating this job."; setParseErrorOnTable(tableId, true); fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); } else { oss << "Bulkload Parse (thread " << id << ") Failed for Table. " << ex.what() << ". Terminating this job."; } fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL); } catch (...) { BulkStatus::setJobStatus(EXIT_FAILURE); ostringstream oss; if (tableId != -1) { oss << "Bulkload Parse (thread " << id << ") Failed for Table " << fTableInfo[tableId]->getTableName() << ". Terminating this job."; setParseErrorOnTable(tableId, true); fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str()); } else { oss << "Bulkload Parse (thread " << id << ") Failed for Table. Terminating this job."; } fLog.logMsg(oss.str(), ERR_UNKNOWN, MSGLVL_CRITICAL); } } //------------------------------------------------------------------------------ // Search for an available table/column/buffer to be parsed. // First available table/column/buffer that is found, is locked and assigned to // the specified thread id. 
// Return value is -1 if no table/column/buffer is available for parsing. //------------------------------------------------------------------------------ // @bug2099 - Temp hack to diagnose deadlock. Added report parm and couts below. bool BulkLoad::lockColumnForParse(int thrdId, int& tableId, int& columnId, int& myParseBuffer, bool report) { // Acquire mutex // Iterate on the table list // Check if the currentParseBuffer is available for parsing // If yes, put the locker and fill the tableId and columnId // else, go to the next table for checking if a column is available boost::mutex::scoped_lock lock(fParseMutex); for (unsigned i = 0; i < fTableInfo.size(); ++i) { if (fTableInfo[i]->getStatusTI() == WriteEngine::PARSE_COMPLETE) continue; int currentParseBuffer = fTableInfo[i]->getCurrentParseBuffer(); myParseBuffer = currentParseBuffer; do { // @bug2099+ if (report) { ostringstream oss; std::string bufStatusStr; Status stat = fTableInfo[i]->getStatusTI(); ColumnInfo::convertStatusToString(stat, bufStatusStr); oss << " - " << pthread_self() << ":fTableInfo[" << i << "]" << bufStatusStr << " (" << stat << ")"; if (stat != WriteEngine::PARSE_COMPLETE) { oss << "; fCurrentParseBuffer is " << myParseBuffer; } oss << endl; cout << oss.str(); cout.flush(); } // @bug2099- // get a buffer and column to parse if available. if ((columnId = fTableInfo[i]->getColumnForParse(thrdId, myParseBuffer, report)) != -1) { tableId = i; return true; } myParseBuffer = (myParseBuffer + 1) % fTableInfo[i]->getNumberOfBuffers(); } while (myParseBuffer != currentParseBuffer); } return false; } //------------------------------------------------------------------------------ // Used by the parsing threads to check to see if all the tables match the // specified status. The only current use for the function is to check to // see if all the tables have a PARSE_COMPLETE status, hence the name of the // function. 
// If a table has an ERR status, then all the tables are considered finished, // because that basically means we had an import error, and we need to shut // down the job. // Returns TRUE if all the tables are considered completed, else returns FALSE. //------------------------------------------------------------------------------ bool BulkLoad::allTablesDone(Status status) { for (unsigned i = 0; i < fTableInfo.size(); ++i) { if (fTableInfo[i]->getStatusTI() == WriteEngine::ERR) return true; if (fTableInfo[i]->getStatusTI() != status) return false; } return true; } //------------------------------------------------------------------------------ // Set status for specified table to reflect a parsing error. // Optionally lock fParseMutex (if requested). // May evaluate later whether we need to employ the fParseMutex for this call. //------------------------------------------------------------------------------ void BulkLoad::setParseErrorOnTable(int tableId, bool lockParseMutex) { if (lockParseMutex) { boost::mutex::scoped_lock lock(fParseMutex); fTableInfo[tableId]->setParseError(); } else { fTableInfo[tableId]->setParseError(); } } } // namespace WriteEngine