mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
2473 lines
80 KiB
C++
2473 lines
80 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*******************************************************************************
|
|
* $Id: we_tableinfo.cpp 4648 2013-05-29 21:42:40Z rdempsey $
|
|
*
|
|
*******************************************************************************/
|
|
/** @file */
|
|
|
|
#include "we_tableinfo.h"
|
|
#include "we_bulkstatus.h"
|
|
#include "we_bulkload.h"
|
|
|
|
#include <sstream>
|
|
#include <sys/time.h>
|
|
#include <ctime>
|
|
#include <unistd.h>
|
|
#include <sys/types.h>
|
|
#include <cstdio>
|
|
#include <cerrno>
|
|
#include <cstring>
|
|
#include <utility>
|
|
// @bug 2099+
|
|
#include <iostream>
|
|
#include <libmarias3/marias3.h>
|
|
#include <string.h>
|
|
using namespace std;
|
|
|
|
// @bug 2099-
|
|
#include <boost/filesystem/path.hpp>
|
|
using namespace boost;
|
|
|
|
#include "we_config.h"
|
|
#include "we_simplesyslog.h"
|
|
#include "we_bulkrollbackmgr.h"
|
|
#include "we_confirmhdfsdbfile.h"
|
|
|
|
#include "querytele.h"
|
|
using namespace querytele;
|
|
|
|
#include "oamcache.h"
|
|
#include "cacheutils.h"
|
|
|
|
#include <arrow/io/api.h>
|
|
#include <parquet/arrow/reader.h>
|
|
#include <parquet/exception.h>
|
|
namespace
|
|
{
|
|
const std::string BAD_FILE_SUFFIX = ".bad"; // Reject data file suffix
|
|
const std::string ERR_FILE_SUFFIX = ".err"; // Job error file suffix
|
|
const std::string BOLD_START = "\033[0;1m";
|
|
const std::string BOLD_STOP = "\033[0;39m";
|
|
} // namespace
|
|
|
|
namespace WriteEngine
|
|
{
|
|
// Helpers
|
|
int TableInfo::compareHWMs(const int smallestColumnId, const int widerColumnId,
|
|
const uint32_t smallerColumnWidth, const uint32_t widerColumnWidth,
|
|
const std::vector<DBRootExtentInfo>& segFileInfo, int& colIdx)
|
|
{
|
|
int rc = NO_ERROR;
|
|
if (widerColumnId < 0)
|
|
{
|
|
return rc;
|
|
}
|
|
uint32_t columnDiffMultiplier = widerColumnWidth / smallerColumnWidth;
|
|
HWM hwmLo = segFileInfo[smallestColumnId].fLocalHwm * columnDiffMultiplier;
|
|
HWM hwmHi = hwmLo + columnDiffMultiplier - 1;
|
|
|
|
if ((segFileInfo[widerColumnId].fLocalHwm < hwmLo) || (segFileInfo[widerColumnId].fLocalHwm > hwmHi))
|
|
{
|
|
colIdx = widerColumnId;
|
|
rc = ERR_BRM_HWMS_OUT_OF_SYNC;
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Puts the current thread to sleep for the specified number of milliseconds.
|
|
// (Ex: used to wait for a Read buffer to become available.)
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
// Puts the current thread to sleep for the specified number of milliseconds.
// If nanosleep() is interrupted by a signal (returns < 0 and stores the
// unslept time in the second argument), the loop retries with the remainder
// until the full interval has elapsed.
//------------------------------------------------------------------------------
void TableInfo::sleepMS(long ms)
{
  struct timespec remaining;
  remaining.tv_sec = ms / 1000;
  remaining.tv_nsec = (ms % 1000) * 1000000;

  struct timespec request;

  do
  {
    request = remaining;  // struct copy: retry with whatever time is left
  } while (nanosleep(&request, &remaining) < 0);
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// TableInfo constructor
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
// Constructs a TableInfo that tracks the bulk-load state for one table.
// @param logger          Log object for job messages (also handed to the
//                        rollback meta writer and BRM reporter)
// @param txnID           Transaction ID this import runs under
// @param processName     Name of the importing process (used by RBMetaWriter)
// @param tableOID        Object ID of the table being loaded
// @param tableName       Fully qualified table name ("schema.table")
// @param bKeepRbMetaFile Whether to keep the bulk-rollback meta file at EOJ
//------------------------------------------------------------------------------
TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processName, OID tableOID,
                     const string& tableName, bool bKeepRbMetaFile)
 : fTableId(-1)
 , fBufferSize(0)
 , fFileBufSize(0)
 , fStatusTI(WriteEngine::NEW)
 , fReadBufCount(0)
 , fNumberOfColumns(0)
 , fHandle(NULL)
 , fCurrentReadBuffer(0)
 , fTotalReadRows(0)
 , fTotalErrRows(0)
 , fMaxErrorRows(5)
 , fLastBufferId(-1)   // -1 means "last buffer not yet read"
 , fFileBuffer(NULL)
 , fCurrentParseBuffer(0)
 , fNumberOfColsParsed(0)
 , fLocker(-1)         // -1 means no read thread owns this table yet
 , fTableName(tableName)
 , fTableOID(tableOID)
 , fJobId(0)
 , fLog(logger)
 , fTxnID(txnID)
 , fRBMetaWriter(processName, logger)
 , fProcessName(processName)
 , fKeepRbMetaFile(bKeepRbMetaFile)
 , fbTruncationAsError(false)
 , fImportDataMode(IMPORT_DATA_TEXT)
 , fTimeZone(dataconvert::systemTimeZoneOffset())
 , fTableLocked(false)
 , fReadFromStdin(false)
 , fReadFromS3(false)
 , fNullStringMode(false)
 , fEnclosedByChar('\0')
 , fEscapeChar('\\')
 , fProcessingBegun(false)
 , fBulkMode(BULK_MODE_LOCAL)
 , fBRMReporter(logger, tableName)
 , fTableLockID(0)
 , fRejectDataCnt(0)
 , fRejectErrCnt(0)
 , fExtentStrAlloc(tableOID, logger)
 , fOamCachePtr(oam::OamCache::makeOamCache())
 , fParquetReader(nullptr)
 , fReader(nullptr)
{
  fBuffers.clear();
  fColumns.clear();
  fStartTime.tv_sec = 0;
  fStartTime.tv_usec = 0;

  // Configure the QueryTele client if a telemetry server is configured;
  // a missing/empty Host or non-positive Port silently disables telemetry.
  string teleServerHost(config::Config::makeConfig()->getConfig("QueryTele", "Host"));

  if (!teleServerHost.empty())
  {
    int teleServerPort =
        config::Config::fromText(config::Config::makeConfig()->getConfig("QueryTele", "Port"));

    if (teleServerPort > 0)
    {
      fQtc.serverParms(QueryTeleServerParms(teleServerHost, teleServerPort));
    }
  }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// TableInfo destructor
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
// TableInfo destructor.
// Flushes any queued error messages to the BRM report file before releasing
// the read/parse buffers and column objects.
//------------------------------------------------------------------------------
TableInfo::~TableInfo()
{
  fBRMReporter.sendErrMsgToFile(fBRMRptFileName);
  freeProcessingBuffers();
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Frees up processing buffer memory. We don't reset fReadBufCount to 0,
|
|
// because BulkLoad::lockColumnForParse() is calling getNumberOfBuffers()
|
|
// and dividing by the return value. So best not to risk returning 0.
|
|
// Once we get far enough to call this freeProcessingBuffers() function,
|
|
// the application code obviously better be completely through accessing
|
|
// fBuffers and fColumns.
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
// Releases the read buffers and column objects once the table is done.
// Note: fReadBufCount is deliberately NOT reset to 0 here, because
// BulkLoad::lockColumnForParse() divides by getNumberOfBuffers(); by the time
// this runs, no code should be touching fBuffers or fColumns anymore.
//------------------------------------------------------------------------------
void TableInfo::freeProcessingBuffers()
{
  // fLog->logMsg(
  //    string("Releasing TableInfo Buffer for ")+fTableName,
  //    MSGLVL_INFO1);
  fBuffers.clear();
  fColumns.clear();
  fNumberOfColumns = 0;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Close any database column or dictionary store files left open for this table.
|
|
// Under "normal" circumstances, there should be no files left open when we
|
|
// reach the end of the job, but in some error cases, the parsing threads may
|
|
// bail out without closing a file. So this function is called as part of
|
|
// EOJ cleanup for any tables that are still holding a table lock.
|
|
//
|
|
// Files will automatically get closed when the program terminates, but when
|
|
// we are preparing for a bulk rollback, we want to explicitly close the files
|
|
// before we "reopen" them and start rolling back the contents of the files.
|
|
//
|
|
// For mode1 and mode2 imports, cpimport.bin does not lock the table or perform
|
|
// a bulk rollback, and closeOpenDbFile() is not called. We instead rely on
|
|
// the program to implicitly close the files.
|
|
//------------------------------------------------------------------------------
|
|
void TableInfo::closeOpenDbFiles()
|
|
{
|
|
ostringstream oss;
|
|
oss << "Closing DB files for table " << fTableName << ", left open by abnormal termination.";
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
|
|
for (unsigned int k = 0; k < fColumns.size(); k++)
|
|
{
|
|
stringstream oss1;
|
|
oss1 << "Closing DB column file for: " << fColumns[k].column.colName << " (OID-"
|
|
<< fColumns[k].column.mapOid << ")";
|
|
fLog->logMsg(oss1.str(), MSGLVL_INFO2);
|
|
fColumns[k].closeColumnFile(false, true);
|
|
|
|
if (fColumns[k].column.colType == COL_TYPE_DICT)
|
|
{
|
|
stringstream oss2;
|
|
oss2 << "Closing DB store file for: " << fColumns[k].column.colName << " (OID-"
|
|
<< fColumns[k].column.dctnry.dctnryOid << ")";
|
|
fLog->logMsg(oss2.str(), MSGLVL_INFO2);
|
|
fColumns[k].closeDctnryStore(true);
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Locks this table for reading to the specified thread (locker) "if" the table
|
|
// has not yet been assigned to a read thread.
|
|
//------------------------------------------------------------------------------
|
|
bool TableInfo::lockForRead(const int& locker)
|
|
{
|
|
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
|
|
|
|
if (fLocker == -1)
|
|
{
|
|
if (fStatusTI == WriteEngine::NEW)
|
|
{
|
|
fLocker = locker;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Loop thru reading the import file(s) assigned to this TableInfo object.
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
// Reads all the import data (text/S3/parquet) for this table, filling one
// BulkLoadBuffer at a time and marking each buffer READ_COMPLETE so the parse
// threads can consume it. Loops over every file in fLoadFileList.
// Returns NO_ERROR on success, or an error code after marking fStatusTI=ERR.
// Throws SecondaryShutdownException if another thread set the global job
// status to EXIT_FAILURE.
//------------------------------------------------------------------------------
int TableInfo::readTableData()
{
  RID validTotalRows = 0;           // running count of valid rows across files
  RID totalRowsPerInputFile = 0;    // rows read from the current input file
  int64_t totalRowsParquet = 0;     // totalRowsParquet to be used in later function
                                    // needs int64_t type
  int filesTBProcessed = fLoadFileList.size();
  int fileCounter = 0;
  unsigned long long qtSentAt = 0;  // last telemetry checkpoint, in millions of rows

  // Open the first input file (text/S3 handle, or parquet reader).
  if (fImportDataMode != IMPORT_DATA_PARQUET)
  {
    if (fHandle == NULL)
    {
      fFileName = fLoadFileList[fileCounter];
      int rc = openTableFile();

      if (rc != NO_ERROR)
      {
        // Mark the table status as error and exit.
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStatusTI = WriteEngine::ERR;
        return rc;
      }
      fileCounter++;
    }
  }
  else
  {
    if (fParquetReader == NULL)
    {
      fFileName = fLoadFileList[fileCounter];
      int rc = openTableFileParquet(totalRowsParquet);
      if (rc != NO_ERROR)
      {
        // Mark the table status as error and exit.
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStatusTI = WriteEngine::ERR;
        return rc;
      }
      fileCounter++;
    }
  }

  timeval readStart;
  gettimeofday(&readStart, NULL);
  ostringstream ossStartMsg;
  ossStartMsg << "Start reading and loading table " << fTableName;
  fLog->logMsg(ossStartMsg.str(), MSGLVL_INFO2);
  fProcessingBegun = true;

  // Initialize the import-telemetry record sent to the QueryTele server.
  ImportTeleStats its;
  its.job_uuid = fJobUUID;
  its.import_uuid = QueryTeleClient::genUUID();
  its.msg_type = ImportTeleStats::IT_START;
  its.start_time = QueryTeleClient::timeNowms();
  its.table_list.push_back(fTableName);
  its.rows_so_far.push_back(0);
  its.system_name = fOamCachePtr->getSystemName();
  its.module_name = fOamCachePtr->getModuleName();
  string tn = getTableName();
  its.schema_name = string(tn, 0, tn.find('.'));
  fQtc.postImportTele(its);

  //
  // LOOP to read all the import data for this table
  //
  while (true)
  {
    // See if JobStatus has been set to terminate by another thread
    if (BulkStatus::getJobStatus() == EXIT_FAILURE)
    {
      boost::mutex::scoped_lock lock(fSyncUpdatesTI);
      fStartTime = readStart;
      fStatusTI = WriteEngine::ERR;
      its.msg_type = ImportTeleStats::IT_TERM;
      its.rows_so_far.pop_back();
      its.rows_so_far.push_back(0);
      fQtc.postImportTele(its);
      throw SecondaryShutdownException(
          "TableInfo::"
          "readTableData(1) responding to job termination");
    }

// @bug 3271: Conditionally compile the thread deadlock debug logging
#ifdef DEADLOCK_DEBUG
    // @bug2099+. Temp hack to diagnose deadlock.
    struct timeval tvStart;
    gettimeofday(&tvStart, 0);
    bool report = false;
    bool reported = false;
    // @bug2099-
#else
    const bool report = false;
#endif

#ifdef PROFILE
    Stats::startReadEvent(WE_STATS_WAIT_FOR_READ_BUF);
#endif

    //
    // LOOP to wait for, and read, the next avail BulkLoadBuffer object
    //
    while (!isBufferAvailable(report))
    {
      // See if JobStatus has been set to terminate by another thread
      if (BulkStatus::getJobStatus() == EXIT_FAILURE)
      {
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStartTime = readStart;
        fStatusTI = WriteEngine::ERR;
        its.msg_type = ImportTeleStats::IT_TERM;
        its.rows_so_far.pop_back();
        its.rows_so_far.push_back(0);
        fQtc.postImportTele(its);
        throw SecondaryShutdownException(
            "TableInfo::"
            "readTableData(2) responding to job termination");
      }

      // Sleep and check the condition again.
      sleepMS(1);
#ifdef DEADLOCK_DEBUG

      // @bug2099+
      if (report)
        report = false;  // report one time.

      // If we've waited more than 100 seconds for a buffer, log a one-shot
      // diagnostic to stdout (possible reader/parser deadlock).
      if (!reported)
      {
        struct timeval tvNow;
        gettimeofday(&tvNow, 0);

        if ((tvNow.tv_sec - tvStart.tv_sec) > 100)
        {
          time_t t = time(0);
          char timeString[50];
          ctime_r(&t, timeString);
          timeString[strlen(timeString) - 1] = '\0';
          ostringstream oss;
          oss << endl
              << timeString << ": "
              << "TableInfo::readTableData: " << fTableName << "; Diff is " << (tvNow.tv_sec - tvStart.tv_sec)
              << endl;
          cout << oss.str();
          cout.flush();
          report = true;
          reported = true;
        }
      }

      // @bug2099-
#endif
    }

#ifdef PROFILE
    Stats::stopReadEvent(WE_STATS_WAIT_FOR_READ_BUF);
    Stats::startReadEvent(WE_STATS_READ_INTO_BUF);
#endif

    int readBufNo = fCurrentReadBuffer;
    int prevReadBuf = (fCurrentReadBuffer - 1);

    // Wrap around: the previous buffer of buffer 0 is the last buffer.
    if (prevReadBuf < 0)
      prevReadBuf = fReadBufCount + prevReadBuf;

    // We keep a running total of read errors; fMaxErrorRows specifies
    // the error limit. Here's where we see how many more errors we
    // still have below the limit, and we pass this to fillFromFile().
    unsigned allowedErrCntThisCall = ((fMaxErrorRows > fTotalErrRows) ? (fMaxErrorRows - fTotalErrRows) : 0);

    // Fill in the specified buffer.
    // fTotalReadRowsPerInputFile is ongoing total number of rows read,
    // per input file.
    // validTotalRows is ongoing total of valid rows read for all files
    // pertaining to this DB table.
    int readRc;
    if (fImportDataMode != IMPORT_DATA_PARQUET)
    {
      if (fReadFromS3)
      {
        readRc = fBuffers[readBufNo].fillFromMemory(fBuffers[prevReadBuf], fFileBuffer, fS3ReadLength,
                                                    &fS3ParseLength, totalRowsPerInputFile, validTotalRows,
                                                    fColumns, allowedErrCntThisCall);
      }
      else
      {
        readRc = fBuffers[readBufNo].fillFromFile(fBuffers[prevReadBuf], fHandle, totalRowsPerInputFile,
                                                  validTotalRows, fColumns, allowedErrCntThisCall);
      }
    }
    else
    {
      readRc = fBuffers[readBufNo].fillFromFileParquet(totalRowsPerInputFile, validTotalRows);
    }

    if (readRc != NO_ERROR)
    {
      // error occurred.
      // need to exit.
      // mark the table status as error and exit.
      {
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStartTime = readStart;
        fStatusTI = WriteEngine::ERR;
        fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
      }
      closeTableFile();

      // Error occurred on next row not read, so increment
      // totalRowsPerInputFile row count for the error msg
      WErrorCodes ec;
      ostringstream oss;
      oss << "Error reading import file " << fFileName << "; near line " << totalRowsPerInputFile + 1 << "; "
          << ec.errorString(readRc);
      fLog->logMsg(oss.str(), readRc, MSGLVL_ERROR);

      its.msg_type = ImportTeleStats::IT_TERM;
      its.rows_so_far.pop_back();
      its.rows_so_far.push_back(0);
      fQtc.postImportTele(its);

      return readRc;
    }

#ifdef PROFILE
    Stats::stopReadEvent(WE_STATS_READ_INTO_BUF);
#endif
    // Post a progress telemetry update once per additional million rows.
    its.msg_type = ImportTeleStats::IT_PROGRESS;
    its.rows_so_far.pop_back();
    its.rows_so_far.push_back(totalRowsPerInputFile);
    unsigned long long thisRows = static_cast<unsigned long long>(totalRowsPerInputFile);
    thisRows /= 1000000;

    if (thisRows > qtSentAt)
    {
      fQtc.postImportTele(its);
      qtSentAt = thisRows;
    }

    // Check if there were any errors in the read data.
    // if yes, copy it to the error list.
    // if the number of errors is greater than the maximum error count
    // mark the table status as error and exit.
    // call the method to copy the errors
    writeErrorList(&fBuffers[readBufNo].getErrorRows(), &fBuffers[readBufNo].getExactErrorRows(), false);
    fBuffers[readBufNo].clearErrRows();

    if (fTotalErrRows > fMaxErrorRows)
    {
      // flush the reject data file and output the rejected rows
      // flush err file and output the rejected row id and the reason.
      writeErrorList(0, 0, true);

      // number of errors > maximum allowed. hence return error.
      {
        boost::mutex::scoped_lock lock(fSyncUpdatesTI);
        fStartTime = readStart;
        fStatusTI = WriteEngine::ERR;
        fBuffers[readBufNo].setStatusBLB(WriteEngine::ERR);
      }
      closeTableFile();
      ostringstream oss5;
      oss5 << "Actual error row count(" << fTotalErrRows << ") exceeds the max error rows(" << fMaxErrorRows
           << ") allowed for table " << fTableName;
      fLog->logMsg(oss5.str(), ERR_BULK_MAX_ERR_NUM, MSGLVL_ERROR);

      // List Err and Bad files to report file (if applicable)
      fBRMReporter.rptMaxErrJob(fBRMRptFileName, fErrFiles, fBadFiles);

      its.msg_type = ImportTeleStats::IT_TERM;
      its.rows_so_far.pop_back();
      its.rows_so_far.push_back(0);
      fQtc.postImportTele(its);

      return ERR_BULK_MAX_ERR_NUM;
    }

    // mark the buffer status as read complete.
    {
#ifdef PROFILE
      Stats::startReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
#endif
      boost::mutex::scoped_lock lock(fSyncUpdatesTI);
#ifdef PROFILE
      Stats::stopReadEvent(WE_STATS_WAIT_TO_COMPLETE_READ);
      Stats::startReadEvent(WE_STATS_COMPLETING_READ);
#endif

      fStartTime = readStart;
      fBuffers[readBufNo].setStatusBLB(WriteEngine::READ_COMPLETE);

      fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount;

      // bufferCount++;
      // End-of-input detection: EOF on the file handle, all S3 bytes parsed,
      // or (parquet) all reported rows consumed.
      // NOTE(review): in non-parquet modes totalRowsParquet stays 0, so the
      // third clause also fires when totalRowsPerInputFile == 0 — presumably
      // benign because feof()/S3 length checks cover those modes; confirm.
      if ((fHandle && feof(fHandle)) || (fReadFromS3 && (fS3ReadLength == fS3ParseLength)) ||
          (totalRowsPerInputFile == (RID)totalRowsParquet))
      {
        timeval readFinished;
        gettimeofday(&readFinished, NULL);

        closeTableFile();

        if (fReadFromStdin)
        {
          fLog->logMsg("Finished loading " + fTableName + " from STDIN" + ", Time taken = " +
                           Convertor::int2Str((int)(readFinished.tv_sec - readStart.tv_sec)) + " seconds",
                       //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
                       MSGLVL_INFO2);
        }
        else if (fReadFromS3)
        {
          fLog->logMsg("Finished loading " + fTableName + " from S3" + ", Time taken = " +
                           Convertor::int2Str((int)(readFinished.tv_sec - readStart.tv_sec)) + " seconds",
                       //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
                       MSGLVL_INFO2);
        }
        else
        {
          fLog->logMsg("Finished reading file " + fFileName + ", Time taken = " +
                           Convertor::int2Str((int)(readFinished.tv_sec - readStart.tv_sec)) + " seconds",
                       //" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
                       MSGLVL_INFO2);
        }

        // flush the reject data file and output the rejected rows
        // flush err file and output the rejected row id and the reason.
        writeErrorList(0, 0, true);

        // If > 1 file for this table, then open next file in the list
        if (fileCounter < filesTBProcessed)
        {
          fFileName = fLoadFileList[fileCounter];
          int rc;
          if (fImportDataMode != IMPORT_DATA_PARQUET)
          {
            rc = openTableFile();
          }
          else
          {
            rc = openTableFileParquet(totalRowsParquet);
          }

          if (rc != NO_ERROR)
          {
            // Mark the table status as error and exit.
            fStatusTI = WriteEngine::ERR;
            return rc;
          }

          fileCounter++;
          fTotalReadRows += totalRowsPerInputFile;
          totalRowsPerInputFile = 0;
        }
        else  // All files read for this table; break out of read loop
        {
          fStatusTI = WriteEngine::READ_COMPLETE;
          fLastBufferId = readBufNo;
          fTotalReadRows += totalRowsPerInputFile;
          break;
        }

        gettimeofday(&readStart, NULL);
      }  // reached EOF

#ifdef PROFILE
      Stats::stopReadEvent(WE_STATS_COMPLETING_READ);
#endif
    }  // mark buffer status as read-complete within scope of a mutex
  }    // loop to read all data for this table

  // Final telemetry: report the grand total of rows read for this table.
  its.msg_type = ImportTeleStats::IT_SUMMARY;
  its.end_time = QueryTeleClient::timeNowms();
  its.rows_so_far.pop_back();
  its.rows_so_far.push_back(fTotalReadRows);
  fQtc.postImportTele(its);
  fQtc.waitForQueues();

  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// writeErrorList()
|
|
// errorRows - vector of row numbers and corresponding error messages
|
|
// errorDatRows - vector of bad rows that have been rejected
|
|
//
|
|
// Adds errors pertaining to a specific buffer, to the cumulative list of
|
|
// errors to be reported to the user.
|
|
//------------------------------------------------------------------------------
|
|
void TableInfo::writeErrorList(const std::vector<std::pair<RID, std::string> >* errorRows,
|
|
const std::vector<std::string>* errorDatRows, bool bCloseFile)
|
|
{
|
|
size_t errorRowsCount = 0;
|
|
size_t errorDatRowsCount = 0;
|
|
|
|
if (errorRows)
|
|
errorRowsCount = errorRows->size();
|
|
|
|
if (errorDatRows)
|
|
errorDatRowsCount = errorDatRows->size();
|
|
|
|
if ((errorRowsCount > 0) || (errorDatRowsCount > 0) || (bCloseFile))
|
|
{
|
|
boost::mutex::scoped_lock lock(fErrorRptInfoMutex);
|
|
|
|
if ((errorRowsCount > 0) || (bCloseFile))
|
|
writeErrReason(errorRows, bCloseFile);
|
|
|
|
if ((errorDatRowsCount > 0) || (bCloseFile))
|
|
writeBadRows(errorDatRows, bCloseFile);
|
|
|
|
fTotalErrRows += errorRowsCount;
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Parse the specified column (columnId) in the specified buffer (bufferId).
|
|
//------------------------------------------------------------------------------
|
|
int TableInfo::parseColumn(const int& columnId, const int& bufferId, double& processingTime)
|
|
{
|
|
// parse the column
|
|
// note the time and update the column's last processing time
|
|
timeval parseStart, parseEnd;
|
|
gettimeofday(&parseStart, NULL);
|
|
|
|
// Will need to check whether the column needs to extend.
|
|
// If size of the file is less than the required size, extend the column
|
|
int rc = fBuffers[bufferId].parse(fColumns[columnId]);
|
|
gettimeofday(&parseEnd, NULL);
|
|
|
|
processingTime = (parseEnd.tv_usec / 1000 + parseEnd.tv_sec * 1000) -
|
|
(parseStart.tv_usec / 1000 + parseStart.tv_sec * 1000);
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Mark the specified column (columnId) in the specified buffer (bufferId) as
|
|
// PARSE_COMPLETE. If this is the last column to be parsed for this buffer,
|
|
// then mark the buffer as PARSE_COMPLETE.
|
|
// If the last buffer for this table has been read (fLastBufferId != -1), then
|
|
// see if all the data for columnId has been parsed for all the buffers, in
|
|
// which case we are finished parsing columnId.
|
|
// If this is the last column to finish parsing for this table, then mark the
|
|
// table status as PARSE_COMPLETE.
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
// Marks (columnId, bufferId) as PARSE_COMPLETE. If that was the last column in
// the buffer, the buffer itself becomes PARSE_COMPLETE. Once the read thread
// has delivered the last buffer (fLastBufferId != -1) and a column is complete
// in every delivered buffer, the column is finished; when ALL columns finish,
// this performs end-of-table completion: dictionary-cache flush, auto-inc
// sync, HWM validation, DB-file confirmation, BRM update, and table-lock
// release. All of it runs under fSyncUpdatesTI.
// Returns NO_ERROR, or an error code after setting fStatusTI = ERR.
//------------------------------------------------------------------------------
int TableInfo::setParseComplete(const int& columnId, const int& bufferId, double processingTime)
{
  boost::mutex::scoped_lock lock(fSyncUpdatesTI);

  // Check table status in case race condition results in this function
  // being called after fStatusTI was set to ERR by another thread.
  if (fStatusTI == WriteEngine::ERR)
    return ERR_UNKNOWN;

  fColumns[columnId].lastProcessingTime = processingTime;
#ifdef PROFILE
  fColumns[columnId].totalProcessingTime += processingTime;
#endif

  // Set buffer status to complete if setColumnStatus indicates that
  // all the columns are complete
  if (fBuffers[bufferId].setColumnStatus(columnId, WriteEngine::PARSE_COMPLETE))
    fBuffers[bufferId].setStatusBLB(WriteEngine::PARSE_COMPLETE);

  // fLastBufferId != -1 means the Read thread has read the last
  // buffer for this table
  if (fLastBufferId != -1)
  {
    // check if the status of the column in all the fBuffers is parse
    // complete then update the column status as parse complete.
    bool allBuffersDoneForAColumn = true;

    for (int i = 0; i < fReadBufCount; ++i)
    {
      // check the status of the column in this buffer.
      Status bufferStatus = fBuffers[i].getStatusBLB();

      if ((bufferStatus == WriteEngine::READ_COMPLETE) || (bufferStatus == WriteEngine::PARSE_COMPLETE))
      {
        if (fBuffers[i].getColumnStatus(columnId) != WriteEngine::PARSE_COMPLETE)
        {
          allBuffersDoneForAColumn = false;
          break;
        }
      }
    }

    // allBuffersDoneForAColumn==TRUE means we are finished parsing columnId
    if (allBuffersDoneForAColumn)
    {
      // Accumulate list of HWM dictionary blocks to be flushed from cache
      std::vector<BRM::LBID_t> dictBlksToFlush;
      fColumns[columnId].getDictFlushBlks(dictBlksToFlush);

      for (unsigned kk = 0; kk < dictBlksToFlush.size(); kk++)
      {
        fDictFlushBlks.push_back(dictBlksToFlush[kk]);
      }

      int rc = fColumns[columnId].finishParsing();

      if (rc != NO_ERROR)
      {
        WErrorCodes ec;
        ostringstream oss;
        oss << "setParseComplete completion error; "
               "Failed to load table: "
            << fTableName << "; " << ec.errorString(rc);
        fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
        fStatusTI = WriteEngine::ERR;
        return rc;
      }

      fNumberOfColsParsed++;

      //
      // If all columns have been parsed, then finished with this tbl
      //
      if (fNumberOfColsParsed >= fNumberOfColumns)
      {
        // After closing the column and dictionary store files,
        // flush any updated dictionary blocks in PrimProc.
        // We only do this for non-HDFS. For HDFS we don't want
        // to flush till "after" we have "confirmed" all the file
        // changes, which flushes the changes to disk.
        if (!idbdatafile::IDBPolicy::useHdfs())
        {
          if (fDictFlushBlks.size() > 0)
          {
#ifdef PROFILE
            Stats::startParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
#endif
            if (fLog->isDebug(DEBUG_2))
            {
              ostringstream oss;
              oss << "Dictionary cache flush: ";
              for (uint32_t i = 0; i < fDictFlushBlks.size(); i++)
              {
                oss << fDictFlushBlks[i] << ", ";
              }
              oss << endl;
              fLog->logMsg(oss.str(), MSGLVL_INFO1);
            }
            cacheutils::flushPrimProcAllverBlocks(fDictFlushBlks);
#ifdef PROFILE
            Stats::stopParseEvent(WE_STATS_FLUSH_PRIMPROC_BLOCKS);
#endif
            fDictFlushBlks.clear();
          }
        }

        // Update auto-increment next value if applicable.
        rc = synchronizeAutoInc();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: autoInc update error; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

        //..Validate that all the HWM's are consistent and in-sync
        std::vector<DBRootExtentInfo> segFileInfo;

        for (unsigned i = 0; i < fColumns.size(); ++i)
        {
          DBRootExtentInfo extentInfo;
          fColumns[i].getSegFileInfo(extentInfo);
          segFileInfo.push_back(extentInfo);
        }

        rc = validateColumnHWMs(0, segFileInfo, "Ending");

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: HWM validation error; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;

          // Dump every column's DBRoot/partition/segment/HWM to aid debugging.
          ostringstream oss2;
          oss2 << "Ending HWMs for table " << fTableName << ": ";

          for (unsigned int n = 0; n < fColumns.size(); n++)
          {
            oss2 << std::endl;
            oss2 << "  " << fColumns[n].column.colName << "; DBRoot/part/seg/hwm: " << segFileInfo[n].fDbRoot
                 << "/" << segFileInfo[n].fPartition << "/" << segFileInfo[n].fSegment << "/"
                 << segFileInfo[n].fLocalHwm;
          }

          fLog->logMsg(oss2.str(), MSGLVL_INFO1);

          return rc;
        }

        //..Confirm changes to DB files (necessary for HDFS)
        rc = confirmDBFileChanges();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: Error confirming DB changes; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

        //..Update BRM with HWM and Casual Partition info, etc.
        rc = finishBRM();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: BRM error; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

        // Change table lock state to CLEANUP
        // NOTE(review): this error message says "Table load completed" (the
        // data is already committed to BRM at this point) yet the table is
        // still marked ERR — presumably intentional; confirm.
        rc = changeTableLockState();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: table lock state change error; "
                 "Table load completed: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

        // Finished with this table, so delete bulk rollback
        // meta data file and release the table lock.
        deleteTempDBFileChanges();
        deleteMetaDataRollbackFile();

        rc = releaseTableLock();

        if (rc != NO_ERROR)
        {
          WErrorCodes ec;
          ostringstream oss;
          oss << "setParseComplete: table lock release error; "
                 "Failed to load table: "
              << fTableName << "; " << ec.errorString(rc);
          fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
          fStatusTI = WriteEngine::ERR;
          return rc;
        }

#ifdef PROFILE

        // Loop through columns again to print out the elapsed
        // parse times
        for (unsigned i = 0; i < fColumns.size(); ++i)
        {
          ostringstream ossColTime;
          ossColTime << "Column " << i << "; OID-" << fColumns[i].column.mapOid << "; parseTime-"
                     << (fColumns[i].totalProcessingTime / 1000.0) << " seconds";
          fLog->logMsg(ossColTime.str(), MSGLVL_INFO1);
        }

#endif

        timeval endTime;
        gettimeofday(&endTime, 0);
        double elapsedTime = (endTime.tv_sec + (endTime.tv_usec / 1000000.0)) -
                             (fStartTime.tv_sec + (fStartTime.tv_usec / 1000000.0));

        fStatusTI = WriteEngine::PARSE_COMPLETE;
        reportTotals(elapsedTime);

        // Reduce memory use by allocating and releasing as needed
        freeProcessingBuffers();

      }  // end of if (fNumberOfColsParsed >= fNumberOfColumns)
    }    // end of if (allBuffersDoneForAColumn)
  }      // end of if (fLastBufferId != -1)

  // If we finished parsing the buffer associated with currentParseBuffer,
  // but have not finshed the entire table, then advance currentParseBuffer.
  if ((fStatusTI != WriteEngine::PARSE_COMPLETE) &&
      (fBuffers[bufferId].getStatusBLB() == WriteEngine::PARSE_COMPLETE))
  {
    // Find the BulkLoadBuffer object that is next in line to be parsed
    // and assign fCurrentParseBuffer accordingly. Break out of the
    // loop if we wrap all the way around and catch up with the current-
    // Read buffer.
    if (bufferId == fCurrentParseBuffer)
    {
      int currentParseBuffer = fCurrentParseBuffer;

      while (fBuffers[currentParseBuffer].getStatusBLB() == WriteEngine::PARSE_COMPLETE)
      {
        currentParseBuffer = (currentParseBuffer + 1) % fReadBufCount;
        fCurrentParseBuffer = currentParseBuffer;

        if (fCurrentParseBuffer == fCurrentReadBuffer)
          break;
      }
    }
  }

  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Report summary totals to applicable destination (stdout, cpimport.bin log
|
|
// file, BRMReport file (for mode1) etc).
|
|
// elapsedTime is number of seconds taken to import this table.
|
|
//------------------------------------------------------------------------------
|
|
void TableInfo::reportTotals(double elapsedTime)
{
  // Summarize row counts: rows read from the input vs. rows actually
  // inserted (rows read minus rows rejected with errors).
  ostringstream oss1;
  oss1 << "For table " << fTableName << ": " << fTotalReadRows << " rows processed and "
       << (fTotalReadRows - fTotalErrRows) << " rows inserted.";

  fLog->logMsg(oss1.str(), MSGLVL_INFO1);

  ostringstream oss2;
  oss2 << "For table " << fTableName << ": "
       << "Elapsed time to load this table: " << elapsedTime << " secs";

  fLog->logMsg(oss2.str(), MSGLVL_INFO2);

  // @bug 3504: Loop through columns to print saturation counts
  // Each tuple is (column data type, column OID, saturated-row count);
  // the collected list is forwarded to the BRM reporter at the end.
  std::vector<boost::tuple<execplan::CalpontSystemCatalog::ColDataType, uint64_t, uint64_t> > satCounts;

  for (unsigned i = 0; i < fColumns.size(); ++i)
  {
    // std::string colName(fTableName);
    // colName += '.';
    // colName += fColumns[i].column.colName;
    long long satCount = fColumns[i].saturatedCnt();

    satCounts.push_back(boost::make_tuple(fColumns[i].column.dataType, fColumns[i].column.mapOid, satCount));

    if (satCount > 0)
    {
      // @bug 3375: report invalid dates/times set to null
      // Wording varies by data type: date/time types report invalid
      // values replaced with zero, character types report truncation,
      // and everything else reports saturation.
      ostringstream ossSatCnt;
      ossSatCnt << "Column " << fTableName << '.' << fColumns[i].column.colName << "; Number of ";

      if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::DATE)
      {
        ossSatCnt << "invalid dates replaced with zero value : ";
      }
      else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::DATETIME)
      {
        // bug5383
        ossSatCnt << "invalid date/times replaced with zero value : ";
      }

      else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::TIMESTAMP)
      {
        ossSatCnt << "invalid timestamps replaced with zero value : ";
      }
      else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::TIME)
      {
        ossSatCnt << "invalid times replaced with zero value : ";
      }
      else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::CHAR)
        ossSatCnt << "character strings truncated: ";
      else if (fColumns[i].column.dataType == execplan::CalpontSystemCatalog::VARCHAR)
        ossSatCnt << "character strings truncated: ";
      else
        ossSatCnt << "rows inserted with saturated values: ";

      ossSatCnt << satCount;
      fLog->logMsg(ossSatCnt.str(), MSGLVL_WARNING);
    }
  }

  // Record the "table finished" event (message M0083) in the system log
  // with the job id, table name, and number of rows inserted.
  logging::Message::Args tblFinishedMsgArgs;
  tblFinishedMsgArgs.add(fJobId);
  tblFinishedMsgArgs.add(fTableName);
  tblFinishedMsgArgs.add((fTotalReadRows - fTotalErrRows));
  SimpleSysLog::instance()->logMsg(tblFinishedMsgArgs, logging::LOG_TYPE_INFO, logging::M0083);

  // Bug1375 - cpimport.bin did not add entries to the transaction
  // log file: data_mods.log
  if ((fTotalReadRows - fTotalErrRows) > 0)
    logToDataMods(fjobFileName, oss1.str());

  // Log totals in report file if applicable
  fBRMReporter.reportTotals(fTotalReadRows, (fTotalReadRows - fTotalErrRows), satCounts);
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Report BRM updates to a report file or to BRM directly.
|
|
//------------------------------------------------------------------------------
|
|
int TableInfo::finishBRM()
|
|
{
|
|
// Collect the CP and HWM information for all the columns
|
|
for (unsigned i = 0; i < fColumns.size(); ++i)
|
|
{
|
|
fColumns[i].getBRMUpdateInfo(fBRMReporter);
|
|
}
|
|
|
|
// We use mutex not to synchronize contention among parallel threads,
|
|
// because we should be the only thread accessing the fErrFiles and
|
|
// fBadFiles at this point. But we do use the mutex as a memory barrier
|
|
// to make sure we have the latest copy of the data.
|
|
std::vector<std::string>* errFiles = 0;
|
|
std::vector<std::string>* badFiles = 0;
|
|
{
|
|
boost::mutex::scoped_lock lock(fErrorRptInfoMutex);
|
|
errFiles = &fErrFiles;
|
|
badFiles = &fBadFiles;
|
|
}
|
|
|
|
// Save the info just collected, to a report file or send to BRM
|
|
int rc = fBRMReporter.sendBRMInfo(fBRMRptFileName, *errFiles, *badFiles);
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Update status of table to reflect an error.
|
|
// No need to update the buffer or column status, because we are not going to
|
|
// continue the job anyway. Other threads should terminate when they see that
|
|
// the JobStatus has been set to EXIT_FAILURE and/or the table status has been
|
|
// set to WriteEngine::ERR.
|
|
//------------------------------------------------------------------------------
|
|
void TableInfo::setParseError()
|
|
{
|
|
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
|
|
fStatusTI = WriteEngine::ERR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Locks a column from the specified buffer (bufferId) for the specified parse
|
|
// thread (id); and returns the column id. A return value of -1 means no
|
|
// column could be locked for parsing.
|
|
//------------------------------------------------------------------------------
|
|
// @bug2099. Temporary hack to diagnose deadlock.
|
|
// Added report parm and couts below.
|
|
// Locks a column in fBuffers[bufferId] for parse thread 'id' and returns
// its column index, or -1 if the buffer is not ready (or job shutdown).
// Runs entirely under fSyncUpdatesTI so column-lock state cannot change
// underneath the selection loop.
int TableInfo::getColumnForParse(const int& id, const int& bufferId, bool report)
{
  boost::mutex::scoped_lock lock(fSyncUpdatesTI);
  double maxTime = 0;
  int columnId = -1;

  // NOTE(review): columnId and maxTime are not reset on each pass of this
  // while loop; a retry after a failed tryAndLockColumn() starts from the
  // previous candidate -- presumably intentional, verify against callers.
  while (true)
  {
    // See if JobStatus has been set to terminate by another thread
    if (BulkStatus::getJobStatus() == EXIT_FAILURE)
    {
      fStatusTI = WriteEngine::ERR;
      throw SecondaryShutdownException(
          "TableInfo::"
          "getColumnForParse() responding to job termination");
    }

    // Buffer not fully read yet; nothing to parse.
    if (!bufferReadyForParse(bufferId, report))
      return -1;

    // @bug2099+
    ostringstream oss;

    if (report)
    {
      oss << " ----- " << pthread_self() << ":fBuffers[" << bufferId << "]: (colLocker,status,lasttime)- ";
    }

    // @bug2099-

    // Choose the best unlocked column.  Preference order:
    //  1) the first unlocked column found (initial candidate);
    //  2) a column never parsed before (lastProcessingTime == 0),
    //     preferring wider columns;
    //  3) otherwise the column with the largest lastProcessingTime.
    for (unsigned k = 0; k < fNumberOfColumns; ++k)
    {
      // @bug2099+
      if (report)
      {
        Status colStatus = fBuffers[bufferId].getColumnStatus(k);
        int colLocker = fBuffers[bufferId].getColumnLocker(k);

        string colStatusStr;
        ColumnInfo::convertStatusToString(colStatus, colStatusStr);

        oss << '(' << colLocker << ',' << colStatusStr << ',' << fColumns[k].lastProcessingTime << ") ";
      }

      // @bug2099-

      // A locker of -1 means no parse thread currently owns column k.
      if (fBuffers[bufferId].getColumnLocker(k) == -1)
      {
        if (columnId == -1)
          columnId = k;
        else if (fColumns[k].lastProcessingTime == 0)
        {
          if (fColumns[k].column.width >= fColumns[columnId].column.width)
            columnId = k;
        }
        else if (fColumns[k].lastProcessingTime > maxTime)
        {
          maxTime = fColumns[k].lastProcessingTime;
          columnId = k;
        }
      }
    }

    // @bug2099+
    if (report)
    {
      oss << "; selected colId: " << columnId;

      if (columnId != -1)
        oss << "; maxTime: " << maxTime;

      oss << endl;

      if (!BulkLoad::disableConsoleOutput())
      {
        cout << oss.str();
        cout.flush();
      }
    }

    // @bug2099-

    // No unlocked column available in this buffer.
    if (columnId == -1)
      return -1;

    // Claim the chosen column for this parse thread; on failure, loop
    // around and reselect.
    if (fBuffers[bufferId].tryAndLockColumn(columnId, id))
    {
      return columnId;
    }
  }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Check if the specified buffer is ready for parsing (status == READ_COMPLETE)
|
|
// @bug 2099. Temporary hack to diagnose deadlock. Added report parm
|
|
// and couts below.
|
|
//------------------------------------------------------------------------------
|
|
bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const
|
|
{
|
|
if (fBuffers.size() == 0)
|
|
return false;
|
|
|
|
Status stat = fBuffers[bufferId].getStatusBLB();
|
|
|
|
if (report)
|
|
{
|
|
ostringstream oss;
|
|
string bufStatusStr;
|
|
ColumnInfo::convertStatusToString(stat, bufStatusStr);
|
|
oss << " --- " << pthread_self() << ":fBuffers[" << bufferId << "]=" << bufStatusStr << " (" << stat
|
|
<< ")" << std::endl;
|
|
cout << oss.str();
|
|
}
|
|
|
|
return (stat == WriteEngine::READ_COMPLETE) ? true : false;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Create the specified number (noOfBuffer) of BulkLoadBuffer objects and store
|
|
// them in fBuffers. jobFieldRefList lists the fields in this import.
|
|
// fixedBinaryRecLen is fixed record length for binary imports (it is n/a
|
|
// for text bulk loads).
|
|
//------------------------------------------------------------------------------
|
|
// Creates noOfBuffers BulkLoadBuffer objects in fBuffers, propagating this
// table's parsing options to each one, and (for S3 imports) initializes
// the libmarias3 client.
// noOfBuffers       - number of read buffers to create.
// jobFieldRefList   - fields participating in this import.
// fixedBinaryRecLen - fixed record length for binary imports (n/a for text).
// Returns 0 on success, ERR_FILE_OPEN if the S3 client cannot be created.
int TableInfo::initializeBuffers(int noOfBuffers, const JobFieldRefList& jobFieldRefList,
                                 unsigned int fixedBinaryRecLen)
{
  fReadBufCount = noOfBuffers;

  // initialize and populate the buffer vector.
  for (int i = 0; i < fReadBufCount; ++i)
  {
    BulkLoadBuffer* buffer =
        new BulkLoadBuffer(fNumberOfColumns, fBufferSize, fLog, i, fTableName, jobFieldRefList);
    // Copy this table's parsing configuration into the new buffer.
    buffer->setColDelimiter(fColDelim);
    buffer->setNullStringMode(fNullStringMode);
    buffer->setEnclosedByChar(fEnclosedByChar);
    buffer->setEscapeChar(fEscapeChar);
    buffer->setTruncationAsError(getTruncationAsError());
    buffer->setImportDataMode(fImportDataMode, fixedBinaryRecLen);
    buffer->setTimeZone(fTimeZone);
    fBuffers.push_back(buffer);
  }
  // A non-empty S3 key signals an S3-sourced import; bring up the
  // libmarias3 library and create the client handle used by openTableFile().
  if (!fS3Key.empty())
  {
    ms3_library_init();
    ms3 = ms3_init(fS3Key.c_str(), fS3Secret.c_str(), fS3Region.c_str(), fS3Host.c_str());
    if (!ms3)
    {
      ostringstream oss;
      oss << "Error initiating S3 library";
      fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
      return ERR_FILE_OPEN;
    }
  }
  return 0;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Add the specified ColumnInfo object (info) into this table's fColumns vector.
|
|
//------------------------------------------------------------------------------
|
|
// Appends the given ColumnInfo to this table's column list (taking the
// pointer into fColumns), refreshes the cached column count, and registers
// the column with the extent stripe allocator.
void TableInfo::addColumn(ColumnInfo* info)
{
  fColumns.push_back(info);
  fNumberOfColumns = fColumns.size();

  fExtentStrAlloc.addColumn(info->column.mapOid, info->column.width, info->column.dataType);
}
|
|
|
|
int TableInfo::openTableFileParquet(int64_t& totalRowsParquet)
|
|
{
|
|
if (fParquetReader != NULL)
|
|
return NO_ERROR;
|
|
std::shared_ptr<arrow::io::ReadableFile> infile;
|
|
try
|
|
{
|
|
PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(fFileName, arrow::default_memory_pool()));
|
|
PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &fReader));
|
|
fReader->set_batch_size(1000);
|
|
PARQUET_THROW_NOT_OK(fReader->ScanContents({0}, 1000, &totalRowsParquet));
|
|
PARQUET_THROW_NOT_OK(fReader->GetRecordBatchReader(&fParquetReader));
|
|
}
|
|
catch (std::exception& ex)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Error opening import file " << fFileName << ".";
|
|
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
|
|
|
|
return ERR_FILE_OPEN;
|
|
}
|
|
catch (...)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Error opening import file " << fFileName << ".";
|
|
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
|
|
|
|
return ERR_FILE_OPEN;
|
|
}
|
|
// initialize fBuffers batch source
|
|
for (auto& buffer : fBuffers)
|
|
{
|
|
buffer.setParquetReader(fParquetReader);
|
|
}
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Open the file corresponding to fFileName so that we can import it's contents.
|
|
// A buffer is also allocated and passed to setvbuf().
|
|
// If fReadFromStdin is true, we just assign stdin to our fHandle for reading.
|
|
//------------------------------------------------------------------------------
|
|
// Opens the import source for this table:
//  - stdin:  assigns stdin to fHandle and installs a setvbuf() buffer;
//  - S3:     downloads the whole object into fFileBuffer via ms3_get();
//  - file:   fopen()s fFileName (text or binary mode per fImportDataMode)
//            and installs a setvbuf() buffer.
// Returns NO_ERROR on success (or if already open), else ERR_FILE_OPEN.
int TableInfo::openTableFile()
{
  // Already open from a previous call; nothing to do.
  if (fHandle != NULL)
    return NO_ERROR;

  if (fReadFromStdin)
  {
    fHandle = stdin;

    // Not 100% sure that calling setvbuf on stdin does much, but in
    // some tests, it made a slight difference.
    fFileBuffer = new char[fFileBufSize];
    setvbuf(fHandle, fFileBuffer, _IOFBF, fFileBufSize);
    ostringstream oss;
    oss << BOLD_START << "Reading input from STDIN to import into table " << fTableName << "..." << BOLD_STOP;
    fLog->logMsg(oss.str(), MSGLVL_INFO1);
  }
  else if (fReadFromS3)
  {
    // Fetch the entire S3 object into fFileBuffer; fS3ReadLength receives
    // the byte count and fS3ParseLength tracks how much has been parsed.
    int res;
    res = ms3_get(ms3, fS3Bucket.c_str(), fFileName.c_str(), (uint8_t**)&fFileBuffer, &fS3ReadLength);
    fS3ParseLength = 0;
    if (res)
    {
      ostringstream oss;
      oss << "Error retrieving file " << fFileName << " from S3: ";
      // Prefer the server-supplied error text when one is available.
      if (ms3_server_error(ms3))
      {
        oss << ms3_server_error(ms3);
      }
      else
      {
        oss << ms3_error(res);
      }
      fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
      return ERR_FILE_OPEN;
    }
  }
  else
  {
    // Text imports open in text mode; all other modes open in binary.
    if (fImportDataMode == IMPORT_DATA_TEXT)
      fHandle = fopen(fFileName.c_str(), "r");
    else
      fHandle = fopen(fFileName.c_str(), "rb");

    if (fHandle == NULL)
    {
      // Capture errno before any call that could overwrite it.
      int errnum = errno;
      ostringstream oss;
      oss << "Error opening import file " << fFileName << ". " << strerror(errnum);
      fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);

      // return an error; caller should set fStatusTI if needed
      return ERR_FILE_OPEN;
    }

    // now the input load file is available for reading the data.
    // read the data from the load file into the buffers.
    fFileBuffer = new char[fFileBufSize];
    setvbuf(fHandle, fFileBuffer, _IOFBF, fFileBufSize);

    ostringstream oss;
    oss << "Opening " << fFileName << " to import into table " << fTableName;
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Close the current open file we have been importing.
|
|
//------------------------------------------------------------------------------
|
|
// Closes the current import source and releases its buffer:
//  - file:    fclose() plus delete of the setvbuf() buffer;
//  - stdin:   handle is dropped but the buffer is deliberately leaked
//             (see comment below);
//  - S3:      the downloaded object buffer is freed via ms3_free();
//  - parquet: both Arrow readers are reset.
void TableInfo::closeTableFile()
{
  if (fImportDataMode != IMPORT_DATA_PARQUET)
  {
    if (fHandle)
    {
      // If reading from stdin, we don't delete the buffer out from under
      // the file handle, because stdin is still open. This will cause a
      // memory leak, but when using stdin, we can only read in 1 table.
      // So it's not like we will be leaking multiple buffers for several
      // tables over the life of the job.
      if (!fReadFromStdin)
      {
        fclose(fHandle);
        delete[] fFileBuffer;
      }

      fHandle = 0;
    }
    else if (ms3)
    {
      // S3 path: fFileBuffer was allocated by ms3_get(), so it must be
      // released with ms3_free(), not delete[].
      ms3_free((uint8_t*)fFileBuffer);
    }
  }
  else
  {
    // Parquet path: release the Arrow file reader and the record-batch
    // reader so a subsequent open starts fresh.
    fReader.reset();
    fParquetReader.reset();
  }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// "Grabs" the current read buffer for TableInfo so that the read thread that
|
|
// is calling this function, can read the next buffer's set of data.
|
|
//------------------------------------------------------------------------------
|
|
// @bug2099. Temporary hack to diagnose deadlock.
|
|
// Added report parm and couts below.
|
|
bool TableInfo::isBufferAvailable(bool report)
|
|
{
|
|
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
|
|
Status bufferStatus = fBuffers[fCurrentReadBuffer].getStatusBLB();
|
|
|
|
if ((bufferStatus == WriteEngine::PARSE_COMPLETE) || (bufferStatus == WriteEngine::NEW))
|
|
{
|
|
// reset buffer status and column locks while we have
|
|
// an fSyncUpdatesTI lock
|
|
fBuffers[fCurrentReadBuffer].setStatusBLB(WriteEngine::READ_PROGRESS);
|
|
fBuffers[fCurrentReadBuffer].resetColumnLocks();
|
|
return true;
|
|
}
|
|
|
|
if (report)
|
|
{
|
|
ostringstream oss;
|
|
string bufferStatusStr;
|
|
ColumnInfo::convertStatusToString(bufferStatus, bufferStatusStr);
|
|
oss << " Buffer status is " << bufferStatusStr << ". " << endl;
|
|
oss << " fCurrentReadBuffer is " << fCurrentReadBuffer << endl;
|
|
cout << oss.str();
|
|
cout.flush();
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Report whether rows were rejected, and if so, then list them out into the
|
|
// reject file.
|
|
//------------------------------------------------------------------------------
|
|
void TableInfo::writeBadRows(const std::vector<std::string>* errorDatRows, bool bCloseFile)
|
|
{
|
|
size_t errorDatRowsCount = 0;
|
|
|
|
if (errorDatRows)
|
|
errorDatRowsCount = errorDatRows->size();
|
|
|
|
if (errorDatRowsCount > 0)
|
|
{
|
|
if (!fRejectDataFile.is_open())
|
|
{
|
|
ostringstream rejectFileName;
|
|
|
|
if (fErrorDir.size() > 0)
|
|
{
|
|
rejectFileName << fErrorDir << basename(getFileName().c_str());
|
|
}
|
|
else
|
|
{
|
|
if (fReadFromS3)
|
|
{
|
|
rejectFileName << basename(getFileName().c_str());
|
|
}
|
|
else
|
|
{
|
|
rejectFileName << getFileName();
|
|
}
|
|
}
|
|
|
|
rejectFileName << ".Job_" << fJobId << '_' << ::getpid() << BAD_FILE_SUFFIX;
|
|
fRejectDataFileName = rejectFileName.str();
|
|
fRejectDataFile.open(rejectFileName.str().c_str(), ofstream::out);
|
|
|
|
if (!fRejectDataFile)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Unable to create file: " << rejectFileName.str() << "; Check permission.";
|
|
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
for (std::vector<string>::const_iterator iter = errorDatRows->begin(); iter != errorDatRows->end();
|
|
++iter)
|
|
{
|
|
fRejectDataFile << *iter;
|
|
}
|
|
|
|
fRejectDataCnt += errorDatRowsCount;
|
|
}
|
|
|
|
if (bCloseFile)
|
|
{
|
|
if (fRejectDataFile.is_open())
|
|
fRejectDataFile.close();
|
|
|
|
fRejectDataFile.clear();
|
|
|
|
if (fRejectDataCnt > 0)
|
|
{
|
|
ostringstream oss;
|
|
std::string rejectFileNameToLog;
|
|
|
|
// Construct/report complete file name and save in list of files
|
|
boost::filesystem::path p(fRejectDataFileName);
|
|
|
|
if (!p.has_root_path())
|
|
{
|
|
// We could fail here having fixed size buffer
|
|
char cwdPath[4096];
|
|
char* buffPtr = &cwdPath[0];
|
|
buffPtr = getcwd(cwdPath, sizeof(cwdPath));
|
|
boost::filesystem::path rejectFileName2(buffPtr);
|
|
rejectFileName2 /= fRejectDataFileName;
|
|
fBadFiles.push_back(rejectFileName2.string());
|
|
|
|
rejectFileNameToLog = rejectFileName2.string();
|
|
}
|
|
else
|
|
{
|
|
fBadFiles.push_back(fRejectDataFileName);
|
|
|
|
rejectFileNameToLog = fRejectDataFileName;
|
|
}
|
|
|
|
oss << "Number of rows with bad data = " << fRejectDataCnt
|
|
<< ". Exact rows are listed in file located here: " << fErrorDir;
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO1);
|
|
|
|
fRejectDataCnt = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Report whether rows were rejected, and if so, then list out the row numbers
|
|
// and error reasons into the error file.
|
|
//------------------------------------------------------------------------------
|
|
void TableInfo::writeErrReason(const std::vector<std::pair<RID, string> >* errorRows, bool bCloseFile)
|
|
{
|
|
size_t errorRowsCount = 0;
|
|
|
|
if (errorRows)
|
|
errorRowsCount = errorRows->size();
|
|
|
|
if (errorRowsCount > 0)
|
|
{
|
|
if (!fRejectErrFile.is_open())
|
|
{
|
|
ostringstream errFileName;
|
|
|
|
if (fErrorDir.size() > 0)
|
|
{
|
|
errFileName << fErrorDir << basename(getFileName().c_str());
|
|
}
|
|
else
|
|
{
|
|
if (fReadFromS3)
|
|
{
|
|
errFileName << basename(getFileName().c_str());
|
|
}
|
|
else
|
|
{
|
|
errFileName << getFileName();
|
|
}
|
|
}
|
|
|
|
errFileName << ".Job_" << fJobId << '_' << ::getpid() << ERR_FILE_SUFFIX;
|
|
fRejectErrFileName = errFileName.str();
|
|
fRejectErrFile.open(errFileName.str().c_str(), ofstream::out);
|
|
|
|
if (!fRejectErrFile)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Unable to create file: " << errFileName.str() << "; Check permission.";
|
|
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
|
|
|
|
return;
|
|
}
|
|
}
|
|
|
|
for (std::vector<std::pair<RID, std::string> >::const_iterator iter = errorRows->begin();
|
|
iter != errorRows->end(); ++iter)
|
|
{
|
|
fRejectErrFile << "Line number " << iter->first << "; Error: " << iter->second << endl;
|
|
}
|
|
|
|
fRejectErrCnt += errorRowsCount;
|
|
}
|
|
|
|
if (bCloseFile)
|
|
{
|
|
if (fRejectErrFile.is_open())
|
|
fRejectErrFile.close();
|
|
|
|
fRejectErrFile.clear();
|
|
|
|
if (fRejectErrCnt > 0)
|
|
{
|
|
ostringstream oss;
|
|
std::string errFileNameToLog;
|
|
|
|
// Construct/report complete file name and save in list of files
|
|
boost::filesystem::path p(fRejectErrFileName);
|
|
|
|
if (!p.has_root_path())
|
|
{
|
|
char cwdPath[4096];
|
|
char* buffPtr = &cwdPath[0];
|
|
buffPtr = getcwd(cwdPath, sizeof(cwdPath));
|
|
boost::filesystem::path errFileName2(buffPtr);
|
|
errFileName2 /= fRejectErrFileName;
|
|
fErrFiles.push_back(errFileName2.string());
|
|
|
|
errFileNameToLog = errFileName2.string();
|
|
}
|
|
else
|
|
{
|
|
fErrFiles.push_back(fRejectErrFileName);
|
|
|
|
errFileNameToLog = fRejectErrFileName;
|
|
}
|
|
|
|
oss << "Number of rows with errors = " << fRejectDataCnt
|
|
<< ". Exact rows are listed in file located here: " << fErrorDir;
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO1);
|
|
|
|
fRejectErrCnt = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Logs "Bulkload |Job" message along with the specified message text
|
|
// (messageText) to the critical log.
|
|
//------------------------------------------------------------------------------
|
|
void TableInfo::logToDataMods(const string& jobFile, const string& messageText)
|
|
{
|
|
logging::Message::Args args;
|
|
|
|
unsigned subsystemId = 19; // writeengine
|
|
|
|
logging::LoggingID loggingId(subsystemId, 0, fTxnID.id, 0);
|
|
logging::MessageLog messageLog(loggingId, LOG_LOCAL1);
|
|
|
|
logging::Message m(8);
|
|
args.add("Bulkload |Job: " + jobFile);
|
|
args.add("|" + messageText);
|
|
m.format(args);
|
|
messageLog.logInfoMessage(m);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Acquires DB table lock for this TableInfo object.
|
|
// Function employs retry logic based on the SystemConfig/WaitPeriod.
|
|
//------------------------------------------------------------------------------
|
|
// Acquires the BRM table lock for this table, retrying roughly 10 times
// per second for SystemConfig/WaitPeriod seconds (forever when
// disableTimeOut is true).  In distributed modes the lock is managed by
// the client front end, so the call is a no-op.
// Returns NO_ERROR, a BRM error code, or ERR_TBLLOCK_GET_LOCK_LOCKED on
// timeout.
int TableInfo::acquireTableLock(bool disableTimeOut)
{
  // Save DBRoot list at start of job; used to compare at EOJ.
  Config::getRootIdList(fOrigDbRootIds);

  // If executing distributed (mode1) or central command (mode2) then
  // don't worry about table locks. The client front-end will manage locks.
  if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
  {
    if (fLog->isDebug(DEBUG_1))
    {
      ostringstream oss;
      oss << "Bypass acquiring table lock in distributed mode, "
             "for table"
          << fTableName << "; OID-" << fTableOID;
      fLog->logMsg(oss.str(), MSGLVL_INFO2);
    }

    return NO_ERROR;
  }

  const int SLEEP_INTERVAL = 100;   // sleep 100 milliseconds between checks
  const int NUM_TRIES_PER_SEC = 10; // try 10 times per second

  int waitSeconds = Config::getWaitPeriod();
  const int NUM_TRIES = NUM_TRIES_PER_SEC * waitSeconds;
  std::string tblLockErrMsg;

  // Retry loop to lock the db table associated with this TableInfo object
  std::string processName;
  uint32_t processId;
  int32_t sessionId;
  int32_t transId;
  ostringstream pmModOss;
  // Include the local PM module id in the lock's process name so the
  // owner is identifiable in lock listings.
  pmModOss << " (pm" << Config::getLocalModuleID() << ')';
  bool timeout = false;
  // for (int i=0; i<NUM_TRIES; i++)
  int try_count = 0;

  while (!timeout)
  {
    processName = fProcessName;
    processName += pmModOss.str();
    processId = ::getpid();
    sessionId = -1;
    transId = -1;
    int rc = BRMWrapper::getInstance()->getTableLock(fTableOID, processName, processId, sessionId, transId,
                                                     fTableLockID, tblLockErrMsg);

    // Lock id > 0 means the lock was granted.
    if ((rc == NO_ERROR) && (fTableLockID > 0))
    {
      fTableLocked = true;

      if (fLog->isDebug(DEBUG_1))
      {
        ostringstream oss;
        oss << "Table lock acquired for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
            << fTableLockID;
        fLog->logMsg(oss.str(), MSGLVL_INFO2);
      }

      return NO_ERROR;
    }
    else if (fTableLockID == 0)
    {
      // Lock currently held by someone else:
      // sleep and then go back and try getting table lock again
      sleepMS(SLEEP_INTERVAL);

      if (fLog->isDebug(DEBUG_1))
      {
        ostringstream oss;
        oss << "Retrying to acquire table lock for table " << fTableName << "; OID-" << fTableOID;
        fLog->logMsg(oss.str(), MSGLVL_INFO2);
      }
    }
    else
    {
      // Hard BRM error; give up immediately.
      ostringstream oss;
      oss << "Error in acquiring table lock for table " << fTableName << "; OID-" << fTableOID << "; "
          << tblLockErrMsg;
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);

      return rc;
    }

    // if disableTimeOut is set then no timeout for table lock. Forever wait....
    timeout = (disableTimeOut ? false : (++try_count >= NUM_TRIES));
  }

  // Timed out: report who currently holds the lock.
  ostringstream oss;
  oss << "Unable to acquire lock for table " << fTableName << "; OID-" << fTableOID
      << "; table currently locked by process-" << processName << "; pid-" << processId << "; session-"
      << sessionId << "; txn-" << transId;
  fLog->logMsg(oss.str(), ERR_TBLLOCK_GET_LOCK_LOCKED, MSGLVL_ERROR);

  return ERR_TBLLOCK_GET_LOCK_LOCKED;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Change table lock state (to cleanup)
|
|
//------------------------------------------------------------------------------
|
|
int TableInfo::changeTableLockState()
|
|
{
|
|
// If executing distributed (mode1) or central command (mode2) then
|
|
// don't worry about table locks. The client front-end will manage locks.
|
|
if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
|
|
{
|
|
return NO_ERROR;
|
|
}
|
|
|
|
std::string tblLockErrMsg;
|
|
bool bChanged = false;
|
|
|
|
int rc =
|
|
BRMWrapper::getInstance()->changeTableLockState(fTableLockID, BRM::CLEANUP, bChanged, tblLockErrMsg);
|
|
|
|
if (rc == NO_ERROR)
|
|
{
|
|
if (fLog->isDebug(DEBUG_1))
|
|
{
|
|
ostringstream oss;
|
|
|
|
if (bChanged)
|
|
{
|
|
oss << "Table lock state changed to CLEANUP for table " << fTableName << "; OID-" << fTableOID
|
|
<< "; lockID-" << fTableLockID;
|
|
}
|
|
else
|
|
{
|
|
oss << "Table lock state not changed to CLEANUP for table " << fTableName << "; OID-" << fTableOID
|
|
<< "; lockID-" << fTableLockID << ". Table lot locked.";
|
|
}
|
|
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << "Error in changing table state for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
|
|
<< fTableLockID << "; " << tblLockErrMsg;
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Releases DB table lock assigned to this TableInfo object.
|
|
//------------------------------------------------------------------------------
|
|
// Releases the BRM table lock held by this TableInfo (clearing
// fTableLocked on success).  A no-op in distributed modes, where the
// client front end manages locks.  Returns NO_ERROR or the BRM error code.
int TableInfo::releaseTableLock()
{
  // If executing distributed (mode1) or central command (mode2) then
  // don't worry about table locks. The client front-end will manage locks.
  if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
  {
    if (fLog->isDebug(DEBUG_1))
    {
      ostringstream oss;
      oss << "Bypass releasing table lock in distributed mode, "
             "for table "
          << fTableName << "; OID-" << fTableOID;
      fLog->logMsg(oss.str(), MSGLVL_INFO2);
    }

    return NO_ERROR;
  }

  std::string tblLockErrMsg;
  bool bReleased = false;

  // Unlock the database table
  int rc = BRMWrapper::getInstance()->releaseTableLock(fTableLockID, bReleased, tblLockErrMsg);

  if (rc == NO_ERROR)
  {
    fTableLocked = false;

    if (fLog->isDebug(DEBUG_1))
    {
      ostringstream oss;

      // bReleased distinguishes an actual release from "lock was not
      // held" -- both count as success.
      if (bReleased)
      {
        oss << "Table lock released for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
            << fTableLockID;
      }
      else
      {
        oss << "Table lock not released for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
            << fTableLockID << ". Table not locked.";
      }

      fLog->logMsg(oss.str(), MSGLVL_INFO2);
    }
  }
  else
  {
    ostringstream oss;
    oss << "Error in releasing table lock for table " << fTableName << "; OID-" << fTableOID << "; lockID-"
        << fTableLockID << "; " << tblLockErrMsg;
    fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
    return rc;
  }

  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Delete bulk rollback metadata file.
|
|
//------------------------------------------------------------------------------
|
|
// Deletes the bulk-rollback metadata file for this table unless
// fKeepRbMetaFile requests it be kept.  A no-op in distributed modes,
// where the client front end cleans up after all imports finish.
// Deletion errors are logged but treated as non-fatal.
void TableInfo::deleteMetaDataRollbackFile()
{
  // If executing distributed (mode1) or central command (mode2) then
  // don't worry about table locks, or deleting meta data files. The
  // client front-end will manage these tasks after all imports are finished.
  if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
  {
    return;
  }

  if (!fKeepRbMetaFile)
  {
    // Treat any error as non-fatal, though we log it.
    try
    {
      fRBMetaWriter.deleteFile();
    }
    catch (WeException& ex)
    {
      ostringstream oss;
      oss << "Error deleting meta file; " << ex.what();
      fLog->logMsg(oss.str(), ex.errorCode(), MSGLVL_ERROR);
    }
  }
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Changes to "existing" DB files must be confirmed on HDFS system.
|
|
// This function triggers this action.
|
|
//------------------------------------------------------------------------------
|
|
// @bug 5572 - Add db file confirmation for HDFS
|
|
// @bug 5572 - Add db file confirmation for HDFS
// On HDFS deployments, confirms the changes made to existing DB files
// (as recorded in the bulk-rollback meta file) at the end of a successful
// import.  No-op on non-HDFS storage.  Returns NO_ERROR or the
// confirmation error code.
int TableInfo::confirmDBFileChanges()
{
  // Unlike deleteTempDBFileChanges(), note that confirmDBFileChanges()
  // executes regardless of the import mode. We go ahead and confirm
  // the file changes at the end of a successful cpimport.bin.
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    ostringstream oss;
    oss << "Confirming DB file changes for " << fTableName;
    fLog->logMsg(oss.str(), MSGLVL_INFO2);

    std::string errMsg;
    ConfirmHdfsDbFile confirmHdfs;
    int rc = confirmHdfs.confirmDbFileListFromMetaFile(fTableOID, errMsg);

    if (rc != NO_ERROR)
    {
      ostringstream ossErrMsg;
      ossErrMsg << "Unable to confirm changes to table " << fTableName << "; " << errMsg;
      fLog->logMsg(ossErrMsg.str(), rc, MSGLVL_ERROR);

      return rc;
    }
  }

  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Temporary swap files must be deleted on HDFS system.
|
|
// This function triggers this action.
|
|
//------------------------------------------------------------------------------
|
|
// @bug 5572 - Add db file confirmation for HDFS
|
|
void TableInfo::deleteTempDBFileChanges()
|
|
{
|
|
// If executing distributed (mode1) or central command (mode2) then
|
|
// no action necessary. The client front-end will initiate the deletion
|
|
// of the temp files, only after all the distributed imports have
|
|
// successfully completed.
|
|
if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
|
|
{
|
|
return;
|
|
}
|
|
|
|
if (idbdatafile::IDBPolicy::useHdfs())
|
|
{
|
|
ostringstream oss;
|
|
oss << "Deleting DB temp swap files for " << fTableName;
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
|
|
std::string errMsg;
|
|
ConfirmHdfsDbFile confirmHdfs;
|
|
int rc = confirmHdfs.endDbFileListFromMetaFile(fTableOID, true, errMsg);
|
|
|
|
// Treat any error as non-fatal, though we log it.
|
|
if (rc != NO_ERROR)
|
|
{
|
|
ostringstream ossErrMsg;
|
|
ossErrMsg << "Unable to delete temp swap files for table " << fTableName << "; " << errMsg;
|
|
fLog->logMsg(ossErrMsg.str(), rc, MSGLVL_ERROR);
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Validates the correctness of the current HWMs for this table.
|
|
// The HWMs for all the 1 byte columns should be identical. Same goes
|
|
// for all the 2 byte columns, etc. The 2 byte column HWMs should be
|
|
// "roughly" (but not necessarily exactly) twice that of a 1 byte column.
|
|
// Same goes for the 4 byte column HWMs vs their 2 byte counterparts, etc.
|
|
// jobTable - table/column information to use with validation.
|
|
// We use jobTable.colList[] (if provided) instead of data memmber
|
|
// fColumns, because this function is called during preprocessing,
|
|
// before TableInfo.fColumns has been initialized with data from
|
|
// colList.
|
|
// segFileInfo - Vector of File objects carrying current DBRoot, partition,
|
|
// HWM, etc to be validated for the columns belonging to jobTable.
|
|
// stage - Current stage we are validating. "Starting" or "Ending".
|
|
//------------------------------------------------------------------------------
|
|
int TableInfo::validateColumnHWMs(const JobTable* jobTable, const std::vector<DBRootExtentInfo>& segFileInfo,
|
|
const char* stage)
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
// Used to track first 1-byte, 2-byte, 4-byte, and 8-byte columns in table
|
|
int byte1First = -1;
|
|
int byte2First = -1;
|
|
int byte4First = -1;
|
|
int byte8First = -1;
|
|
int byte16First = -1;
|
|
|
|
// Make sure the HWMs for all 1-byte columns match; same for all 2-byte,
|
|
// 4-byte, and 8-byte columns as well.
|
|
for (unsigned k = 0; k < segFileInfo.size(); k++)
|
|
{
|
|
int k1 = 0;
|
|
|
|
// Validate HWMs in jobTable if we have it, else use fColumns.
|
|
const JobColumn& jobColK = ((jobTable != 0) ? jobTable->colList[k] : fColumns[k].column);
|
|
|
|
// Find the first 1-byte, 2-byte, 4-byte, and 8-byte columns.
|
|
// Use those as our reference HWM for the respective column widths.
|
|
switch (jobColK.width)
|
|
{
|
|
case 1:
|
|
{
|
|
if (byte1First == -1)
|
|
byte1First = k;
|
|
|
|
k1 = byte1First;
|
|
break;
|
|
}
|
|
|
|
case 2:
|
|
{
|
|
if (byte2First == -1)
|
|
byte2First = k;
|
|
|
|
k1 = byte2First;
|
|
break;
|
|
}
|
|
|
|
case 4:
|
|
{
|
|
if (byte4First == -1)
|
|
byte4First = k;
|
|
|
|
k1 = byte4First;
|
|
break;
|
|
}
|
|
|
|
case 8:
|
|
{
|
|
if (byte8First == -1)
|
|
byte8First = k;
|
|
|
|
k1 = byte8First;
|
|
break;
|
|
}
|
|
case 16:
|
|
{
|
|
if (byte16First == -1)
|
|
byte16First = k;
|
|
|
|
k1 = byte16First;
|
|
break;
|
|
}
|
|
default:
|
|
{
|
|
ostringstream oss;
|
|
oss << stage
|
|
<< " Unsupported width for"
|
|
" OID-"
|
|
<< jobColK.mapOid << "; column-" << jobColK.colName << "; width-" << jobColK.width;
|
|
fLog->logMsg(oss.str(), ERR_BRM_UNSUPP_WIDTH, MSGLVL_ERROR);
|
|
return ERR_BRM_UNSUPP_WIDTH;
|
|
}
|
|
} // end of switch based on column width.
|
|
|
|
// Validate HWMs in jobTable if we have it, else use fColumns.
|
|
const JobColumn& jobColK1 = ((jobTable != 0) ? jobTable->colList[k1] : fColumns[k1].column);
|
|
|
|
// std::cout << "dbg: comparing0 " << stage << " refcol-" << k1 <<
|
|
// "; wid-" << jobColK1.width << "; hwm-" << segFileInfo[k1].fLocalHwm <<
|
|
// " <to> col-" << k <<
|
|
// "; wid-" << jobColK.width << " ; hwm-"<<segFileInfo[k].fLocalHwm<<std::endl;
|
|
|
|
// Validate that the HWM for this column (k) matches that of the
|
|
// corresponding reference column with the same width.
|
|
if ((segFileInfo[k1].fDbRoot != segFileInfo[k].fDbRoot) ||
|
|
(segFileInfo[k1].fPartition != segFileInfo[k].fPartition) ||
|
|
(segFileInfo[k1].fSegment != segFileInfo[k].fSegment) ||
|
|
(segFileInfo[k1].fLocalHwm != segFileInfo[k].fLocalHwm))
|
|
{
|
|
ostringstream oss;
|
|
oss << stage
|
|
<< " HWMs do not match for"
|
|
" OID1-"
|
|
<< jobColK1.mapOid << "; column-" << jobColK1.colName << "; DBRoot-" << segFileInfo[k1].fDbRoot
|
|
<< "; partition-" << segFileInfo[k1].fPartition << "; segment-" << segFileInfo[k1].fSegment
|
|
<< "; hwm-" << segFileInfo[k1].fLocalHwm << "; width-" << jobColK1.width << ':' << std::endl
|
|
<< " and OID2-" << jobColK.mapOid << "; column-" << jobColK.colName << "; DBRoot-"
|
|
<< segFileInfo[k].fDbRoot << "; partition-" << segFileInfo[k].fPartition << "; segment-"
|
|
<< segFileInfo[k].fSegment << "; hwm-" << segFileInfo[k].fLocalHwm << "; width-" << jobColK.width;
|
|
fLog->logMsg(oss.str(), ERR_BRM_HWMS_NOT_EQUAL, MSGLVL_ERROR);
|
|
return ERR_BRM_HWMS_NOT_EQUAL;
|
|
}
|
|
|
|
// HWM DBRoot, partition, and segment number should match for all
|
|
// columns; so compare DBRoot, part#, and seg# with first column.
|
|
if ((segFileInfo[0].fDbRoot != segFileInfo[k].fDbRoot) ||
|
|
(segFileInfo[0].fPartition != segFileInfo[k].fPartition) ||
|
|
(segFileInfo[0].fSegment != segFileInfo[k].fSegment))
|
|
{
|
|
const JobColumn& jobCol0 = ((jobTable != 0) ? jobTable->colList[0] : fColumns[0].column);
|
|
|
|
ostringstream oss;
|
|
oss << stage
|
|
<< " HWM DBRoot,Part#, or Seg# do not match for"
|
|
" OID1-"
|
|
<< jobCol0.mapOid << "; column-" << jobCol0.colName << "; DBRoot-" << segFileInfo[0].fDbRoot
|
|
<< "; partition-" << segFileInfo[0].fPartition << "; segment-" << segFileInfo[0].fSegment
|
|
<< "; hwm-" << segFileInfo[0].fLocalHwm << "; width-" << jobCol0.width << ':' << std::endl
|
|
<< " and OID2-" << jobColK.mapOid << "; column-" << jobColK.colName << "; DBRoot-"
|
|
<< segFileInfo[k].fDbRoot << "; partition-" << segFileInfo[k].fPartition << "; segment-"
|
|
<< segFileInfo[k].fSegment << "; hwm-" << segFileInfo[k].fLocalHwm << "; width-" << jobColK.width;
|
|
fLog->logMsg(oss.str(), ERR_BRM_HWMS_NOT_EQUAL, MSGLVL_ERROR);
|
|
return ERR_BRM_HWMS_NOT_EQUAL;
|
|
}
|
|
} // end of loop to compare all 1-byte HWMs, 2-byte HWMs, etc.
|
|
|
|
// Validate/compare HWM for 1-byte column in relation to 2-byte column, etc.
|
|
// Without knowing the exact row count, we can't extrapolate the exact HWM
|
|
// for the wider column, but we can narrow it down to an expected range.
|
|
int refCol = 0;
|
|
int colIdx = 0;
|
|
|
|
// Validate/compare HWMs given a 1-byte column as a starting point
|
|
if (byte1First >= 0)
|
|
{
|
|
refCol = byte1First;
|
|
|
|
if ((rc = compareHWMs(byte1First, byte2First, 1, 2, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
|
|
if ((rc = compareHWMs(byte1First, byte4First, 1, 4, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
|
|
if ((rc = compareHWMs(byte1First, byte8First, 1, 8, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
|
|
if ((rc = compareHWMs(byte1First, byte16First, 1, 16, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
}
|
|
|
|
// Validate/compare HWMs given a 2-byte column as a starting point
|
|
if (byte2First >= 0)
|
|
{
|
|
refCol = byte2First;
|
|
|
|
if ((rc = compareHWMs(byte2First, byte4First, 2, 4, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
|
|
if ((rc = compareHWMs(byte2First, byte8First, 2, 8, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
|
|
if ((rc = compareHWMs(byte2First, byte16First, 2, 16, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
}
|
|
|
|
// Validate/compare HWMs given a 4-byte column as a starting point
|
|
if (byte4First >= 0)
|
|
{
|
|
refCol = byte4First;
|
|
|
|
if ((rc = compareHWMs(byte4First, byte8First, 4, 8, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
|
|
if ((rc = compareHWMs(byte4First, byte16First, 4, 16, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
}
|
|
if (byte8First >= 0)
|
|
{
|
|
refCol = byte8First;
|
|
if ((rc = compareHWMs(byte8First, byte16First, 8, 16, segFileInfo, colIdx) != NO_ERROR))
|
|
{
|
|
goto errorCheck;
|
|
}
|
|
}
|
|
|
|
// To avoid repeating this message 6 times in the preceding source code, we
|
|
// use the "dreaded" goto to branch to this single place for error handling.
|
|
errorCheck:
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
const JobColumn& jobColRef = ((jobTable != 0) ? jobTable->colList[refCol] : fColumns[refCol].column);
|
|
const JobColumn& jobColIdx = ((jobTable != 0) ? jobTable->colList[colIdx] : fColumns[colIdx].column);
|
|
|
|
ostringstream oss;
|
|
oss << stage
|
|
<< " HWMs are not in sync for"
|
|
" OID1-"
|
|
<< jobColRef.mapOid << "; column-" << jobColRef.colName << "; DBRoot-" << segFileInfo[refCol].fDbRoot
|
|
<< "; partition-" << segFileInfo[refCol].fPartition << "; segment-" << segFileInfo[refCol].fSegment
|
|
<< "; hwm-" << segFileInfo[refCol].fLocalHwm << "; width-" << jobColRef.width << ':' << std::endl
|
|
<< " and OID2-" << jobColIdx.mapOid << "; column-" << jobColIdx.colName << "; DBRoot-"
|
|
<< segFileInfo[colIdx].fDbRoot << "; partition-" << segFileInfo[colIdx].fPartition << "; segment-"
|
|
<< segFileInfo[colIdx].fSegment << "; hwm-" << segFileInfo[colIdx].fLocalHwm << "; width-"
|
|
<< jobColIdx.width;
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// DESCRIPTION:
|
|
// Initialize the bulk rollback metadata writer for this table.
|
|
// RETURN:
|
|
// NO_ERROR if success
|
|
// other if fail
|
|
//------------------------------------------------------------------------------
|
|
int TableInfo::initBulkRollbackMetaData()
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
try
|
|
{
|
|
fRBMetaWriter.init(fTableOID, fTableName);
|
|
}
|
|
catch (WeException& ex)
|
|
{
|
|
fLog->logMsg(ex.what(), ex.errorCode(), MSGLVL_ERROR);
|
|
rc = ex.errorCode();
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// DESCRIPTION:
|
|
// Saves snapshot of extentmap into a bulk rollback meta data file, for
|
|
// use in a bulk rollback, if the current cpimport.bin job should fail.
|
|
// The source code in RBMetaWriter::saveBulkRollbackMetaData() used to
|
|
// reside in this TableInfo function. But much of the source code was
|
|
// factored out to create RBMetaWriter::saveBulkRollbackMetaData(), so
|
|
// that the function would reside in the shared library for reuse by DML.
|
|
// PARAMETERS:
|
|
// job - current job
|
|
// segFileInfo - Vector of File objects carrying starting DBRoot, partition,
|
|
// etc, for each column belonging to tableNo.
|
|
// dbRootHWMInfoVecCol - vector of last local HWM info for each DBRoot
|
|
// (asssigned to current PM) for each column in "this" table.
|
|
// RETURN:
|
|
// NO_ERROR if success
|
|
// other if fail
|
|
//------------------------------------------------------------------------------
|
|
int TableInfo::saveBulkRollbackMetaData(Job& job, const std::vector<DBRootExtentInfo>& segFileInfo,
|
|
const std::vector<BRM::EmDbRootHWMInfo_v>& dbRootHWMInfoVecCol)
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
std::vector<Column> cols;
|
|
std::vector<OID> dctnryOids;
|
|
|
|
// Loop through the columns in the specified table
|
|
for (size_t i = 0; i < job.jobTableList[fTableId].colList.size(); i++)
|
|
{
|
|
JobColumn& jobCol = job.jobTableList[fTableId].colList[i];
|
|
|
|
Column col;
|
|
col.colNo = i;
|
|
col.colWidth = jobCol.width;
|
|
col.colType = jobCol.weType;
|
|
col.colDataType = jobCol.dataType;
|
|
col.dataFile.oid = jobCol.mapOid;
|
|
col.dataFile.fid = jobCol.mapOid;
|
|
col.dataFile.hwm = segFileInfo[i].fLocalHwm; // starting HWM
|
|
col.dataFile.pFile = 0;
|
|
col.dataFile.fPartition = segFileInfo[i].fPartition; // starting Part#
|
|
col.dataFile.fSegment = segFileInfo[i].fSegment; // starting seg#
|
|
col.dataFile.fDbRoot = segFileInfo[i].fDbRoot; // starting DBRoot
|
|
col.compressionType = jobCol.compressionType;
|
|
cols.push_back(col);
|
|
|
|
OID dctnryOid = 0;
|
|
|
|
if (jobCol.colType == COL_TYPE_DICT)
|
|
dctnryOid = jobCol.dctnry.dctnryOid;
|
|
|
|
dctnryOids.push_back(dctnryOid);
|
|
|
|
} // end of loop through columns
|
|
|
|
fRBMetaWriter.setUIDGID(this);
|
|
|
|
try
|
|
{
|
|
fRBMetaWriter.saveBulkRollbackMetaData(cols, dctnryOids, dbRootHWMInfoVecCol);
|
|
}
|
|
catch (WeException& ex)
|
|
{
|
|
fLog->logMsg(ex.what(), ex.errorCode(), MSGLVL_ERROR);
|
|
rc = ex.errorCode();
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Synchronize system catalog auto-increment next value with BRM.
|
|
// This function is called at the end of normal processing to get the system
|
|
// catalog back in line with the latest auto increment next value generated by
|
|
// BRM.
|
|
//------------------------------------------------------------------------------
|
|
int TableInfo::synchronizeAutoInc()
|
|
{
|
|
for (unsigned i = 0; i < fColumns.size(); ++i)
|
|
{
|
|
if (fColumns[i].column.autoIncFlag)
|
|
{
|
|
// TBD: Do we rollback flush cache error for autoinc.
|
|
// Not sure we should bail out and rollback on a
|
|
// ERR_BLKCACHE_FLUSH_LIST error, but we currently
|
|
// rollback for "any" updateNextValue() error
|
|
int rc = fColumns[i].finishAutoInc();
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
return rc;
|
|
}
|
|
|
|
break; // okay to break; only 1 autoinc column per table
|
|
}
|
|
}
|
|
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Rollback changes made to "this" table by the current import job, delete the
|
|
// meta-data files, and release the table lock. This function only applies to
|
|
// mode3 import. Table lock and bulk rollbacks are managed by parent cpimport
|
|
// (file splitter) process for mode1 and mode2.
|
|
//------------------------------------------------------------------------------
|
|
int TableInfo::rollbackWork()
|
|
{
|
|
// Close any column or store files left open by abnormal termination.
|
|
// We want to do this before reopening the files and doing a bulk rollback.
|
|
closeOpenDbFiles();
|
|
|
|
// Abort "local" bulk rollback if a DBRoot from the start of the job, is
|
|
// now missing. User should run cleartablelock to execute a rollback on
|
|
// this PM "and" the PM where the DBRoot was moved to.
|
|
std::vector<uint16_t> dbRootIds;
|
|
Config::getRootIdList(dbRootIds);
|
|
|
|
for (unsigned int j = 0; j < fOrigDbRootIds.size(); j++)
|
|
{
|
|
bool bFound = false;
|
|
|
|
for (unsigned int k = 0; k < dbRootIds.size(); k++)
|
|
{
|
|
if (fOrigDbRootIds[j] == dbRootIds[k])
|
|
{
|
|
bFound = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (!bFound)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Mode3 bulk rollback not performed for table " << fTableName << "; DBRoot" << fOrigDbRootIds[j]
|
|
<< " moved from this PM during bulk load. "
|
|
<< " Run cleartablelock to rollback and release the table lock "
|
|
<< "across PMs.";
|
|
int rc = ERR_BULK_ROLLBACK_MISS_ROOT;
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
// Restore/rollback the DB files if we got far enough to begin processing
|
|
// this table.
|
|
int rc = NO_ERROR;
|
|
|
|
if (hasProcessingBegun())
|
|
{
|
|
BulkRollbackMgr rbMgr(fTableOID, fTableLockID, fTableName, fProcessName, fLog);
|
|
|
|
rc = rbMgr.rollback(fKeepRbMetaFile);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Error rolling back table " << fTableName << "; " << rbMgr.getErrorMsg();
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
// Delete the meta data files after rollback is complete
|
|
deleteMetaDataRollbackFile();
|
|
|
|
// Release the table lock
|
|
rc = releaseTableLock();
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Table lock not cleared for table " << fTableName;
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Allocate extent from BRM (through the stripe allocator).
|
|
//------------------------------------------------------------------------------
|
|
int TableInfo::allocateBRMColumnExtent(OID columnOID, uint16_t dbRoot, uint32_t& partition, uint16_t& segment,
|
|
BRM::LBID_t& startLbid, int& allocSize, HWM& hwm, std::string& errMsg)
|
|
{
|
|
int rc = fExtentStrAlloc.allocateExtent(columnOID, dbRoot, partition, segment, startLbid, allocSize, hwm,
|
|
errMsg);
|
|
// fExtentStrAlloc.print();
|
|
|
|
return rc;
|
|
}
|
|
|
|
// Accessor: returns the fReadFromStdin flag, indicating whether this table's
// import source is stdin rather than input file(s).
bool TableInfo::readFromSTDIN()
{
  return fReadFromStdin;
}
|
|
|
|
} // namespace WriteEngine
|
|
// end of namespace
|