// Scrape artifact (repository metadata, not source code):
// mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
// synced 2025-04-18 21:44:02 +03:00; 2473 lines; 80 KiB; C++
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*******************************************************************************
* $Id: we_tableinfo.cpp 4648 2013-05-29 21:42:40Z rdempsey $
*
*******************************************************************************/
/** @file */
#include "we_tableinfo.h"
#include "we_bulkstatus.h"
#include "we_bulkload.h"
#include <sstream>
#include <sys/time.h>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include <cstdio>
#include <cerrno>
#include <cstring>
#include <utility>
// @bug 2099+
#include <iostream>
#include <libmarias3/marias3.h>
#include <string.h>
using namespace std;
// @bug 2099-
#include <boost/filesystem/path.hpp>
using namespace boost;
#include "we_config.h"
#include "we_simplesyslog.h"
#include "we_bulkrollbackmgr.h"
#include "we_confirmhdfsdbfile.h"
#include "querytele.h"
using namespace querytele;
#include "oamcache.h"
#include "cacheutils.h"
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/exception.h>
namespace
{
const std::string BAD_FILE_SUFFIX = ".bad"; // Reject data file suffix
const std::string ERR_FILE_SUFFIX = ".err"; // Job error file suffix
const std::string BOLD_START = "\033[0;1m";
const std::string BOLD_STOP = "\033[0;39m";
} // namespace
namespace WriteEngine
{
// Helpers
int TableInfo::compareHWMs(const int smallestColumnId, const int widerColumnId,
const uint32_t smallerColumnWidth, const uint32_t widerColumnWidth,
const std::vector<DBRootExtentInfo>& segFileInfo, int& colIdx)
{
int rc = NO_ERROR;
if (widerColumnId < 0)
{
return rc;
}
uint32_t columnDiffMultiplier = widerColumnWidth / smallerColumnWidth;
HWM hwmLo = segFileInfo[smallestColumnId].fLocalHwm * columnDiffMultiplier;
HWM hwmHi = hwmLo + columnDiffMultiplier - 1;
if ((segFileInfo[widerColumnId].fLocalHwm < hwmLo) || (segFileInfo[widerColumnId].fLocalHwm > hwmHi))
{
colIdx = widerColumnId;
rc = ERR_BRM_HWMS_OUT_OF_SYNC;
}
return rc;
}
//------------------------------------------------------------------------------
// Puts the current thread to sleep for the specified number of milliseconds.
// (Ex: used to wait for a Read buffer to become available.)
//------------------------------------------------------------------------------
// Puts the calling thread to sleep for the requested number of
// milliseconds, restarting the nanosleep() whenever it returns early
// (e.g. interrupted by a signal) using the remaining time it reports.
void TableInfo::sleepMS(long ms)
{
  struct timespec remaining;
  remaining.tv_sec = ms / 1000;
  remaining.tv_nsec = (ms % 1000) * 1000000;

  struct timespec request;

  do
  {
    // Re-issue the sleep with whatever time is still outstanding.
    request = remaining;
  } while (nanosleep(&request, &remaining) < 0);
}
//------------------------------------------------------------------------------
// TableInfo constructor
//------------------------------------------------------------------------------
TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processName, OID tableOID,
                     const string& tableName, bool bKeepRbMetaFile)
 : fTableId(-1)
 , fBufferSize(0)
 , fFileBufSize(0)
 , fStatusTI(WriteEngine::NEW)  // table starts unassigned/unread
 , fReadBufCount(0)
 , fNumberOfColumns(0)
 , fHandle(NULL)  // no import file open yet
 , fCurrentReadBuffer(0)
 , fTotalReadRows(0)
 , fTotalErrRows(0)
 , fMaxErrorRows(5)  // default error-row cap; job settings may override
 , fLastBufferId(-1)  // -1 until the read thread reads the final buffer
 , fFileBuffer(NULL)
 , fCurrentParseBuffer(0)
 , fNumberOfColsParsed(0)
 , fLocker(-1)  // -1 means no read thread owns this table yet
 , fTableName(tableName)
 , fTableOID(tableOID)
 , fJobId(0)
 , fLog(logger)
 , fTxnID(txnID)
 , fRBMetaWriter(processName, logger)
 , fProcessName(processName)
 , fKeepRbMetaFile(bKeepRbMetaFile)
 , fbTruncationAsError(false)
 , fImportDataMode(IMPORT_DATA_TEXT)  // default: delimited text import
 , fTimeZone(dataconvert::systemTimeZoneOffset())
 , fTableLocked(false)
 , fReadFromStdin(false)
 , fReadFromS3(false)
 , fNullStringMode(false)
 , fEnclosedByChar('\0')
 , fEscapeChar('\\')
 , fProcessingBegun(false)
 , fBulkMode(BULK_MODE_LOCAL)
 , fBRMReporter(logger, tableName)
 , fTableLockID(0)
 , fRejectDataCnt(0)
 , fRejectErrCnt(0)
 , fExtentStrAlloc(tableOID, logger)
 , fOamCachePtr(oam::OamCache::makeOamCache())
 , fParquetReader(nullptr)
 , fReader(nullptr)
{
  fBuffers.clear();
  fColumns.clear();
  fStartTime.tv_sec = 0;
  fStartTime.tv_usec = 0;

  // Wire up the QueryTele client only when a telemetry host is configured
  // and its port parses to a positive integer.
  string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));

  if (!teleServerHost.empty())
  {
    int teleServerPort =
        config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));

    if (teleServerPort > 0)
    {
      fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
    }
  }
}
//------------------------------------------------------------------------------
// TableInfo destructor
//------------------------------------------------------------------------------
TableInfo::~TableInfo()
{
  // Flush any queued error messages to the BRM report file first, then
  // release the read buffers and per-column state.
  fBRMReporter.sendErrMsgToFile(fBRMRptFileName);
  freeProcessingBuffers();
}
//------------------------------------------------------------------------------
// Frees up processing buffer memory. We don't reset fReadBufCount to 0,
// because BulkLoad::lockColumnForParse() is calling getNumberOfBuffers()
// and dividing by the return value. So best not to risk returning 0.
// Once we get far enough to call this freeProcessingBuffers() function,
// the application code obviously better be completely through accessing
// fBuffers and fColumns.
//------------------------------------------------------------------------------
// Releases the per-column state and the read buffers to give memory back
// once the table is fully processed.  fReadBufCount is deliberately left
// untouched: BulkLoad::lockColumnForParse() divides by the value returned
// from getNumberOfBuffers(), so zeroing it here would risk a divide-by-zero.
void TableInfo::freeProcessingBuffers()
{
  fColumns.clear();
  fBuffers.clear();
  fNumberOfColumns = 0;
}
//------------------------------------------------------------------------------
// Close any database column or dictionary store files left open for this table.
// Under "normal" circumstances, there should be no files left open when we
// reach the end of the job, but in some error cases, the parsing threads may
// bail out without closing a file. So this function is called as part of
// EOJ cleanup for any tables that are still holding a table lock.
//
// Files will automatically get closed when the program terminates, but when
// we are preparing for a bulk rollback, we want to explicitly close the files
// before we "reopen" them and start rolling back the contents of the files.
//
// For mode1 and mode2 imports, cpimport.bin does not lock the table or perform
// a bulk rollback, and closeOpenDbFile() is not called. We instead rely on
// the program to implicitly close the files.
//------------------------------------------------------------------------------
// Explicitly closes any column/dictionary store files this table still has
// open after an abnormal termination, so a subsequent bulk rollback can
// reopen them cleanly.  Normal termination leaves nothing open here.
void TableInfo::closeOpenDbFiles()
{
  ostringstream oss;
  oss << "Closing DB files for table " << fTableName << ", left open by abnormal termination.";
  fLog->logMsg(oss.str(), MSGLVL_INFO2);

  for (auto& colInfo : fColumns)
  {
    stringstream oss1;
    oss1 << "Closing DB column file for: " << colInfo.column.colName << " (OID-" << colInfo.column.mapOid
         << ")";
    fLog->logMsg(oss1.str(), MSGLVL_INFO2);
    colInfo.closeColumnFile(false, true);

    // Dictionary columns also have a store file that must be closed.
    if (colInfo.column.colType == COL_TYPE_DICT)
    {
      stringstream oss2;
      oss2 << "Closing DB store file for: " << colInfo.column.colName << " (OID-"
           << colInfo.column.dctnry.dctnryOid << ")";
      fLog->logMsg(oss2.str(), MSGLVL_INFO2);
      colInfo.closeDctnryStore(true);
    }
  }
}
//------------------------------------------------------------------------------
// Locks this table for reading to the specified thread (locker) "if" the table
// has not yet been assigned to a read thread.
//------------------------------------------------------------------------------
// Attempts to assign this table to the given read thread.  The claim only
// succeeds when no thread owns the table yet (fLocker == -1) and the table
// is still in the NEW state.  Returns true when the lock was acquired.
bool TableInfo::lockForRead(const int& locker)
{
  boost::mutex::scoped_lock lock(fSyncUpdatesTI);

  const bool claimable = (fLocker == -1) && (fStatusTI == WriteEngine::NEW);

  if (claimable)
    fLocker = locker;

  return claimable;
}
//------------------------------------------------------------------------------
// Loop thru reading the import file(s) assigned to this TableInfo object.
//------------------------------------------------------------------------------
// Reads every import file assigned to this table, filling BulkLoadBuffer
// objects round-robin for the parser threads.  Returns NO_ERROR on success;
// throws SecondaryShutdownException if another thread flags job failure.
int TableInfo::readTableData()
{
  RID validTotalRows = 0;          // valid rows read across all files for this table
  RID totalRowsPerInputFile = 0;   // rows read from the current input file
  int64_t totalRowsParquet = 0;    // totalRowsParquet to be used in later function
                                   // needs int64_t type
  int filesTBProcessed = fLoadFileList.size();
  int fileCounter = 0;
  unsigned long long qtSentAt = 0;  // last row count (in millions) posted to QueryTele

  // Open the first input file if it is not already open; the open path
  // differs between text/S3 imports (FILE handle) and parquet (reader).
  if (fImportDataMode != IMPORT_DATA_PARQUET)
  {
    if (fHandle == NULL)
    {
      fFileName = fLoadFileList[fileCounter];
      int rc = openTableFile();

      if (rc != NO_ERROR)
      {
        // Mark the table status as error and exit.
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStatusTI = WriteEngine::ERR;
        return rc;
      }

      fileCounter++;
    }
  }
  else
  {
    if (fParquetReader == NULL)
    {
      fFileName = fLoadFileList[fileCounter];
      int rc = openTableFileParquet(totalRowsParquet);

      if (rc != NO_ERROR)
      {
        // Mark the table status as error and exit.
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStatusTI = WriteEngine::ERR;
        return rc;
      }

      fileCounter++;
    }
  }

  timeval readStart;
  gettimeofday(&readStart, NULL);
  ostringstream ossStartMsg;
  ossStartMsg << "Start reading and loading table " << fTableName;
  fLog->logMsg(ossStartMsg.str(), MSGLVL_INFO2);
  fProcessingBegun = true;

  // Telemetry record reused throughout the read loop.
  ImportTeleStats its;
  its.job_uuid = fJobUUID;
  its.import_uuid = QueryTeleClient::genUUID();
  its.msg_type = ImportTeleStats::IT_START;
  its.start_time = QueryTeleClient::timeNowms();
  its.table_list.push_back(fTableName);
  its.rows_so_far.push_back(0);
  its.system_name = fOamCachePtr->getSystemName();
  its.module_name = fOamCachePtr->getModuleName();
  string tn = getTableName();
  its.schema_name = string(tn, 0, tn.find('.'));
  fQtc.postImportTele(its);

  //
  // LOOP to read all the import data for this table
  //
  while (true)
  {
    // See if JobStatus has been set to terminate by another thread
    if (BulkStatus::getJobStatus() == EXIT_FAILURE)
    {
      boost::mutex::scoped_lock lock(fSyncUpdatesTI);
      fStartTime = readStart;
      fStatusTI = WriteEngine::ERR;
      its.msg_type = ImportTeleStats::IT_TERM;
      its.rows_so_far.pop_back();
      its.rows_so_far.push_back(0);
      fQtc.postImportTele(its);
      throw SecondaryShutdownException(
          "TableInfo::"
          "readTableData(1) responding to job termination");
    }

// @bug 3271: Conditionally compile the thread deadlock debug logging
#ifdef DEADLOCK_DEBUG
    // @bug2099+. Temp hack to diagnose deadlock.
    struct timeval tvStart;
    gettimeofday(&tvStart, 0);
    bool report = false;
    bool reported = false;
// @bug2099-
#else
    const bool report = false;
#endif

#ifdef PROFILE
    Stats::startReadEvent(WE_STATS_WAIT_FOR_READ_BUF);
#endif

    //
    // LOOP to wait for, and read, the next avail BulkLoadBuffer object
    //
    while (!isBufferAvailable(report))
    {
      // See if JobStatus has been set to terminate by another thread
      if (BulkStatus::getJobStatus() == EXIT_FAILURE)
      {
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStartTime = readStart;
        fStatusTI = WriteEngine::ERR;
        its.msg_type = ImportTeleStats::IT_TERM;
        its.rows_so_far.pop_back();
        its.rows_so_far.push_back(0);
        fQtc.postImportTele(its);
        throw SecondaryShutdownException(
            "TableInfo::"
            "readTableData(2) responding to job termination");
      }

      // Sleep and check the condition again.
      sleepMS(1);
#ifdef DEADLOCK_DEBUG
      // @bug2099+
      if (report)
        report = false;  // report one time.

      if (!reported)
      {
        struct timeval tvNow;
        gettimeofday(&tvNow, 0);

        // Log (once) if we have been stalled waiting on a buffer > 100 secs.
        if ((tvNow.tv_sec - tvStart.tv_sec) > 100)
        {
          time_t t = time(0);
          char timeString[50];
          ctime_r(&t, timeString);
          timeString[strlen(timeString) - 1] = '\0';
          ostringstream oss;
          oss << endl
              << timeString << ": "
              << "TableInfo::readTableData: " << fTableName << "; Diff is " << (tvNow.tv_sec - tvStart.tv_sec)
              << endl;
          cout << oss.str();
          cout.flush();
          report = true;
          reported = true;
        }
      }
// @bug2099-
#endif
    }

#ifdef PROFILE
    Stats::stopReadEvent(WE_STATS_WAIT_FOR_READ_BUF);
    Stats::startReadEvent(WE_STATS_READ_INTO_BUF);
#endif

    // Buffers are used round-robin; the previous buffer is passed in so a
    // partial row at its tail can be completed by the current fill.
    int readBufNo = fCurrentReadBuffer;
    int prevReadBuf = (fCurrentReadBuffer - 1);

    if (prevReadBuf < 0)
      prevReadBuf = fReadBufCount + prevReadBuf;

    // We keep a running total of read errors; fMaxErrorRows specifies
    // the error limit.  Here's where we see how many more errors we
    // still have below the limit, and we pass this to fillFromFile().
    unsigned allowedErrCntThisCall = ((fMaxErrorRows > fTotalErrRows) ? (fMaxErrorRows - fTotalErrRows) : 0);

    // Fill in the specified buffer.
    // fTotalReadRowsPerInputFile is ongoing total number of rows read,
    // per input file.
    // validTotalRows is ongoing total of valid rows read for all files
    // pertaining to this DB table.
    int readRc;

    if (fImportDataMode != IMPORT_DATA_PARQUET)
    {
      if (fReadFromS3)
      {
        readRc = fBuffers[readBufNo].fillFromMemory(fBuffers[prevReadBuf], fFileBuffer, fS3ReadLength,
                                                    &fS3ParseLength, totalRowsPerInputFile, validTotalRows,
                                                    fColumns, allowedErrCntThisCall);
      }
      else
      {
        readRc = fBuffers[readBufNo].fillFromFile(fBuffers[prevReadBuf], fHandle, totalRowsPerInputFile,
                                                  validTotalRows, fColumns, allowedErrCntThisCall);
      }
    }
    else
    {
      readRc = fBuffers[readBufNo].fillFromFileParquet(totalRowsPerInputFile, validTotalRows);
    }

    if (readRc != NO_ERROR)
    {
      // error occurred.
      // need to exit.
      // mark the table status as error and exit.
      {
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStartTime = readStart;
        fStatusTI = WriteEngine::ERR;
        fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
      }
      closeTableFile();

      // Error occurred on next row not read, so increment
      // totalRowsPerInputFile row count for the error msg
      WErrorCodes ec;
      ostringstream oss;
      oss << "Error reading import file " << fFileName << "; near line " << totalRowsPerInputFile + 1 << "; "
          << ec.errorString(readRc);
      fLog->logMsg(oss.str(), readRc, MSGLVL_ERROR);

      its.msg_type = ImportTeleStats::IT_TERM;
      its.rows_so_far.pop_back();
      its.rows_so_far.push_back(0);
      fQtc.postImportTele(its);
      return readRc;
    }

#ifdef PROFILE
    Stats::stopReadEvent(WE_STATS_READ_INTO_BUF);
#endif

    // Post progress telemetry at most once per million rows read.
    its.msg_type = ImportTeleStats::IT_PROGRESS;
    its.rows_so_far.pop_back();
    its.rows_so_far.push_back(totalRowsPerInputFile);
    unsigned long long thisRows = static_cast<unsigned long long>(totalRowsPerInputFile);
    thisRows /= 1000000;

    if (thisRows > qtSentAt)
    {
      fQtc.postImportTele(its);
      qtSentAt = thisRows;
    }

    // Check if there were any errors in the read data.
    // if yes, copy it to the error list.
    // if the number of errors is greater than the maximum error count
    // mark the table status as error and exit.
    // call the method to copy the errors
    writeErrorList(&fBuffers[readBufNo].getErrorRows(), &fBuffers[readBufNo].getExactErrorRows(), false);
    fBuffers[readBufNo].clearErrRows();

    if (fTotalErrRows > fMaxErrorRows)
    {
      // flush the reject data file and output the rejected rows
      // flush err file and output the rejected row id and the reason.
      writeErrorList(0, 0, true);

      // number of errors > maximum allowed. hence return error.
      {
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStartTime = readStart;
        fStatusTI = WriteEngine::ERR;
        fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
      }
      closeTableFile();

      ostringstream oss5;
      oss5 << "Actual error row count(" << fTotalErrRows << ") exceeds the max error rows(" << fMaxErrorRows
           << ") allowed for table " << fTableName;
      fLog->logMsg(oss5.str(), ERR_BULK_MAX_ERR_NUM, MSGLVL_ERROR);

      // List Err and Bad files to report file (if applicable)
      fBRMReporter.rptMaxErrJob(fBRMRptFileName, fErrFiles, fBadFiles);

      its.msg_type = ImportTeleStats::IT_TERM;
      its.rows_so_far.pop_back();
      its.rows_so_far.push_back(0);
      fQtc.postImportTele(its);
      return ERR_BULK_MAX_ERR_NUM;
    }

    // mark the buffer status as read complete.
    {
#ifdef PROFILE
      Stats::startReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
#endif
      boost::mutex::scoped_lock lock(fSyncUpdatesTI);
#ifdef PROFILE
      Stats::stopReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
      Stats::startReadEvent(WE_STATS_COMPLETING_READ);
#endif
      fStartTime = readStart;
      fBuffers[readBufNo].setStatusBLB(WriteEngine::READ_COMPLETE);
      fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount;

      // bufferCount++;
      // End-of-input detection: feof for file input, fully-parsed buffer for
      // S3, or all rows consumed for parquet.
      if ((fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)) ||
          (totalRowsPerInputFile == (RID)totalRowsParquet))
      {
        timeval readFinished;
        gettimeofday(&readFinished, NULL);
        closeTableFile();

        if (fReadFromStdin)
        {
          fLog->logMsg("Finished loading " + fTableName + " from STDIN" + ", Time taken = " +
                           Convertor::int2Str((int)(readFinished.tv_sec - readStart.tv_sec)) + " seconds",
                       //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
                       MSGLVL_INFO2);
        }
        else if (fReadFromS3)
        {
          fLog->logMsg("Finished loading " + fTableName + " from S3" + ", Time taken = " +
                           Convertor::int2Str((int)(readFinished.tv_sec - readStart.tv_sec)) + " seconds",
                       //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
                       MSGLVL_INFO2);
        }
        else
        {
          fLog->logMsg("Finished reading file " + fFileName + ", Time taken = " +
                           Convertor::int2Str((int)(readFinished.tv_sec - readStart.tv_sec)) + " seconds",
                       //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
                       MSGLVL_INFO2);
        }

        // flush the reject data file and output the rejected rows
        // flush err file and output the rejected row id and the reason.
        writeErrorList(0, 0, true);

        // If > 1 file for this table, then open next file in the list
        if (fileCounter < filesTBProcessed)
        {
          fFileName = fLoadFileList[fileCounter];
          int rc;

          if (fImportDataMode != IMPORT_DATA_PARQUET)
          {
            rc = openTableFile();
          }
          else
          {
            rc = openTableFileParquet(totalRowsParquet);
          }

          if (rc != NO_ERROR)
          {
            // Mark the table status as error and exit.
            fStatusTI = WriteEngine::ERR;
            return rc;
          }

          fileCounter++;
          fTotalReadRows += totalRowsPerInputFile;
          totalRowsPerInputFile = 0;
        }
        else  // All files read for this table; break out of read loop
        {
          fStatusTI = WriteEngine::READ_COMPLETE;
          fLastBufferId = readBufNo;
          fTotalReadRows += totalRowsPerInputFile;
          break;
        }

        gettimeofday(&readStart, NULL);
      }  // reached EOF

#ifdef PROFILE
      Stats::stopReadEvent(WE_STATS_COMPLETING_READ);
#endif
    }  // mark buffer status as read-complete within scope of a mutex
  }  // loop to read all data for this table

  its.msg_type = ImportTeleStats::IT_SUMMARY;
  its.end_time = QueryTeleClient::timeNowms();
  its.rows_so_far.pop_back();
  its.rows_so_far.push_back(fTotalReadRows);
  fQtc.postImportTele(its);
  fQtc.waitForQueues();

  return NO_ERROR;
}
//------------------------------------------------------------------------------
// writeErrorList()
// errorRows - vector of row numbers and corresponding error messages
// errorDatRows - vector of bad rows that have been rejected
//
// Adds errors pertaining to a specific buffer, to the cumulative list of
// errors to be reported to the user.
//------------------------------------------------------------------------------
// Appends one buffer's rejected-row reasons (errorRows) and raw rejected
// rows (errorDatRows) to the cumulative error/bad files for this table.
// bCloseFile forces a flush/close of those files even with no new rows.
// Either vector pointer may be null.
void TableInfo::writeErrorList(const std::vector<std::pair<RID, std::string> >* errorRows,
                               const std::vector<std::string>* errorDatRows, bool bCloseFile)
{
  const size_t reasonCnt = errorRows ? errorRows->size() : 0;
  const size_t badRowCnt = errorDatRows ? errorDatRows->size() : 0;

  // Nothing to record and no flush requested: skip taking the mutex.
  if ((reasonCnt == 0) && (badRowCnt == 0) && !bCloseFile)
    return;

  boost::mutex::scoped_lock lock(fErrorRptInfoMutex);

  if ((reasonCnt > 0) || bCloseFile)
    writeErrReason(errorRows, bCloseFile);

  if ((badRowCnt > 0) || bCloseFile)
    writeBadRows(errorDatRows, bCloseFile);

  fTotalErrRows += reasonCnt;
}
//------------------------------------------------------------------------------
// Parse the specified column (columnId) in the specified buffer (bufferId).
//------------------------------------------------------------------------------
// Parses the given column out of the given buffer, timing the call and
// returning the elapsed wall-clock time (in whole milliseconds, via
// processingTime) so the scheduler can track the column's recent cost.
// The parse may extend the column file when it is smaller than required.
int TableInfo::parseColumn(const int& columnId, const int& bufferId, double& processingTime)
{
  timeval tvBegin, tvEnd;
  gettimeofday(&tvBegin, NULL);

  const int rc = fBuffers[bufferId].parse(fColumns[columnId]);

  gettimeofday(&tvEnd, NULL);

  // Millisecond timestamps (microseconds truncated per term).
  const long long beginMs = tvBegin.tv_usec / 1000 + tvBegin.tv_sec * 1000;
  const long long endMs = tvEnd.tv_usec / 1000 + tvEnd.tv_sec * 1000;
  processingTime = endMs - beginMs;

  return rc;
}
//------------------------------------------------------------------------------
// Mark the specified column (columnId) in the specified buffer (bufferId) as
// PARSE_COMPLETE. If this is the last column to be parsed for this buffer,
// then mark the buffer as PARSE_COMPLETE.
// If the last buffer for this table has been read (fLastBufferId != -1), then
// see if all the data for columnId has been parsed for all the buffers, in
// which case we are finished parsing columnId.
// If this is the last column to finish parsing for this table, then mark the
// table status as PARSE_COMPLETE.
//------------------------------------------------------------------------------
// Marks (columnId, bufferId) parsed; cascades completion upward: buffer ->
// column -> table.  When the whole table completes, performs end-of-job
// work (dictionary cache flush, auto-inc sync, HWM validation, BRM update,
// lock release).  Returns NO_ERROR or an error code after setting ERR state.
int TableInfo::setParseComplete(const int& columnId, const int& bufferId, double processingTime)
{
  boost::mutex::scoped_lock lock(fSyncUpdatesTI);

  // Check table status in case race condition results in this function
  // being called after fStatusTI was set to ERR by another thread.
  if (fStatusTI == WriteEngine::ERR)
    return ERR_UNKNOWN;

  fColumns[columnId].lastProcessingTime = processingTime;
#ifdef PROFILE
  fColumns[columnId].totalProcessingTime += processingTime;
#endif

  // Set buffer status to complete if setColumnStatus indicates that
  // all the columns are complete
  if (fBuffers[bufferId].setColumnStatus(columnId, WriteEngine::PARSE_COMPLETE))
    fBuffers[bufferId].setStatusBLB(WriteEngine::PARSE_COMPLETE);

  // fLastBufferId != -1 means the Read thread has read the last
  // buffer for this table
  if (fLastBufferId != -1)
  {
    // check if the status of the column in all the fBuffers is parse
    // complete then update the column status as parse complete.
    bool allBuffersDoneForAColumn = true;

    for (int i = 0; i < fReadBufCount; ++i)
    {
      // check the status of the column in this buffer.
      Status bufferStatus = fBuffers[i].getStatusBLB();

      if ((bufferStatus == WriteEngine::READ_COMPLETE) || (bufferStatus == WriteEngine::PARSE_COMPLETE))
      {
        if (fBuffers[i].getColumnStatus(columnId) != WriteEngine::PARSE_COMPLETE)
        {
          allBuffersDoneForAColumn = false;
          break;
        }
      }
    }

    // allBuffersDoneForAColumn==TRUE means we are finished parsing columnId
    if (allBuffersDoneForAColumn)
    {
      // Accumulate list of HWM dictionary blocks to be flushed from cache
      std::vector<BRM::LBID_t> dictBlksToFlush;
      fColumns[columnId].getDictFlushBlks(dictBlksToFlush);

      for (unsigned kk = 0; kk < dictBlksToFlush.size(); kk++)
      {
        fDictFlushBlks.push_back(dictBlksToFlush[kk]);
      }

      int rc = fColumns[columnId].finishParsing();

      if (rc != NO_ERROR)
      {
        WErrorCodes ec;
        ostringstream oss;
        oss << "setParseComplete completion error; "
               "Failed to load table: "
            << fTableName << "; " << ec.errorString(rc);
        fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
        fStatusTI = WriteEngine::ERR;
        return rc;
      }

      fNumberOfColsParsed++;

      //
      // If all columns have been parsed, then finished with this tbl
      //
      if (fNumberOfColsParsed >= fNumberOfColumns)
      {
        // After closing the column and dictionary store files,
        // flush any updated dictionary blocks in PrimProc.
        // We only do this for non-HDFS.  For HDFS we don't want
        // to flush till "after" we have "confirmed" all the file
        // changes, which flushes the changes to disk.
        if (!idbdatafile::IDBPolicy::useHdfs())
        {
          if (fDictFlushBlks.size() > 0)
          {
#ifdef PROFILE
            Stats::startParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
#endif
            if (fLog->isDebug(DEBUG_2))
            {
              ostringstream oss;
              oss << "Dictionary cache flush: ";

              for (uint32_t i = 0; i < fDictFlushBlks.size(); i++)
              {
                oss << fDictFlushBlks[i] << ", ";
              }

              oss << endl;
              fLog->logMsg(oss.str(), MSGLVL_INFO1);
            }

            cacheutils::flushPrimProcAllverBlocks(fDictFlushBlks);
#ifdef PROFILE
            Stats::stopParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
#endif
            fDictFlushBlks.clear();
          }
        }

        // Update auto-increment next value if applicable.
        rc = synchronizeAutoInc();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: autoInc update error; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

        //..Validate that all the HWM's are consistent and in-sync
        std::vector<DBRootExtentInfo> segFileInfo;

        for (unsigned i = 0; i < fColumns.size(); ++i)
        {
          DBRootExtentInfo extentInfo;
          fColumns[i].getSegFileInfo(extentInfo);
          segFileInfo.push_back(extentInfo);
        }

        rc = validateColumnHWMs(0, segFileInfo, "Ending");

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: HWM validation error; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;

          // Dump the per-column ending HWMs to aid diagnosis.
          ostringstream oss2;
          oss2 << "Ending HWMs for table " << fTableName << ": ";

          for (unsigned int n = 0; n < fColumns.size(); n++)
          {
            oss2 << std::endl;
            oss2 << "  " << fColumns[n].column.colName << "; DBRoot/part/seg/hwm: " << segFileInfo[n].fDbRoot
                 << "/" << segFileInfo[n].fPartition << "/" << segFileInfo[n].fSegment << "/"
                 << segFileInfo[n].fLocalHwm;
          }

          fLog->logMsg(oss2.str(), MSGLVL_INFO1);
          return rc;
        }

        //..Confirm changes to DB files (necessary for HDFS)
        rc = confirmDBFileChanges();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: Error confirming DB changes; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

        //..Update BRM with HWM and Casual Partition info, etc.
        rc = finishBRM();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: BRM error; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

        // Change table lock state to CLEANUP
        rc = changeTableLockState();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: table lock state change error; "
                 "Table load completed: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

        // Finished with this table, so delete bulk rollback
        // meta data file and release the table lock.
        deleteTempDBFileChanges();
        deleteMetaDataRollbackFile();
        rc = releaseTableLock();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: table lock release error; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

#ifdef PROFILE
        // Loop through columns again to print out the elapsed
        // parse times
        for (unsigned i = 0; i < fColumns.size(); ++i)
        {
          ostringstream ossColTime;
          ossColTime << "Column " << i << "; OID-" << fColumns[i].column.mapOid << "; parseTime-"
                     << (fColumns[i].totalProcessingTime / 1000.0) << " seconds";
          fLog->logMsg(ossColTime.str(), MSGLVL_INFO1);
        }
#endif

        timeval endTime;
        gettimeofday(&endTime, 0);
        double elapsedTime = (endTime.tv_sec + (endTime.tv_usec / 1000000.0)) -
                             (fStartTime.tv_sec + (fStartTime.tv_usec / 1000000.0));

        fStatusTI = WriteEngine::PARSE_COMPLETE;
        reportTotals(elapsedTime);

        // Reduce memory use by allocating and releasing as needed
        freeProcessingBuffers();
      }  // end of if (fNumberOfColsParsed >= fNumberOfColumns)
    }  // end of if (allBuffersDoneForAColumn)
  }  // end of if (fLastBufferId != -1)

  // If we finished parsing the buffer associated with currentParseBuffer,
  // but have not finshed the entire table, then advance currentParseBuffer.
  if ((fStatusTI != WriteEngine::PARSE_COMPLETE) &&
      (fBuffers[bufferId].getStatusBLB() == WriteEngine::PARSE_COMPLETE))
  {
    // Find the BulkLoadBuffer object that is next in line to be parsed
    // and assign fCurrentParseBuffer accordingly.  Break out of the
    // loop if we wrap all the way around and catch up with the current-
    // Read buffer.
    if (bufferId == fCurrentParseBuffer)
    {
      int currentParseBuffer = fCurrentParseBuffer;

      while (fBuffers[currentParseBuffer].getStatusBLB() == WriteEngine::PARSE_COMPLETE)
      {
        currentParseBuffer = (currentParseBuffer + 1) % fReadBufCount;
        fCurrentParseBuffer = currentParseBuffer;

        if (fCurrentParseBuffer == fCurrentReadBuffer)
          break;
      }
    }
  }

  return NO_ERROR;
}
//------------------------------------------------------------------------------
// Report summary totals to applicable destination (stdout, cpimport.bin log
// file, BRMReport file (for mode1) etc).
// elapsedTime is number of seconds taken to import this table.
//------------------------------------------------------------------------------
// Logs the end-of-load summary for this table: rows processed/inserted,
// elapsed seconds, and per-column saturation counts; also records the
// totals in the BRM report file and the system log / data_mods.log.
// elapsedTime is the number of seconds taken to import this table.
void TableInfo::reportTotals(double elapsedTime)
{
  ostringstream oss1;
  oss1 << "For table " << fTableName << ": " << fTotalReadRows << " rows processed and "
       << (fTotalReadRows - fTotalErrRows) << " rows inserted.";
  fLog->logMsg(oss1.str(), MSGLVL_INFO1);

  ostringstream oss2;
  oss2 << "For table " << fTableName << ": "
       << "Elapsed time to load this table: " << elapsedTime << " secs";
  fLog->logMsg(oss2.str(), MSGLVL_INFO2);

  // @bug 3504: Loop through columns to print saturation counts
  std::vector<boost::tuple<execplan::CalpontSystemCatalog::ColDataType, uint64_t, uint64_t> > satCounts;

  for (auto& colInfo : fColumns)
  {
    long long satCount = colInfo.saturatedCnt();
    satCounts.push_back(boost::make_tuple(colInfo.column.dataType, colInfo.column.mapOid, satCount));

    if (satCount > 0)
    {
      // @bug 3375: report invalid dates/times set to null
      ostringstream ossSatCnt;
      ossSatCnt << "Column " << fTableName << '.' << colInfo.column.colName << "; Number of ";

      switch (colInfo.column.dataType)
      {
        case execplan::CalpontSystemCatalog::DATE:
          ossSatCnt << "invalid dates replaced with zero value : ";
          break;

        case execplan::CalpontSystemCatalog::DATETIME:
          // bug5383
          ossSatCnt << "invalid date/times replaced with zero value : ";
          break;

        case execplan::CalpontSystemCatalog::TIMESTAMP:
          ossSatCnt << "invalid timestamps replaced with zero value : ";
          break;

        case execplan::CalpontSystemCatalog::TIME:
          ossSatCnt << "invalid times replaced with zero value : ";
          break;

        case execplan::CalpontSystemCatalog::CHAR:
        case execplan::CalpontSystemCatalog::VARCHAR:
          ossSatCnt << "character strings truncated: ";
          break;

        default: ossSatCnt << "rows inserted with saturated values: "; break;
      }

      ossSatCnt << satCount;
      fLog->logMsg(ossSatCnt.str(), MSGLVL_WARNING);
    }
  }

  logging::Message::Args tblFinishedMsgArgs;
  tblFinishedMsgArgs.add(fJobId);
  tblFinishedMsgArgs.add(fTableName);
  tblFinishedMsgArgs.add((fTotalReadRows - fTotalErrRows));
  SimpleSysLog::instance()->logMsg(tblFinishedMsgArgs, logging::LOG_TYPE_INFO, logging::M0083);

  // Bug1375 - cpimport.bin did not add entries to the transaction
  // log file: data_mods.log
  if ((fTotalReadRows - fTotalErrRows) > 0)
    logToDataMods(fjobFileName, oss1.str());

  // Log totals in report file if applicable
  fBRMReporter.reportTotals(fTotalReadRows, (fTotalReadRows - fTotalErrRows), satCounts);
}
//------------------------------------------------------------------------------
// Report BRM updates to a report file or to BRM directly.
//------------------------------------------------------------------------------
// Gathers each column's Casual Partition and HWM updates into the BRM
// reporter, then saves them to a report file or sends them to BRM.
int TableInfo::finishBRM()
{
  // Collect the CP and HWM information for all the columns
  for (auto& colInfo : fColumns)
  {
    colInfo.getBRMUpdateInfo(fBRMReporter);
  }

  // We use mutex not to synchronize contention among parallel threads,
  // because we should be the only thread accessing the fErrFiles and
  // fBadFiles at this point.  But we do use the mutex as a memory barrier
  // to make sure we have the latest copy of the data.
  {
    boost::mutex::scoped_lock lock(fErrorRptInfoMutex);
  }

  // Save the info just collected, to a report file or send to BRM
  return fBRMReporter.sendBRMInfo(fBRMRptFileName, fErrFiles, fBadFiles);
}
//------------------------------------------------------------------------------
// Update status of table to reflect an error.
// No need to update the buffer or column status, because we are not going to
// continue the job anyway. Other threads should terminate when they see that
// the JobStatus has been set to EXIT_FAILURE and/or the table status has been
// set to WriteEngine::ERR.
//------------------------------------------------------------------------------
void TableInfo::setParseError()
{
  // The lock doubles as a memory barrier so other threads observe the status.
  boost::mutex::scoped_lock guard(fSyncUpdatesTI);
  fStatusTI = WriteEngine::ERR;
}
//------------------------------------------------------------------------------
// Locks a column from the specified buffer (bufferId) for the specified parse
// thread (id); and returns the column id. A return value of -1 means no
// column could be locked for parsing.
//------------------------------------------------------------------------------
// @bug2099. Temporary hack to diagnose deadlock.
// Added report parm and couts below.
int TableInfo::getColumnForParse(const int& id, const int& bufferId, bool report)
{
  boost::mutex::scoped_lock lock(fSyncUpdatesTI);
  double maxTime = 0;
  int columnId = -1;
  // Loop until a column is successfully locked, or until no unlocked
  // column remains in this buffer (-1).
  while (true)
  {
    // See if JobStatus has been set to terminate by another thread
    if (BulkStatus::getJobStatus() == EXIT_FAILURE)
    {
      fStatusTI = WriteEngine::ERR;
      throw SecondaryShutdownException(
          "TableInfo::"
          "getColumnForParse() responding to job termination");
    }
    // Buffer must be fully read before any of its columns can be parsed.
    if (!bufferReadyForParse(bufferId, report))
      return -1;
    // @bug2099+
    ostringstream oss;
    if (report)
    {
      oss << " ----- " << pthread_self() << ":fBuffers[" << bufferId << "]: (colLocker,status,lasttime)- ";
    }
    // @bug2099-
    // Candidate selection: among unlocked columns (locker == -1) prefer
    //   1) a never-parsed column (lastProcessingTime == 0), widest first;
    //   2) otherwise the column with the largest lastProcessingTime, so the
    //      historically slowest column is started earliest.
    for (unsigned k = 0; k < fNumberOfColumns; ++k)
    {
      // @bug2099+
      if (report)
      {
        Status colStatus = fBuffers[bufferId].getColumnStatus(k);
        int colLocker = fBuffers[bufferId].getColumnLocker(k);
        string colStatusStr;
        ColumnInfo::convertStatusToString(colStatus, colStatusStr);
        oss << '(' << colLocker << ',' << colStatusStr << ',' << fColumns[k].lastProcessingTime << ") ";
      }
      // @bug2099-
      if (fBuffers[bufferId].getColumnLocker(k) == -1)
      {
        if (columnId == -1)
          columnId = k;
        else if (fColumns[k].lastProcessingTime == 0)
        {
          if (fColumns[k].column.width >= fColumns[columnId].column.width)
            columnId = k;
        }
        else if (fColumns[k].lastProcessingTime > maxTime)
        {
          maxTime = fColumns[k].lastProcessingTime;
          columnId = k;
        }
      }
    }
    // @bug2099+
    if (report)
    {
      oss << "; selected colId: " << columnId;
      if (columnId != -1)
        oss << "; maxTime: " << maxTime;
      oss << endl;
      if (!BulkLoad::disableConsoleOutput())
      {
        cout << oss.str();
        cout.flush();
      }
    }
    // @bug2099-
    if (columnId == -1)
      return -1;
    // Try to take the lock; if another parse thread won the race for this
    // column, loop around and pick again.
    if (fBuffers[bufferId].tryAndLockColumn(columnId, id))
    {
      return columnId;
    }
  }
}
//------------------------------------------------------------------------------
// Check if the specified buffer is ready for parsing (status == READ_COMPLETE)
// @bug 2099. Temporary hack to diagnose deadlock. Added report parm
// and couts below.
//------------------------------------------------------------------------------
bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const
{
if (fBuffers.size() == 0)
return false;
Status stat = fBuffers[bufferId].getStatusBLB();
if (report)
{
ostringstream oss;
string bufStatusStr;
ColumnInfo::convertStatusToString(stat, bufStatusStr);
oss << " --- " << pthread_self() << ":fBuffers[" << bufferId << "]=" << bufStatusStr << " (" << stat
<< ")" << std::endl;
cout << oss.str();
}
return (stat == WriteEngine::READ_COMPLETE) ? true : false;
}
//------------------------------------------------------------------------------
// Create the specified number (noOfBuffer) of BulkLoadBuffer objects and store
// them in fBuffers. jobFieldRefList lists the fields in this import.
// fixedBinaryRecLen is fixed record length for binary imports (it is n/a
// for text bulk loads).
//------------------------------------------------------------------------------
int TableInfo::initializeBuffers(int noOfBuffers, const JobFieldRefList& jobFieldRefList,
unsigned int fixedBinaryRecLen)
{
fReadBufCount = noOfBuffers;
// initialize and populate the buffer vector.
for (int i = 0; i < fReadBufCount; ++i)
{
BulkLoadBuffer* buffer =
new BulkLoadBuffer(fNumberOfColumns, fBufferSize, fLog, i, fTableName, jobFieldRefList);
buffer->setColDelimiter(fColDelim);
buffer->setNullStringMode(fNullStringMode);
buffer->setEnclosedByChar(fEnclosedByChar);
buffer->setEscapeChar(fEscapeChar);
buffer->setTruncationAsError(getTruncationAsError());
buffer->setImportDataMode(fImportDataMode, fixedBinaryRecLen);
buffer->setTimeZone(fTimeZone);
fBuffers.push_back(buffer);
}
if (!fS3Key.empty())
{
ms3_library_init();
ms3 = ms3_init(fS3Key.c_str(), fS3Secret.c_str(), fS3Region.c_str(), fS3Host.c_str());
if (!ms3)
{
ostringstream oss;
oss << "Error initiating S3 library";
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
return ERR_FILE_OPEN;
}
}
return 0;
}
//------------------------------------------------------------------------------
// Add the specified ColumnInfo object (info) into this table's fColumns vector.
//------------------------------------------------------------------------------
void TableInfo::addColumn(ColumnInfo* info)
{
  // Register the column, keep the cached column count in sync, and tell the
  // extent-stripe allocator about the new column's OID/width/type.
  fColumns.push_back(info);
  fNumberOfColumns = fColumns.size();

  fExtentStrAlloc.addColumn(info->column.mapOid, info->column.width, info->column.dataType);
}
int TableInfo::openTableFileParquet(int64_t& totalRowsParquet)
{
if (fParquetReader != NULL)
return NO_ERROR;
std::shared_ptr<arrow::io::ReadableFile> infile;
try
{
PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(fFileName, arrow::default_memory_pool()));
PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &fReader));
fReader->set_batch_size(1000);
PARQUET_THROW_NOT_OK(fReader->ScanContents({0}, 1000, &totalRowsParquet));
PARQUET_THROW_NOT_OK(fReader->GetRecordBatchReader(&fParquetReader));
}
catch (std::exception& ex)
{
ostringstream oss;
oss << "Error opening import file " << fFileName << ".";
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
return ERR_FILE_OPEN;
}
catch (...)
{
ostringstream oss;
oss << "Error opening import file " << fFileName << ".";
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
return ERR_FILE_OPEN;
}
// initialize fBuffers batch source
for (auto& buffer : fBuffers)
{
buffer.setParquetReader(fParquetReader);
}
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Open the file corresponding to fFileName so that we can import it's contents.
// A buffer is also allocated and passed to setvbuf().
// If fReadFromStdin is true, we just assign stdin to our fHandle for reading.
//------------------------------------------------------------------------------
int TableInfo::openTableFile()
{
  // Already open; nothing to do.
  if (fHandle != NULL)
    return NO_ERROR;

  if (fReadFromStdin)
  {
    fHandle = stdin;

    // Not 100% sure that calling setvbuf on stdin does much, but in
    // some tests, it made a slight difference.
    fFileBuffer = new char[fFileBufSize];
    setvbuf(fHandle, fFileBuffer, _IOFBF, fFileBufSize);

    ostringstream oss;
    oss << BOLD_START << "Reading input from STDIN to import into table " << fTableName << "..." << BOLD_STOP;
    fLog->logMsg(oss.str(), MSGLVL_INFO1);
    return NO_ERROR;
  }

  if (fReadFromS3)
  {
    // Pull the whole object into fFileBuffer; parsing starts at offset 0.
    const int res =
        ms3_get(ms3, fS3Bucket.c_str(), fFileName.c_str(), (uint8_t**)&fFileBuffer, &fS3ReadLength);
    fS3ParseLength = 0;

    if (res)
    {
      ostringstream oss;
      oss << "Error retrieving file " << fFileName << " from S3: ";

      if (ms3_server_error(ms3))
        oss << ms3_server_error(ms3);
      else
        oss << ms3_error(res);

      fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
      return ERR_FILE_OPEN;
    }

    return NO_ERROR;
  }

  // Local file: text imports open in text mode, everything else binary.
  const char* openMode = (fImportDataMode == IMPORT_DATA_TEXT) ? "r" : "rb";
  fHandle = fopen(fFileName.c_str(), openMode);

  if (fHandle == NULL)
  {
    const int errnum = errno;
    ostringstream oss;
    oss << "Error opening import file " << fFileName << ". " << strerror(errnum);
    fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);

    // return an error; caller should set fStatusTI if needed
    return ERR_FILE_OPEN;
  }

  // now the input load file is available for reading the data.
  // read the data from the load file into the buffers.
  fFileBuffer = new char[fFileBufSize];
  setvbuf(fHandle, fFileBuffer, _IOFBF, fFileBufSize);

  ostringstream oss;
  oss << "Opening " << fFileName << " to import into table " << fTableName;
  fLog->logMsg(oss.str(), MSGLVL_INFO2);

  return NO_ERROR;
}
//------------------------------------------------------------------------------
// Close the current open file we have been importing.
//------------------------------------------------------------------------------
void TableInfo::closeTableFile()
{
  if (fImportDataMode == IMPORT_DATA_PARQUET)
  {
    // Parquet path: dropping the readers closes the underlying file.
    fReader.reset();
    fParquetReader.reset();
    return;
  }

  if (fHandle)
  {
    // If reading from stdin, we don't delete the buffer out from under
    // the file handle, because stdin is still open. This will cause a
    // memory leak, but when using stdin, we can only read in 1 table.
    // So it's not like we will be leaking multiple buffers for several
    // tables over the life of the job.
    if (!fReadFromStdin)
    {
      fclose(fHandle);
      delete[] fFileBuffer;
    }

    fHandle = 0;
  }
  else if (ms3)
  {
    // S3 path: the buffer was allocated by ms3_get(), so free it through
    // the library.
    ms3_free((uint8_t*)fFileBuffer);
  }
}
//------------------------------------------------------------------------------
// "Grabs" the current read buffer for TableInfo so that the read thread that
// is calling this function, can read the next buffer's set of data.
//------------------------------------------------------------------------------
// @bug2099. Temporary hack to diagnose deadlock.
// Added report parm and couts below.
bool TableInfo::isBufferAvailable(bool report)
{
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
Status bufferStatus = fBuffers[fCurrentReadBuffer].getStatusBLB();
if ((bufferStatus == WriteEngine::PARSE_COMPLETE) || (bufferStatus == WriteEngine::NEW))
{
// reset buffer status and column locks while we have
// an fSyncUpdatesTI lock
fBuffers[fCurrentReadBuffer].setStatusBLB(WriteEngine::READ_PROGRESS);
fBuffers[fCurrentReadBuffer].resetColumnLocks();
return true;
}
if (report)
{
ostringstream oss;
string bufferStatusStr;
ColumnInfo::convertStatusToString(bufferStatus, bufferStatusStr);
oss << " Buffer status is " << bufferStatusStr << ". " << endl;
oss << " fCurrentReadBuffer is " << fCurrentReadBuffer << endl;
cout << oss.str();
cout.flush();
}
return false;
}
//------------------------------------------------------------------------------
// Report whether rows were rejected, and if so, then list them out into the
// reject file.
//------------------------------------------------------------------------------
void TableInfo::writeBadRows(const std::vector<std::string>* errorDatRows, bool bCloseFile)
{
size_t errorDatRowsCount = 0;
if (errorDatRows)
errorDatRowsCount = errorDatRows->size();
if (errorDatRowsCount > 0)
{
if (!fRejectDataFile.is_open())
{
ostringstream rejectFileName;
if (fErrorDir.size() > 0)
{
rejectFileName << fErrorDir << basename(getFileName().c_str());
}
else
{
if (fReadFromS3)
{
rejectFileName << basename(getFileName().c_str());
}
else
{
rejectFileName << getFileName();
}
}
rejectFileName << ".Job_" << fJobId << '_' << ::getpid() << BAD_FILE_SUFFIX;
fRejectDataFileName = rejectFileName.str();
fRejectDataFile.open(rejectFileName.str().c_str(), ofstream::out);
if (!fRejectDataFile)
{
ostringstream oss;
oss << "Unable to create file: " << rejectFileName.str() << "; Check permission.";
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
return;
}
}
for (std::vector<string>::const_iterator iter = errorDatRows->begin(); iter != errorDatRows->end();
++iter)
{
fRejectDataFile << *iter;
}
fRejectDataCnt += errorDatRowsCount;
}
if (bCloseFile)
{
if (fRejectDataFile.is_open())
fRejectDataFile.close();
fRejectDataFile.clear();
if (fRejectDataCnt > 0)
{
ostringstream oss;
std::string rejectFileNameToLog;
// Construct/report complete file name and save in list of files
boost::filesystem::path p(fRejectDataFileName);
if (!p.has_root_path())
{
// We could fail here having fixed size buffer
char cwdPath[4096];
char* buffPtr = &cwdPath[0];
buffPtr = getcwd(cwdPath, sizeof(cwdPath));
boost::filesystem::path rejectFileName2(buffPtr);
rejectFileName2 /= fRejectDataFileName;
fBadFiles.push_back(rejectFileName2.string());
rejectFileNameToLog = rejectFileName2.string();
}
else
{
fBadFiles.push_back(fRejectDataFileName);
rejectFileNameToLog = fRejectDataFileName;
}
oss << "Number of rows with bad data = " << fRejectDataCnt
<< ". Exact rows are listed in file located here: " << fErrorDir;
fLog->logMsg(oss.str(), MSGLVL_INFO1);
fRejectDataCnt = 0;
}
}
}
//------------------------------------------------------------------------------
// Report whether rows were rejected, and if so, then list out the row numbers
// and error reasons into the error file. The .err file is created lazily on
// the first error row; when bCloseFile is set, the file is closed and a
// summary (with the full path of the error file) is logged and recorded in
// fErrFiles.
//------------------------------------------------------------------------------
void TableInfo::writeErrReason(const std::vector<std::pair<RID, string> >* errorRows, bool bCloseFile)
{
  size_t errorRowsCount = 0;

  if (errorRows)
    errorRowsCount = errorRows->size();

  if (errorRowsCount > 0)
  {
    if (!fRejectErrFile.is_open())
    {
      // Build the error file name: in fErrorDir if configured, else next to
      // the input file (basename only for S3 sources).
      ostringstream errFileName;

      if (fErrorDir.size() > 0)
      {
        errFileName << fErrorDir << basename(getFileName().c_str());
      }
      else
      {
        if (fReadFromS3)
        {
          errFileName << basename(getFileName().c_str());
        }
        else
        {
          errFileName << getFileName();
        }
      }

      errFileName << ".Job_" << fJobId << '_' << ::getpid() << ERR_FILE_SUFFIX;
      fRejectErrFileName = errFileName.str();
      fRejectErrFile.open(errFileName.str().c_str(), ofstream::out);

      if (!fRejectErrFile)
      {
        ostringstream oss;
        oss << "Unable to create file: " << errFileName.str() << "; Check permission.";
        fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
        return;
      }
    }

    for (std::vector<std::pair<RID, std::string> >::const_iterator iter = errorRows->begin();
         iter != errorRows->end(); ++iter)
    {
      fRejectErrFile << "Line number " << iter->first << "; Error: " << iter->second << endl;
    }

    fRejectErrCnt += errorRowsCount;
  }

  if (bCloseFile)
  {
    if (fRejectErrFile.is_open())
      fRejectErrFile.close();

    fRejectErrFile.clear();

    if (fRejectErrCnt > 0)
    {
      ostringstream oss;
      std::string errFileNameToLog;

      // Construct/report complete file name and save in list of files
      boost::filesystem::path p(fRejectErrFileName);

      if (!p.has_root_path())
      {
        char cwdPath[4096];
        char* buffPtr = getcwd(cwdPath, sizeof(cwdPath));

        // Bug fix: getcwd() can return NULL; fall back to the relative name
        // instead of constructing a boost path from a NULL pointer.
        if (buffPtr == NULL)
        {
          cwdPath[0] = '\0';
          buffPtr = cwdPath;
        }

        boost::filesystem::path errFileName2(buffPtr);
        errFileName2 /= fRejectErrFileName;
        fErrFiles.push_back(errFileName2.string());
        errFileNameToLog = errFileName2.string();
      }
      else
      {
        fErrFiles.push_back(fRejectErrFileName);
        errFileNameToLog = fRejectErrFileName;
      }

      // Bug fix: report fRejectErrCnt (this function's counter), not
      // fRejectDataCnt (copy-paste from writeBadRows), and report the
      // actual error file path rather than fErrorDir, which may be empty.
      oss << "Number of rows with errors = " << fRejectErrCnt
          << ". Exact rows are listed in file located here: " << errFileNameToLog;
      fLog->logMsg(oss.str(), MSGLVL_INFO1);
      fRejectErrCnt = 0;
    }
  }
}
//------------------------------------------------------------------------------
// Logs "Bulkload |Job" message along with the specified message text
// (messageText) to the critical log.
//------------------------------------------------------------------------------
void TableInfo::logToDataMods(const string& jobFile, const string& messageText)
{
logging::Message::Args args;
unsigned subsystemId = 19; // writeengine
logging::LoggingID loggingId(subsystemId, 0, fTxnID.id, 0);
logging::MessageLog messageLog(loggingId, LOG_LOCAL1);
logging::Message m(8);
args.add("Bulkload |Job: " + jobFile);
args.add("|" + messageText);
m.format(args);
messageLog.logInfoMessage(m);
}
//------------------------------------------------------------------------------
// Acquires DB table lock for this TableInfo object.
// Function employs retry logic based on the SystemConfig/WaitPeriod.
// Returns NO_ERROR on success; ERR_TBLLOCK_GET_LOCK_LOCKED if the wait
// period expires while another process holds the lock; or the BRM error
// code if the lock request itself fails.
//------------------------------------------------------------------------------
int TableInfo::acquireTableLock(bool disableTimeOut)
{
  // Save DBRoot list at start of job; used to compare at EOJ.
  Config::getRootIdList(fOrigDbRootIds);
  // If executing distributed (mode1) or central command (mode2) then
  // don't worry about table locks. The client front-end will manage locks.
  if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
  {
    if (fLog->isDebug(DEBUG_1))
    {
      ostringstream oss;
      oss << "Bypass acquiring table lock in distributed mode, "
             "for table"
          << fTableName << "; OID-" << fTableOID;
      fLog->logMsg(oss.str(), MSGLVL_INFO2);
    }
    return NO_ERROR;
  }
  const int SLEEP_INTERVAL = 100;    // sleep 100 milliseconds between checks
  const int NUM_TRIES_PER_SEC = 10;  // try 10 times per second
  int waitSeconds = Config::getWaitPeriod();
  const int NUM_TRIES = NUM_TRIES_PER_SEC * waitSeconds;
  std::string tblLockErrMsg;
  // Retry loop to lock the db table associated with this TableInfo object
  std::string processName;
  uint32_t processId;
  int32_t sessionId;
  int32_t transId;
  ostringstream pmModOss;
  pmModOss << " (pm" << Config::getLocalModuleID() << ')';
  bool timeout = false;
  // for (int i=0; i<NUM_TRIES; i++)
  int try_count = 0;
  while (!timeout)
  {
    // NOTE(review): processName/processId/sessionId/transId look like in/out
    // parameters of getTableLock(); on a failed attempt they presumably come
    // back holding the current lock owner's identity (reported in the
    // timeout message below) -- confirm against BRMWrapper::getTableLock().
    processName = fProcessName;
    processName += pmModOss.str();
    processId = ::getpid();
    sessionId = -1;
    transId = -1;
    int rc = BRMWrapper::getInstance()->getTableLock(fTableOID, processName, processId, sessionId, transId,
                                                     fTableLockID, tblLockErrMsg);
    // Lock acquired: remember that fact so EOJ/rollback can release it.
    if ((rc == NO_ERROR) && (fTableLockID > 0))
    {
      fTableLocked = true;
      if (fLog->isDebug(DEBUG_1))
      {
        ostringstream oss;
        oss << "Table lock acquired for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
            << fTableLockID;
        fLog->logMsg(oss.str(), MSGLVL_INFO2);
      }
      return NO_ERROR;
    }
    else if (fTableLockID == 0)
    {
      // Lock held by someone else (no lock id returned):
      // sleep and then go back and try getting table lock again
      sleepMS(SLEEP_INTERVAL);
      if (fLog->isDebug(DEBUG_1))
      {
        ostringstream oss;
        oss << "Retrying to acquire table lock for table " << fTableName << "; OID-" << fTableOID;
        fLog->logMsg(oss.str(), MSGLVL_INFO2);
      }
    }
    else
    {
      // Hard BRM error: give up immediately and propagate the error code.
      ostringstream oss;
      oss << "Error in acquiring table lock for table " << fTableName << "; OID-" << fTableOID << "; "
          << tblLockErrMsg;
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }
    // if disableTimeOut is set then no timeout for table lock. Forever wait....
    timeout = (disableTimeOut ? false : (++try_count >= NUM_TRIES));
  }
  // Wait period expired without obtaining the lock.
  ostringstream oss;
  oss << "Unable to acquire lock for table " << fTableName << "; OID-" << fTableOID
      << "; table currently locked by process-" << processName << "; pid-" << processId << "; session-"
      << sessionId << "; txn-" << transId;
  fLog->logMsg(oss.str(), ERR_TBLLOCK_GET_LOCK_LOCKED, MSGLVL_ERROR);
  return ERR_TBLLOCK_GET_LOCK_LOCKED;
}
//------------------------------------------------------------------------------
// Change table lock state (to cleanup)
// Returns NO_ERROR on success (whether or not the state actually changed),
// or the BRM error code on failure.
//------------------------------------------------------------------------------
int TableInfo::changeTableLockState()
{
  // If executing distributed (mode1) or central command (mode2) then
  // don't worry about table locks. The client front-end will manage locks.
  if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
  {
    return NO_ERROR;
  }

  std::string tblLockErrMsg;
  bool bChanged = false;

  int rc =
      BRMWrapper::getInstance()->changeTableLockState(fTableLockID, BRM::CLEANUP, bChanged, tblLockErrMsg);

  if (rc == NO_ERROR)
  {
    if (fLog->isDebug(DEBUG_1))
    {
      ostringstream oss;

      if (bChanged)
      {
        oss << "Table lock state changed to CLEANUP for table " << fTableName << "; OID-" << fTableOID
            << "; lockID-" << fTableLockID;
      }
      else
      {
        // Bug fix: corrected typo "Table lot locked." to "Table not locked."
        // (matches the analogous message in releaseTableLock()).
        oss << "Table lock state not changed to CLEANUP for table " << fTableName << "; OID-" << fTableOID
            << "; lockID-" << fTableLockID << ". Table not locked.";
      }

      fLog->logMsg(oss.str(), MSGLVL_INFO2);
    }
  }
  else
  {
    ostringstream oss;
    oss << "Error in changing table state for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
        << fTableLockID << "; " << tblLockErrMsg;
    fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
    return rc;
  }

  return NO_ERROR;
}
//------------------------------------------------------------------------------
// Releases DB table lock assigned to this TableInfo object.
//------------------------------------------------------------------------------
int TableInfo::releaseTableLock()
{
// If executing distributed (mode1) or central command (mode2) then
// don't worry about table locks. The client front-end will manage locks.
if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
{
if (fLog->isDebug(DEBUG_1))
{
ostringstream oss;
oss << "Bypass releasing table lock in distributed mode, "
"for table "
<< fTableName << "; OID-" << fTableOID;
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
return NO_ERROR;
}
std::string tblLockErrMsg;
bool bReleased = false;
// Unlock the database table
int rc = BRMWrapper::getInstance()->releaseTableLock(fTableLockID, bReleased, tblLockErrMsg);
if (rc == NO_ERROR)
{
fTableLocked = false;
if (fLog->isDebug(DEBUG_1))
{
ostringstream oss;
if (bReleased)
{
oss << "Table lock released for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
<< fTableLockID;
}
else
{
oss << "Table lock not released for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
<< fTableLockID << ". Table not locked.";
}
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
}
else
{
ostringstream oss;
oss << "Error in releasing table lock for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
<< fTableLockID << "; " << tblLockErrMsg;
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Delete bulk rollback metadata file.
//------------------------------------------------------------------------------
void TableInfo::deleteMetaDataRollbackFile()
{
  // If executing distributed (mode1) or central command (mode2) then
  // don't worry about table locks, or deleting meta data files. The
  // client front-end will manage these tasks after all imports are finished.
  if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
    return;

  // Honor the option to keep the rollback meta file (e.g. for debugging).
  if (fKeepRbMetaFile)
    return;

  // Treat any error as non-fatal, though we log it.
  try
  {
    fRBMetaWriter.deleteFile();
  }
  catch (WeException& ex)
  {
    ostringstream oss;
    oss << "Error deleting meta file; " << ex.what();
    fLog->logMsg(oss.str(), ex.errorCode(), MSGLVL_ERROR);
  }
}
//------------------------------------------------------------------------------
// Changes to "existing" DB files must be confirmed on HDFS system.
// This function triggers this action.
//------------------------------------------------------------------------------
// @bug 5572 - Add db file confirmation for HDFS
int TableInfo::confirmDBFileChanges()
{
  // Unlike deleteTempDBFileChanges(), note that confirmDBFileChanges()
  // executes regardless of the import mode. We go ahead and confirm
  // the file changes at the end of a successful cpimport.bin.
  if (!idbdatafile::IDBPolicy::useHdfs())
    return NO_ERROR;

  ostringstream oss;
  oss << "Confirming DB file changes for " << fTableName;
  fLog->logMsg(oss.str(), MSGLVL_INFO2);

  std::string errMsg;
  ConfirmHdfsDbFile confirmHdfs;
  int rc = confirmHdfs.confirmDbFileListFromMetaFile(fTableOID, errMsg);

  if (rc != NO_ERROR)
  {
    ostringstream ossErrMsg;
    ossErrMsg << "Unable to confirm changes to table " << fTableName << "; " << errMsg;
    fLog->logMsg(ossErrMsg.str(), rc, MSGLVL_ERROR);
    return rc;
  }

  return NO_ERROR;
}
//------------------------------------------------------------------------------
// Temporary swap files must be deleted on HDFS system.
// This function triggers this action.
//------------------------------------------------------------------------------
// @bug 5572 - Add db file confirmation for HDFS
void TableInfo::deleteTempDBFileChanges()
{
  // If executing distributed (mode1) or central command (mode2) then
  // no action necessary. The client front-end will initiate the deletion
  // of the temp files, only after all the distributed imports have
  // successfully completed.
  if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
    return;

  if (!idbdatafile::IDBPolicy::useHdfs())
    return;

  ostringstream oss;
  oss << "Deleting DB temp swap files for " << fTableName;
  fLog->logMsg(oss.str(), MSGLVL_INFO2);

  std::string errMsg;
  ConfirmHdfsDbFile confirmHdfs;
  int rc = confirmHdfs.endDbFileListFromMetaFile(fTableOID, true, errMsg);

  // Treat any error as non-fatal, though we log it.
  if (rc != NO_ERROR)
  {
    ostringstream ossErrMsg;
    ossErrMsg << "Unable to delete temp swap files for table " << fTableName << "; " << errMsg;
    fLog->logMsg(ossErrMsg.str(), rc, MSGLVL_ERROR);
  }
}
//------------------------------------------------------------------------------
// Validates the correctness of the current HWMs for this table.
// The HWMs for all the 1 byte columns should be identical. Same goes
// for all the 2 byte columns, etc. The 2 byte column HWMs should be
// "roughly" (but not necessarily exactly) twice that of a 1 byte column.
// Same goes for the 4 byte column HWMs vs their 2 byte counterparts, etc.
// jobTable - table/column information to use with validation.
// We use jobTable.colList[] (if provided) instead of data memmber
// fColumns, because this function is called during preprocessing,
// before TableInfo.fColumns has been initialized with data from
// colList.
// segFileInfo - Vector of File objects carrying current DBRoot, partition,
// HWM, etc to be validated for the columns belonging to jobTable.
// stage - Current stage we are validating. "Starting" or "Ending".
// Returns NO_ERROR if all HWMs are consistent; ERR_BRM_UNSUPP_WIDTH,
// ERR_BRM_HWMS_NOT_EQUAL, or the compareHWMs() error code otherwise.
//------------------------------------------------------------------------------
int TableInfo::validateColumnHWMs(const JobTable* jobTable, const std::vector<DBRootExtentInfo>& segFileInfo,
                                  const char* stage)
{
  int rc = NO_ERROR;

  // Used to track first 1-byte, 2-byte, 4-byte, and 8-byte columns in table
  int byte1First = -1;
  int byte2First = -1;
  int byte4First = -1;
  int byte8First = -1;
  int byte16First = -1;

  // Make sure the HWMs for all 1-byte columns match; same for all 2-byte,
  // 4-byte, and 8-byte columns as well.
  for (unsigned k = 0; k < segFileInfo.size(); k++)
  {
    int k1 = 0;

    // Validate HWMs in jobTable if we have it, else use fColumns.
    const JobColumn& jobColK = ((jobTable != 0) ? jobTable->colList[k] : fColumns[k].column);

    // Find the first 1-byte, 2-byte, 4-byte, and 8-byte columns.
    // Use those as our reference HWM for the respective column widths.
    switch (jobColK.width)
    {
      case 1:
      {
        if (byte1First == -1)
          byte1First = k;

        k1 = byte1First;
        break;
      }

      case 2:
      {
        if (byte2First == -1)
          byte2First = k;

        k1 = byte2First;
        break;
      }

      case 4:
      {
        if (byte4First == -1)
          byte4First = k;

        k1 = byte4First;
        break;
      }

      case 8:
      {
        if (byte8First == -1)
          byte8First = k;

        k1 = byte8First;
        break;
      }

      case 16:
      {
        if (byte16First == -1)
          byte16First = k;

        k1 = byte16First;
        break;
      }

      default:
      {
        ostringstream oss;
        oss << stage
            << " Unsupported width for"
               " OID-"
            << jobColK.mapOid << "; column-" << jobColK.colName << "; width-" << jobColK.width;
        fLog->logMsg(oss.str(), ERR_BRM_UNSUPP_WIDTH, MSGLVL_ERROR);
        return ERR_BRM_UNSUPP_WIDTH;
      }
    }  // end of switch based on column width.

    // Validate HWMs in jobTable if we have it, else use fColumns.
    const JobColumn& jobColK1 = ((jobTable != 0) ? jobTable->colList[k1] : fColumns[k1].column);

    // Validate that the HWM for this column (k) matches that of the
    // corresponding reference column with the same width.
    if ((segFileInfo[k1].fDbRoot != segFileInfo[k].fDbRoot) ||
        (segFileInfo[k1].fPartition != segFileInfo[k].fPartition) ||
        (segFileInfo[k1].fSegment != segFileInfo[k].fSegment) ||
        (segFileInfo[k1].fLocalHwm != segFileInfo[k].fLocalHwm))
    {
      ostringstream oss;
      oss << stage
          << " HWMs do not match for"
             " OID1-"
          << jobColK1.mapOid << "; column-" << jobColK1.colName << "; DBRoot-" << segFileInfo[k1].fDbRoot
          << "; partition-" << segFileInfo[k1].fPartition << "; segment-" << segFileInfo[k1].fSegment
          << "; hwm-" << segFileInfo[k1].fLocalHwm << "; width-" << jobColK1.width << ':' << std::endl
          << " and OID2-" << jobColK.mapOid << "; column-" << jobColK.colName << "; DBRoot-"
          << segFileInfo[k].fDbRoot << "; partition-" << segFileInfo[k].fPartition << "; segment-"
          << segFileInfo[k].fSegment << "; hwm-" << segFileInfo[k].fLocalHwm << "; width-" << jobColK.width;
      fLog->logMsg(oss.str(), ERR_BRM_HWMS_NOT_EQUAL, MSGLVL_ERROR);
      return ERR_BRM_HWMS_NOT_EQUAL;
    }

    // HWM DBRoot, partition, and segment number should match for all
    // columns; so compare DBRoot, part#, and seg# with first column.
    if ((segFileInfo[0].fDbRoot != segFileInfo[k].fDbRoot) ||
        (segFileInfo[0].fPartition != segFileInfo[k].fPartition) ||
        (segFileInfo[0].fSegment != segFileInfo[k].fSegment))
    {
      const JobColumn& jobCol0 = ((jobTable != 0) ? jobTable->colList[0] : fColumns[0].column);
      ostringstream oss;
      oss << stage
          << " HWM DBRoot,Part#, or Seg# do not match for"
             " OID1-"
          << jobCol0.mapOid << "; column-" << jobCol0.colName << "; DBRoot-" << segFileInfo[0].fDbRoot
          << "; partition-" << segFileInfo[0].fPartition << "; segment-" << segFileInfo[0].fSegment
          << "; hwm-" << segFileInfo[0].fLocalHwm << "; width-" << jobCol0.width << ':' << std::endl
          << " and OID2-" << jobColK.mapOid << "; column-" << jobColK.colName << "; DBRoot-"
          << segFileInfo[k].fDbRoot << "; partition-" << segFileInfo[k].fPartition << "; segment-"
          << segFileInfo[k].fSegment << "; hwm-" << segFileInfo[k].fLocalHwm << "; width-" << jobColK.width;
      fLog->logMsg(oss.str(), ERR_BRM_HWMS_NOT_EQUAL, MSGLVL_ERROR);
      return ERR_BRM_HWMS_NOT_EQUAL;
    }
  }  // end of loop to compare all 1-byte HWMs, 2-byte HWMs, etc.

  // Validate/compare HWM for 1-byte column in relation to 2-byte column, etc.
  // Without knowing the exact row count, we can't extrapolate the exact HWM
  // for the wider column, but we can narrow it down to an expected range.
  int refCol = 0;
  int colIdx = 0;

  // Bug fix: the checks below were written as
  //     if ((rc = compareHWMs(...) != NO_ERROR))
  // where "!=" binds tighter than "=", so rc was assigned the boolean
  // comparison result (0/1) instead of the actual return code, and the
  // wrong error code was logged and returned. The assignment is now
  // properly parenthesized in every check.

  // Validate/compare HWMs given a 1-byte column as a starting point
  if (byte1First >= 0)
  {
    refCol = byte1First;

    if ((rc = compareHWMs(byte1First, byte2First, 1, 2, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }

    if ((rc = compareHWMs(byte1First, byte4First, 1, 4, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }

    if ((rc = compareHWMs(byte1First, byte8First, 1, 8, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }

    if ((rc = compareHWMs(byte1First, byte16First, 1, 16, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }
  }

  // Validate/compare HWMs given a 2-byte column as a starting point
  if (byte2First >= 0)
  {
    refCol = byte2First;

    if ((rc = compareHWMs(byte2First, byte4First, 2, 4, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }

    if ((rc = compareHWMs(byte2First, byte8First, 2, 8, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }

    if ((rc = compareHWMs(byte2First, byte16First, 2, 16, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }
  }

  // Validate/compare HWMs given a 4-byte column as a starting point
  if (byte4First >= 0)
  {
    refCol = byte4First;

    if ((rc = compareHWMs(byte4First, byte8First, 4, 8, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }

    if ((rc = compareHWMs(byte4First, byte16First, 4, 16, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }
  }

  // Validate/compare HWMs given an 8-byte column as a starting point
  if (byte8First >= 0)
  {
    refCol = byte8First;

    if ((rc = compareHWMs(byte8First, byte16First, 8, 16, segFileInfo, colIdx)) != NO_ERROR)
    {
      goto errorCheck;
    }
  }

// To avoid repeating this message 6 times in the preceding source code, we
// use the "dreaded" goto to branch to this single place for error handling.
errorCheck:

  if (rc != NO_ERROR)
  {
    const JobColumn& jobColRef = ((jobTable != 0) ? jobTable->colList[refCol] : fColumns[refCol].column);
    const JobColumn& jobColIdx = ((jobTable != 0) ? jobTable->colList[colIdx] : fColumns[colIdx].column);
    ostringstream oss;
    oss << stage
        << " HWMs are not in sync for"
           " OID1-"
        << jobColRef.mapOid << "; column-" << jobColRef.colName << "; DBRoot-" << segFileInfo[refCol].fDbRoot
        << "; partition-" << segFileInfo[refCol].fPartition << "; segment-" << segFileInfo[refCol].fSegment
        << "; hwm-" << segFileInfo[refCol].fLocalHwm << "; width-" << jobColRef.width << ':' << std::endl
        << " and OID2-" << jobColIdx.mapOid << "; column-" << jobColIdx.colName << "; DBRoot-"
        << segFileInfo[colIdx].fDbRoot << "; partition-" << segFileInfo[colIdx].fPartition << "; segment-"
        << segFileInfo[colIdx].fSegment << "; hwm-" << segFileInfo[colIdx].fLocalHwm << "; width-"
        << jobColIdx.width;
    fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
  }

  return rc;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
//    Set up the bulk rollback metadata writer for this table, keyed by the
//    table OID and name.
// RETURN:
//    NO_ERROR if success
//    other    if fail (the error code carried by the writer's exception)
//------------------------------------------------------------------------------
int TableInfo::initBulkRollbackMetaData()
{
  try
  {
    fRBMetaWriter.init(fTableOID, fTableName);
  }
  catch (WeException& ex)
  {
    // Convert the writer's exception into a logged return code for callers.
    fLog->logMsg(ex.what(), ex.errorCode(), MSGLVL_ERROR);
    return ex.errorCode();
  }

  return NO_ERROR;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
//    Saves snapshot of extentmap into a bulk rollback meta data file, for
//    use in a bulk rollback, if the current cpimport.bin job should fail.
//    The source code in RBMetaWriter::saveBulkRollbackMetaData() used to
//    reside in this TableInfo function.  But much of the source code was
//    factored out to create RBMetaWriter::saveBulkRollbackMetaData(), so
//    that the function would reside in the shared library for reuse by DML.
// PARAMETERS:
//    job - current job
//    segFileInfo - Vector of File objects carrying starting DBRoot, partition,
//       etc, for each column belonging to tableNo.
//    dbRootHWMInfoVecCol - vector of last local HWM info for each DBRoot
//       (assigned to current PM) for each column in "this" table.
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int TableInfo::saveBulkRollbackMetaData(Job& job, const std::vector<DBRootExtentInfo>& segFileInfo,
                                        const std::vector<BRM::EmDbRootHWMInfo_v>& dbRootHWMInfoVecCol)
{
  std::vector<Column> columns;
  std::vector<OID> dictionaryOids;

  // Build one Column descriptor (plus a parallel dictionary OID entry) per
  // column in this table, seeded with its starting extent location and HWM.
  const size_t nCols = job.jobTableList[fTableId].colList.size();

  for (size_t n = 0; n < nCols; n++)
  {
    const JobColumn& jobCol = job.jobTableList[fTableId].colList[n];

    Column col;
    col.colNo = n;
    col.colWidth = jobCol.width;
    col.colType = jobCol.weType;
    col.colDataType = jobCol.dataType;
    col.dataFile.oid = jobCol.mapOid;
    col.dataFile.fid = jobCol.mapOid;
    col.dataFile.hwm = segFileInfo[n].fLocalHwm;        // starting HWM
    col.dataFile.pFile = 0;
    col.dataFile.fPartition = segFileInfo[n].fPartition;  // starting Part#
    col.dataFile.fSegment = segFileInfo[n].fSegment;      // starting seg#
    col.dataFile.fDbRoot = segFileInfo[n].fDbRoot;        // starting DBRoot
    col.compressionType = jobCol.compressionType;
    columns.push_back(col);

    // Non-dictionary columns get a 0 placeholder so the vectors stay aligned.
    OID dictionaryOid = 0;
    if (jobCol.colType == COL_TYPE_DICT)
      dictionaryOid = jobCol.dctnry.dctnryOid;
    dictionaryOids.push_back(dictionaryOid);
  }

  fRBMetaWriter.setUIDGID(this);

  int rc = NO_ERROR;

  try
  {
    fRBMetaWriter.saveBulkRollbackMetaData(columns, dictionaryOids, dbRootHWMInfoVecCol);
  }
  catch (WeException& ex)
  {
    fLog->logMsg(ex.what(), ex.errorCode(), MSGLVL_ERROR);
    rc = ex.errorCode();
  }

  return rc;
}
//------------------------------------------------------------------------------
// Synchronize system catalog auto-increment next value with BRM.
// This function is called at the end of normal processing to get the system
// catalog back in line with the latest auto increment next value generated by
// BRM.
//------------------------------------------------------------------------------
int TableInfo::synchronizeAutoInc()
{
  // A table carries at most one auto-increment column, so the first match
  // found is the only one to process.
  for (unsigned idx = 0; idx < fColumns.size(); ++idx)
  {
    if (!fColumns[idx].column.autoIncFlag)
      continue;

    // TBD: Do we rollback flush cache error for autoinc.
    // Not sure we should bail out and rollback on a
    // ERR_BLKCACHE_FLUSH_LIST error, but we currently
    // rollback for "any" updateNextValue() error
    return fColumns[idx].finishAutoInc();
  }

  return NO_ERROR;
}
//------------------------------------------------------------------------------
// Rollback changes made to "this" table by the current import job, delete the
// meta-data files, and release the table lock.  This function only applies to
// mode3 import.  Table lock and bulk rollbacks are managed by parent cpimport
// (file splitter) process for mode1 and mode2.
//------------------------------------------------------------------------------
int TableInfo::rollbackWork()
{
  // Close any column or store files left open by abnormal termination,
  // so the bulk rollback can safely reopen/restore them.
  closeOpenDbFiles();

  // Abort the "local" bulk rollback if a DBRoot from the start of the job
  // is now missing from this PM.  The user must run cleartablelock to
  // execute a rollback on this PM "and" the PM that now owns the DBRoot.
  std::vector<uint16_t> currentDbRoots;
  Config::getRootIdList(currentDbRoots);

  for (unsigned int origIdx = 0; origIdx < fOrigDbRootIds.size(); origIdx++)
  {
    bool stillPresent = false;

    for (unsigned int curIdx = 0; curIdx < currentDbRoots.size(); curIdx++)
    {
      if (currentDbRoots[curIdx] == fOrigDbRootIds[origIdx])
      {
        stillPresent = true;
        break;
      }
    }

    if (!stillPresent)
    {
      ostringstream oss;
      oss << "Mode3 bulk rollback not performed for table " << fTableName << "; DBRoot" << fOrigDbRootIds[origIdx]
          << " moved from this PM during bulk load. "
          << " Run cleartablelock to rollback and release the table lock "
          << "across PMs.";
      int rc = ERR_BULK_ROLLBACK_MISS_ROOT;
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }
  }

  // Restore/rollback the DB files, but only if we got far enough to begin
  // processing this table.
  if (hasProcessingBegun())
  {
    BulkRollbackMgr rbMgr(fTableOID, fTableLockID, fTableName, fProcessName, fLog);
    const int rollbackRc = rbMgr.rollback(fKeepRbMetaFile);

    if (rollbackRc != NO_ERROR)
    {
      ostringstream oss;
      oss << "Error rolling back table " << fTableName << "; " << rbMgr.getErrorMsg();
      fLog->logMsg(oss.str(), rollbackRc, MSGLVL_ERROR);
      return rollbackRc;
    }
  }

  // Delete the meta data files after rollback is complete.
  deleteMetaDataRollbackFile();

  // Release the table lock; a failure here is logged and reported to caller.
  const int lockRc = releaseTableLock();

  if (lockRc != NO_ERROR)
  {
    ostringstream oss;
    oss << "Table lock not cleared for table " << fTableName;
    fLog->logMsg(oss.str(), lockRc, MSGLVL_ERROR);
  }

  return lockRc;
}
//------------------------------------------------------------------------------
// Allocate extent from BRM (through the stripe allocator).
// partition/segment/startLbid/allocSize/hwm/errMsg are out-parameters filled
// in by the allocator; its return code is passed straight through.
//------------------------------------------------------------------------------
int TableInfo::allocateBRMColumnExtent(OID columnOID, uint16_t dbRoot, uint32_t& partition, uint16_t& segment,
                                       BRM::LBID_t& startLbid, int& allocSize, HWM& hwm, std::string& errMsg)
{
  // fExtentStrAlloc.print();
  return fExtentStrAlloc.allocateExtent(columnOID, dbRoot, partition, segment, startLbid, allocSize, hwm,
                                        errMsg);
}
//------------------------------------------------------------------------------
// Reports whether this table's input data is read from stdin (vs. files).
//------------------------------------------------------------------------------
bool TableInfo::readFromSTDIN()
{
  return fReadFromStdin;
}
} // namespace WriteEngine
// end of namespace