Serguey Zefirov 38fd96a663 fix(memory leaks): MCOL-5791 - get rid of memory leaks in plugin code
There were numerous memory leaks in the plugin's code and associated code.
A typical run of the MTR tests leaked around 65 megabytes of objects, so
the leaks could severely affect long-lived connections.

This patch fixes (almost) all leaks found in the plugin. The exceptions
are two leaks associated with SHOW CREATE TABLE columnstore_table and
with retrieving column information for a columnstore-handled table. These
should be fixed on the server side, and that work is underway.
2024-12-04 10:59:12 +03:00

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*******************************************************************************
* $Id: we_bulkload.cpp 4730 2013-08-08 21:41:13Z chao $
*
*******************************************************************************/
/** @file */
#define WE_BULKLOAD_DLLEXPORT
#include "we_bulkload.h"
#undef WE_BULKLOAD_DLLEXPORT
#include <cmath>
#include <cstdlib>
#include <climits>
#include <glob.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <vector>
#include <sstream>
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/convenience.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <pwd.h>
#include "we_bulkstatus.h"
#include "we_rbmetawriter.h"
#include "we_colopbulk.h"
#include "we_columninfocompressed.h"
#include "we_config.h"
#include "we_dbrootextenttracker.h"
#include "writeengine.h"
#include "sys/time.h"
#include "sys/types.h"
#include "dataconvert.h"
#include "idbcompress.h"
#include "calpontsystemcatalog.h"
#include "we_ddlcommandclient.h"
#include "mcsconfig.h"
using namespace std;
using namespace boost;
using namespace dataconvert;
namespace
{
const std::string IMPORT_PATH_STDIN("STDIN");
const std::string IMPORT_PATH_CWD(".");
const std::string LOG_SUFFIX = ".log"; // Job log file suffix
const std::string ERR_LOG_SUFFIX = ".err"; // Job err log file suffix
} // namespace
// extern WriteEngine::BRMWrapper* brmWrapperPtr;
namespace WriteEngine
{
/* static */ std::vector<std::shared_ptr<TableInfo>> BulkLoad::fTableInfo;
/* static */ boost::mutex* BulkLoad::fDDLMutex = 0;
/* static */ const std::string BulkLoad::DIR_BULK_JOB("job");
/* static */ const std::string BulkLoad::DIR_BULK_TEMP_JOB("tmpjob");
/* static */ const std::string BulkLoad::DIR_BULK_IMPORT("/data/import/");
/* static */ bool BulkLoad::fNoConsoleOutput = false;
//------------------------------------------------------------------------------
// A thread to periodically call dbrm to see if a user is
// shutting down the system or has put the system into write
// suspend mode. DBRM has 2 flags to check in this case, the
// ROLLBACK flag, and the FORCE flag. These flags will be
// reported when we ask for the Shutdown Pending flag (which we
// ignore at this point). Even if the user is putting the system
// into write suspend mode, this call will return the flags we
// are interested in. If ROLLBACK is set, we cancel normally.
// If FORCE is set, we can't rollback.
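// Sketch of the flag handling implemented below:
//   iShutdown == ERR_BRM_GET_SHUTDOWN -> flags could not be read; ignore
//   bRollback == true -> cancel the job normally (job status EXIT_FAILURE)
//   bForce == true -> system is stopping immediately; no rollback possible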
struct CancellationThread
{
CancellationThread(BulkLoad* pBulkLoad) : fpBulkLoad(pBulkLoad)
{
}
BulkLoad* fpBulkLoad;
void operator()()
{
bool bRollback = false;
bool bForce = false;
int iShutdown;
while (fpBulkLoad->getContinue())
{
usleep(1000000); // 1 second
// Check to see if someone has ordered a shutdown or suspend with
// rollback or force.
iShutdown = BRMWrapper::getInstance()->isShutdownPending(bRollback, bForce);
if (iShutdown != ERR_BRM_GET_SHUTDOWN)
{
if (bRollback)
{
if (iShutdown == ERR_BRM_SHUTDOWN)
{
if (!BulkLoad::disableConsoleOutput())
cout << "System stop has been ordered. Rollback" << endl;
}
else
{
if (!BulkLoad::disableConsoleOutput())
cout << "Database writes have been suspended. Rollback" << endl;
}
BulkStatus::setJobStatus(EXIT_FAILURE);
}
else if (bForce)
{
if (!BulkLoad::disableConsoleOutput())
cout << "Immediate system stop has been ordered. "
<< "No rollback" << endl;
}
}
}
}
};
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
BulkLoad::BulkLoad()
: fColOp(new ColumnOpBulk())
, fColDelim('\0')
, fNoOfBuffers(-1)
, fBufferSize(-1)
, fFileVbufSize(-1)
, fMaxErrors(-1)
, fNoOfParseThreads(3)
, fNoOfReadThreads(1)
, fKeepRbMetaFiles(false)
, fNullStringMode(false)
, fEnclosedByChar('\0') // not enabled unless user overrides enclosed by char
, fEscapeChar('\0')
, fTotalTime(0.0)
, fBulkMode(BULK_MODE_LOCAL)
, fbTruncationAsError(false)
, fImportDataMode(IMPORT_DATA_TEXT)
, fbContinue(false)
, fDisableTimeOut(false)
, fUUID(boost::uuids::nil_generator()())
, fTimeZone(dataconvert::systemTimeZoneOffset())
, fUsername("mysql") // MCOL-4328 default file owner
{
fTableInfo.clear();
setDebugLevel(DEBUG_0);
fDDLMutex = new boost::mutex();
memset(&fStartTime, 0, sizeof(timeval));
memset(&fEndTime, 0, sizeof(timeval));
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
BulkLoad::~BulkLoad()
{
fTableInfo.clear();
delete fDDLMutex;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Set alternate directory path for import data files. If the specified
// path is "STDIN", then the import data will be read from stdin.
// Note that we check for read "and" write access to the import directory
// path so that we can not only read the input files, but also write the
// *.bad and *.err files to that directory.
// PARAMETERS:
// loadDir - import directory path
// errMsg - return error msg if failed return code is returned
// RETURN:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::setAlternateImportDir(const std::string& loadDir, std::string& errMsg)
{
if (loadDir == IMPORT_PATH_STDIN)
{
fAlternateImportDir = loadDir;
}
else
{
if (access(loadDir.c_str(), R_OK | W_OK) < 0)
{
int errnum = errno;
ostringstream oss;
oss << "Error gaining r/w access to import path " << loadDir << ": " << strerror(errnum);
errMsg = oss.str();
return ERR_FILE_OPEN;
}
if (loadDir == IMPORT_PATH_CWD)
{
fAlternateImportDir = loadDir;
}
else
{
if (loadDir.c_str()[loadDir.size() - 1] == '/')
fAlternateImportDir = loadDir;
else
fAlternateImportDir = loadDir + "/";
}
}
return NO_ERROR;
}
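// Usage sketch (hypothetical caller and path):
//   std::string errMsg;
//   if (bulkLoad.setAlternateImportDir("/var/lib/columnstore/import", errMsg) != NO_ERROR)
//     cerr << errMsg << endl; // e.g. no r/w access to the directory
// A trailing '/' is appended automatically when the path lacks one.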
//------------------------------------------------------------------------------
// DESCRIPTION:
// Load a job information
// PARAMETERS:
// fullName - full filename for job description file
// bUseTempJobFile - are we using a temporary job XML file
// argc - command line arg count
// argv - command line arguments
// bLogInfo2ToConsole - Log info2 msgs to the console
// bValidateColumnList- Validate that all the columns for each table have
// a corresponding <Column> or <DefaultColumn> tag.
// RETURN:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::loadJobInfo(const string& fullName, bool bUseTempJobFile, int argc, char** argv,
bool bLogInfo2ToConsole, bool bValidateColumnList)
{
fJobFileName = fullName;
fRootDir = Config::getBulkRoot();
fJobInfo.setTimeZone(fTimeZone);
if (!exists(fullName.c_str()))
{
fLog.logMsg(" file " + fullName + " does not exist", ERR_FILE_NOT_EXIST, MSGLVL_ERROR);
return ERR_FILE_NOT_EXIST;
}
std::string errMsg;
int rc = fJobInfo.loadJobXmlFile(fullName, bUseTempJobFile, bValidateColumnList, errMsg);
if (rc != NO_ERROR)
{
std::ostringstream oss;
oss << "Error loading job file " << fullName << "; " << errMsg;
fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
Job& curJob = fJobInfo.getJob();
string logFile, errlogFile;
logFile = std::string(MCSLOGDIR) + "/cpimport/" + "Job_" + Convertor::int2Str(curJob.id) + LOG_SUFFIX;
errlogFile =
std::string(MCSLOGDIR) + "/cpimport/" + "Job_" + Convertor::int2Str(curJob.id) + ERR_LOG_SUFFIX;
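// e.g. (assuming MCSLOGDIR resolves to /var/log/mariadb/columnstore):
//   /var/log/mariadb/columnstore/cpimport/Job_299.log
//   /var/log/mariadb/columnstore/cpimport/Job_299.err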
if (disableConsoleOutput())
fLog.setLogFileName(logFile.c_str(), errlogFile.c_str(), false);
else
fLog.setLogFileName(logFile.c_str(), errlogFile.c_str(), (int)bLogInfo2ToConsole);
if (!disableConsoleOutput())
{
cout << "Log file for this job: " << logFile << std::endl;
fLog.logMsg("successfully loaded job file " + fullName, MSGLVL_INFO1);
}
if (argc > 1)
{
std::ostringstream oss;
oss << "Command line options: ";
for (int k = 1; k < argc; k++)
{
if (!strcmp(argv[k], "\t")) // special case to print a <TAB>
oss << "'\\t'"
<< " ";
else
oss << argv[k] << " ";
}
fLog.logMsg(oss.str(), MSGLVL_INFO2);
}
// Validate that each table has 1 or more columns referenced in the xml file
for (unsigned i = 0; i < curJob.jobTableList.size(); i++)
{
if (curJob.jobTableList[i].colList.size() == 0)
{
rc = ERR_INVALID_PARAM;
fLog.logMsg(
"No column definitions in job description file for "
"table " +
curJob.jobTableList[i].tblName,
rc, MSGLVL_ERROR);
return rc;
}
// MCOL-5021
execplan::CalpontSystemCatalog::OID tableAUXColOid;
std::string tblName;
std::string curTblName = curJob.jobTableList[i].tblName;
// Parse out <tablename> from [<schemaname>.]<tablename> string
string::size_type startName = curTblName.rfind('.');
if (startName == std::string::npos)
tblName.assign(curTblName);
else
tblName.assign(curTblName.substr(startName + 1));
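// e.g. "tpch.lineitem" yields tblName "lineitem"; a bare "lineitem"
// (no schema prefix) is used as-is.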
execplan::CalpontSystemCatalog::TableName table(curJob.schema, tblName);
try
{
boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID);
tableAUXColOid = cat->tableAUXColumnOID(table);
}
catch (logging::IDBExcept& ie)
{
rc = ERR_UNKNOWN;
std::ostringstream oss;
if (ie.errorCode() == logging::ERR_TABLE_NOT_IN_CATALOG)
{
oss << "Table " << table.toString();
oss << " does not exist in the system catalog.";
}
else
{
oss << "Error getting AUX column OID for table " << table.toString();
oss << " due to: " << ie.what();
}
fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
catch (std::exception& ex)
{
rc = ERR_UNKNOWN;
std::ostringstream oss;
oss << "Error getting AUX column OID for table " << table.toString();
oss << " due to: " << ex.what();
fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
catch (...)
{
rc = ERR_UNKNOWN;
std::ostringstream oss;
oss << "Error getting AUX column OID for table " << table.toString();
fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
// MCOL-5021 Valid AUX column OID for a table is > 3000
// Tables that were created before this feature was added will have
// tableAUXColOid = 0
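// (User object OIDs are assumed to start above 3000, per USER_OBJECT_ID
// in the system catalog, so any real AUX column OID must exceed 3000.)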
if (tableAUXColOid > 3000)
{
JobColumn curColumn("aux", tableAUXColOid, execplan::AUX_COL_DATATYPE_STRING, execplan::AUX_COL_WIDTH,
execplan::AUX_COL_WIDTH, execplan::AUX_COL_COMPRESSION_TYPE,
execplan::AUX_COL_COMPRESSION_TYPE, execplan::AUX_COL_MINVALUE,
execplan::AUX_COL_MAXVALUE, true, 1);
curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_DEFAULT;
curJob.jobTableList[i].colList.push_back(curColumn);
JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_DEFAULT, curJob.jobTableList[i].colList.size() - 1);
curJob.jobTableList[i].fFldRefs.push_back(fieldRef);
}
}
// Validate that the user's xml file has been regenerated since the
// required tblOid attribute was added to the Table tag for table locking.
for (unsigned i = 0; i < curJob.jobTableList.size(); i++)
{
if (curJob.jobTableList[i].mapOid == 0)
{
rc = ERR_XML_PARSE;
fLog.logMsg("Outdated job file " + fullName + "; missing required 'tblOid' table attribute." +
" Please regenerate this xml file.",
rc, MSGLVL_ERROR);
return rc;
}
}
for (unsigned kT = 0; kT < curJob.jobTableList.size(); kT++)
{
for (unsigned kC = 0; kC < curJob.jobTableList[kT].colList.size(); kC++)
{
if (!compress::CompressInterface::isCompressionAvail(
curJob.jobTableList[kT].colList[kC].compressionType))
{
std::ostringstream oss;
oss << "Specified compression type (" << curJob.jobTableList[kT].colList[kC].compressionType
<< ") for table " << curJob.jobTableList[kT].tblName << " and column "
<< curJob.jobTableList[kT].colList[kC].colName << " is not available for use.";
rc = ERR_COMP_UNAVAIL_TYPE;
fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
}
}
// If binary import, do not allow <IgnoreField> tags in the Job file
if ((fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL) || (fImportDataMode == IMPORT_DATA_BIN_SAT_NULL))
{
for (unsigned kT = 0; kT < curJob.jobTableList.size(); kT++)
{
if (curJob.jobTableList[kT].fIgnoredFields.size() > 0)
{
std::ostringstream oss;
oss << "<IgnoreField> tag present in Job file for table " << curJob.jobTableList[kT].tblName
<< "; this is not allowed for binary imports.";
rc = ERR_BULK_BINARY_IGNORE_FLD;
fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
}
}
stopTimer();
std::ostringstream ossXMLTime;
ossXMLTime << "Job file loaded, run time for this step : " << getTotalRunTime() << " seconds";
fLog.logMsg(ossXMLTime.str(), MSGLVL_INFO1);
return NO_ERROR;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Spawns and joins the Read and Parsing threads to import the data.
// PARAMETERS:
// none
// RETURN:
// none
//------------------------------------------------------------------------------
void BulkLoad::spawnWorkers()
{
// We're fixin' to launch threads. This lets anybody who cares (i.e.
// checkCancellation) know that read and parse threads are running.
fbContinue = true;
// Spawn a thread to check for user cancellation via calpont console
// But only in mode 3 (local mode)
boost::thread cancelThread;
CancellationThread cancelationThread(this);
if (getBulkLoadMode() == BULK_MODE_LOCAL)
{
cancelThread = boost::thread(cancelationThread);
}
// Spawn read threads
for (int i = 0; i < fNoOfReadThreads; ++i)
{
fReadThreads.create_thread(boost::bind(&BulkLoad::read, this, (int)i));
}
fLog.logMsg("No of Read Threads Spawned = " + Convertor::int2Str(fNoOfReadThreads), MSGLVL_INFO1);
// Spawn parse threads
for (int i = 0; i < fNoOfParseThreads; ++i)
{
fParseThreads.create_thread(boost::bind(&BulkLoad::parse, this, (int)i));
}
fLog.logMsg("No of Parse Threads Spawned = " + Convertor::int2Str(fNoOfParseThreads), MSGLVL_INFO1);
fReadThreads.join_all();
fParseThreads.join_all();
fbContinue = false;
if (getBulkLoadMode() == BULK_MODE_LOCAL)
{
cancelThread.join();
}
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Pre process job. Determine DBRoot/segment file, HWM etc where we are
// to start adding rows, create ColumnInfo object for each column. Create
// initial segment file if necessary. This could happen in shared-nothing
// where CREATE TABLE only creates the initial segment file on one of the
// PMs. The first time rows are added on the other PMs, an initial segment
// file must be created. (This could also happen "if" we ever decide to
// allow the user to drop all partitions for a DBRoot, including the last
// partition.)
// PreProcessing also includes creating the bulk rollback back up files,
// initializing auto-increment, sanity checking the consistency of the HWM
// across columns, and opening the starting column and dictionary store
// files.
// PARAMETERS:
// job - current job
// tableNo - table no
// tableInfo - TableInfo object corresponding to tableNo table.
// RETURN:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::preProcess(Job& job, int tableNo, std::shared_ptr<TableInfo>& tableInfo)
{
int rc = NO_ERROR, minWidth = 9999; // give a big number
HWM minHWM = 999999; // rp 9/25/07 Bug 473
ColStruct curColStruct;
execplan::CalpontSystemCatalog::ColDataType colDataType;
// Initialize portions of TableInfo object
tableInfo->setBufferSize(fBufferSize);
tableInfo->setFileBufferSize(fFileVbufSize);
tableInfo->setTableId(tableNo);
tableInfo->setColDelimiter(fColDelim);
tableInfo->setJobFileName(fJobFileName);
tableInfo->setJobId(job.id);
tableInfo->setNullStringMode(fNullStringMode);
tableInfo->setEnclosedByChar(fEnclosedByChar);
tableInfo->setEscapeChar(fEscapeChar);
tableInfo->setImportDataMode(fImportDataMode);
tableInfo->setTimeZone(fTimeZone);
tableInfo->setJobUUID(fUUID);
// MCOL-4328 Get the uid and gid for the username if one is set.
// We inject the uid and gid into TableInfo and all ColumnInfo objects later.
struct passwd* pwd = nullptr;
errno = 0;
if (fUsername.length() && (pwd = getpwnam(fUsername.c_str())) == nullptr)
{
std::ostringstream oss;
oss << "Error getting pwd for " << fUsername << " with errno " << errno;
fLog.logMsg(oss.str(), MSGLVL_ERROR);
return ERR_FILE_CHOWN;
}
if (pwd)
tableInfo->setUIDGID(pwd->pw_uid, pwd->pw_gid);
if (fMaxErrors != -1)
tableInfo->setMaxErrorRows(fMaxErrors);
else
tableInfo->setMaxErrorRows(job.jobTableList[tableNo].maxErrNum);
// @bug 3929: cpimport.bin error messaging using up too much memory.
// Validate that max allowed error count is within valid range
long long maxErrNum = tableInfo->getMaxErrorRows();
if (maxErrNum > MAX_ALLOW_ERROR_COUNT)
{
ostringstream oss;
oss << "Max allowed error count specified as " << maxErrNum << " for table "
<< job.jobTableList[tableNo].tblName << "; this exceeds limit of " << MAX_ALLOW_ERROR_COUNT
<< "; resetting to " << MAX_ALLOW_ERROR_COUNT;
fLog.logMsg(oss.str(), MSGLVL_INFO2);
maxErrNum = MAX_ALLOW_ERROR_COUNT;
}
tableInfo->setMaxErrorRows(maxErrNum);
//------------------------------------------------------------------------
// First loop thru the columns for the "tableNo" table in jobTableList[].
// Get the HWM information for each column.
//------------------------------------------------------------------------
std::vector<int> colWidths;
std::vector<DBRootExtentInfo> segFileInfo;
std::vector<DBRootExtentTracker*> dbRootExtTrackerVec;
std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(job.jobTableList[tableNo].colList.size());
DBRootExtentTracker* pRefDBRootExtentTracker = 0;
bool bNoStartExtentOnThisPM = false;
bool bEmptyPM = false;
for (size_t i = 0; i < job.jobTableList[tableNo].colList.size(); i++)
{
const JobColumn& curJobCol = job.jobTableList[tableNo].colList[i];
// convert column data type
if (curJobCol.typeName.length() > 0 && fColOp->getColDataType(curJobCol.typeName.c_str(), colDataType))
{
job.jobTableList[tableNo].colList[i].dataType = curColStruct.colDataType = colDataType;
}
else
{
ostringstream oss;
oss << "Column type " << curJobCol.typeName << " is not valid ";
fLog.logMsg(oss.str(), ERR_INVALID_PARAM, MSGLVL_ERROR);
return ERR_INVALID_PARAM;
}
curColStruct.colWidth = curJobCol.width;
Convertor::convertColType(&curColStruct);
job.jobTableList[tableNo].colList[i].weType = curColStruct.colType;
// set width to correct column width
job.jobTableList[tableNo].colList[i].width = curColStruct.colWidth;
job.jobTableList[tableNo].colList[i].emptyVal = getEmptyRowValue(
job.jobTableList[tableNo].colList[i].dataType, job.jobTableList[tableNo].colList[i].width);
// check HWM for column file
rc = BRMWrapper::getInstance()->getDbRootHWMInfo(curJobCol.mapOid, dbRootHWMInfoColVec[i]);
if (rc != NO_ERROR)
{
WErrorCodes ec;
ostringstream oss;
oss << "Error getting last DBRoot/HWMs for column file " << curJobCol.mapOid << "; "
<< ec.errorString(rc);
fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
colWidths.push_back(job.jobTableList[tableNo].colList[i].width);
} // end of 1st for-loop through the list of columns (get starting HWM)
//--------------------------------------------------------------------------
// Second loop thru the columns for the "tableNo" table in jobTableList[].
// Create DBRootExtentTracker, and select starting DBRoot.
// Determine the smallest width column(s), and save that as minHWM.
// We save additional HWM information acquired from BRM, in segFileInfo,
// for later use.
//--------------------------------------------------------------------------
for (size_t i = 0; i < job.jobTableList[tableNo].colList.size(); i++)
{
const JobColumn& curJobCol = job.jobTableList[tableNo].colList[i];
// Find DBRoot/segment file where we want to start adding rows
DBRootExtentTracker* pDBRootExtentTracker =
new DBRootExtentTracker(curJobCol.mapOid, colWidths, dbRootHWMInfoColVec, i, &fLog);
if (i == 0)
pRefDBRootExtentTracker = pDBRootExtentTracker;
dbRootExtTrackerVec.push_back(pDBRootExtentTracker);
// Start adding rows to DBRoot/segment file that is selected
DBRootExtentInfo dbRootExtent;
if (i == 0) // select starting DBRoot/segment for column[0]
{
std::string trkErrMsg;
rc =
pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bNoStartExtentOnThisPM, bEmptyPM, trkErrMsg);
if (rc != NO_ERROR)
{
fLog.logMsg(trkErrMsg, rc, MSGLVL_ERROR);
return rc;
}
}
else // select starting DBRoot/segment based on column[0] selection
{
// to ensure all columns start with the same DBRoot/segment
pDBRootExtentTracker->assignFirstSegFile(*pRefDBRootExtentTracker, // reference column[0] tracker
dbRootExtent);
}
if (job.jobTableList[tableNo].colList[i].width < minWidth)
{
// save the minimum hwm -- rp 9/25/07 Bug 473
minWidth = job.jobTableList[tableNo].colList[i].width;
minHWM = dbRootExtent.fLocalHwm;
}
// Save column segment file info for use in subsequent loop
segFileInfo.push_back(dbRootExtent);
}
//--------------------------------------------------------------------------
// Validate that the starting HWMs for all the columns are in sync
//--------------------------------------------------------------------------
rc = tableInfo->validateColumnHWMs(&job.jobTableList[tableNo], segFileInfo, "Starting");
if (rc != NO_ERROR)
{
return rc;
}
//--------------------------------------------------------------------------
// Create bulk rollback meta data file
//--------------------------------------------------------------------------
ostringstream oss11;
oss11 << "Initializing import: "
<< "Table-" << job.jobTableList[tableNo].tblName << "...";
fLog.logMsg(oss11.str(), MSGLVL_INFO2);
rc = saveBulkRollbackMetaData(job, tableInfo.get(), segFileInfo, dbRootHWMInfoColVec);
if (rc != NO_ERROR)
{
return rc;
}
//--------------------------------------------------------------------------
// Third loop thru the columns for the "tableNo" table in jobTableList[].
// In this pass through the columns we create the ColumnInfo object,
// open the applicable column and dictionary store files, and seek to
// the block where we will begin adding data.
//--------------------------------------------------------------------------
unsigned int fixedBinaryRecLen = 0;
for (size_t i = 0; i < job.jobTableList[tableNo].colList.size(); i++)
{
uint16_t dbRoot = segFileInfo[i].fDbRoot;
uint32_t partition = segFileInfo[i].fPartition;
uint16_t segment = segFileInfo[i].fSegment;
HWM oldHwm = segFileInfo[i].fLocalHwm;
DBRootExtentTracker* pDBRootExtentTracker = 0;
if (dbRootExtTrackerVec.size() > 0)
pDBRootExtentTracker = dbRootExtTrackerVec[i];
// Create a ColumnInfo for the next column, and add to tableInfo
ColumnInfo* info = 0;
if (job.jobTableList[tableNo].colList[i].compressionType)
info = new ColumnInfoCompressed(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker,
tableInfo.get());
// tableInfo->rbMetaWriter());
else
info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker, tableInfo.get());
if (pwd)
info->setUIDGID(pwd->pw_uid, pwd->pw_gid);
// For auto increment column, we need to get the starting value
if (info->column.autoIncFlag)
{
rc = preProcessAutoInc(job.jobTableList[tableNo].tblName, info);
if (rc != NO_ERROR)
{
return rc;
}
}
// For binary input mode, sum up the columns widths to get fixed rec len
if ((fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL) || (fImportDataMode == IMPORT_DATA_BIN_SAT_NULL))
{
if (job.jobTableList[tableNo].fFldRefs[i].fFldColType == BULK_FLDCOL_COLUMN_FIELD)
{
fixedBinaryRecLen += info->column.definedWidth;
}
}
// Skip minimum blocks before starting import; minwidth columns skip to
// next block. Wider columns skip based on multiple of width. If this
// skipping of blocks requires a new extent, then we extend the column.
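// Worked example (assuming an 8-byte column with minWidth = 1 and
// minHWM = 10): hwm = (10 + 1) * (8 / 1) = 88, i.e. the wider column
// skips 8 blocks for every 1 block skipped by the narrowest column.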
HWM hwm = (minHWM + 1) * (info->column.width / minWidth);
info->relativeColWidthFactor(info->column.width / minWidth);
if ((bEmptyPM) || (bNoStartExtentOnThisPM))
{
// HWM not found in prev loop; can't get LBID. Will create initial
// extent on this PM later in this job, if we have valid rows to add
if (bEmptyPM)
{
// No starting DB file on this PM
ostringstream oss3;
oss3 << "Currently no extents on dbroot" << dbRoot << " for column OID " << info->column.mapOid
<< "; will create starting extent";
fLog.logMsg(oss3.str(), MSGLVL_INFO2);
}
// Skip to subsequent physical partition if current HWM extent
// for this "dbroot" is disabled.
else // bNoStartExtentOnThisPM is true
{
// Starting DB file on this PM is disabled
ostringstream oss3;
oss3 << "Current HWM extent is disabled on dbroot" << dbRoot << " for column OID "
<< info->column.mapOid << "; will create starting extent";
fLog.logMsg(oss3.str(), MSGLVL_INFO2);
}
// Pass blocks to be skipped at start of file "if" we decide to
// employ block skipping for the first extent.
hwm = info->column.width / minWidth;
// We don't have a starting DB file on this PM, or the starting HWM
// extent is disabled. In either case, we will wait and create a
// new DB file to receive any new rows, only after we make sure we
// have rows to insert.
info->setupDelayedFileCreation(dbRoot, partition, segment, hwm, bEmptyPM);
}
else
{
// Establish starting HWM and LBID for this job.
// Keep in mind we have initial block skipping to account for.
bool bSkippedToNewExtent = false;
BRM::LBID_t lbid;
RETURN_ON_ERROR(preProcessHwmLbid(info, minWidth, partition, segment, hwm, lbid, bSkippedToNewExtent));
// Setup import to start loading into starting HWM DB file
RETURN_ON_ERROR(info->setupInitialColumnExtent(dbRoot, partition, segment,
job.jobTableList[tableNo].tblName, lbid, oldHwm, hwm,
bSkippedToNewExtent, bSkippedToNewExtent || oldHwm < 1));
}
tableInfo->addColumn(info);
} // end of 2nd for-loop through the list of columns
if ((fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL) || (fImportDataMode == IMPORT_DATA_BIN_SAT_NULL))
{
ostringstream oss12;
oss12 << "Table " << job.jobTableList[tableNo].tblName
<< " will be "
"imported in binary mode with fixed record length: "
<< fixedBinaryRecLen << " bytes; ";
if (fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL)
oss12 << "NULL values accepted";
else
oss12 << "NULL values saturated";
fLog.logMsg(oss12.str(), MSGLVL_INFO2);
}
// Initialize BulkLoadBuffers after we have added all the columns
rc = tableInfo->initializeBuffers(fNoOfBuffers, job.jobTableList[tableNo].fFldRefs, fixedBinaryRecLen);
if (rc)
return rc;
fTableInfo.push_back(std::shared_ptr<TableInfo>(tableInfo));
return NO_ERROR;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Saves snapshot of extentmap into a bulk rollback meta data file, for
// use in a bulk rollback, if the current cpimport.bin job should fail.
// PARAMETERS:
// job - current job
// tableInfo - TableInfo object corresponding to tableNo table.
// segFileInfo - Vector of File objects carrying starting DBRoot, partition,
// etc, for the columns belonging to tableNo.
// dbRootHWMInfoColVec - Vector of vectors carrying extent/HWM info for each
// dbroot for each column.
// RETURN:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::saveBulkRollbackMetaData(Job& job, TableInfo* tableInfo,
const std::vector<DBRootExtentInfo>& segFileInfo,
const std::vector<BRM::EmDbRootHWMInfo_v>& dbRootHWMInfoColVec)
{
return tableInfo->saveBulkRollbackMetaData(job, segFileInfo, dbRootHWMInfoColVec);
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Initialize auto-increment column for specified schema and table.
// PARAMETERS:
// fullTableName - Schema and table name separated by a period.
// colInfo - ColumnInfo associated with auto-increment column.
// RETURN:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::preProcessAutoInc(const std::string& fullTableName, ColumnInfo* colInfo)
{
int rc = colInfo->initAutoInc(fullTableName);
return rc;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Determine starting HWM and LBID, after applying block skipping to HWM.
// PARAMETERS:
// info - ColumnInfo of column we are working with
// minWidth - minimum width among all columns for this table
// partition - partition of projected starting HWM
// segment - file segment number of projected starting HWM
// hwm (input/output) - input: projected starting HWM after block skipping
// output: adjusted starting HWM
// lbid output: LBID associated with adjusted HWM
// bSkippedToNewExtent- output:
// true -> block skipping crossed out of the hwm extent
// false-> normal block skipping use case
// RETURN:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::preProcessHwmLbid(const ColumnInfo* info, int minWidth, uint32_t partition, uint16_t segment,
HWM& hwm, // input/output
BRM::LBID_t& lbid, // output
bool& bSkippedToNewExtent) // output
{
int rc = NO_ERROR;
bSkippedToNewExtent = false;
// Get starting LBID for the HWM block; if we can't get the start-
// ing LBID, it means initial block skipping crossed extent boundary
rc = BRMWrapper::getInstance()->getStartLbid(info->column.mapOid, partition, segment, (int)hwm, lbid);
// If the HWM LBID is missing, block skipping has advanced us out of the
// current HWM extent; fall back to the block at the end of that extent.
if (rc != NO_ERROR)
{
bSkippedToNewExtent = true;
lbid = INVALID_LBID;
int blocksPerExtent = (BRMWrapper::getInstance()->getExtentRows() * info->column.width) / BYTE_PER_BLOCK;
// Look for LBID associated with block at end of current extent
uint32_t numBlocks = (((hwm + 1) / blocksPerExtent) * blocksPerExtent);
hwm = numBlocks - 1;
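// Worked example (assuming the default 8M rows per extent and 8192-byte
// blocks, so a 4-byte column has blocksPerExtent = 4096): an incoming
// hwm of 5000 gives numBlocks = (5001 / 4096) * 4096 = 4096, and the
// adjusted hwm becomes 4095, the last block of the preceding extent.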
rc = BRMWrapper::getInstance()->getStartLbid(info->column.mapOid, partition, segment, (int)hwm, lbid);
if (rc != NO_ERROR)
{
WErrorCodes ec;
ostringstream oss;
oss << "Error getting HWM start LBID "
"for previous last extent in column file OID-"
<< info->column.mapOid << "; partition-" << partition << "; segment-" << segment << "; hwm-" << hwm
<< "; " << ec.errorString(rc);
fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
}
return rc;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::processJob()
{
#ifdef PROFILE
Stats::enableProfiling(fNoOfReadThreads, fNoOfParseThreads);
#endif
int rc = NO_ERROR;
Job curJob;
size_t i;
curJob = fJobInfo.getJob();
// For the following parms, we use the value read from the Job XML file if
// a cmd line override value was not already assigned by cpimport.cpp.
if (fNoOfBuffers == -1)
fNoOfBuffers = curJob.numberOfReadBuffers;
if (fBufferSize == -1)
fBufferSize = curJob.readBufferSize;
if (fFileVbufSize == -1)
fFileVbufSize = curJob.writeBufferSize;
if (fColDelim == '\0')
fColDelim = curJob.fDelimiter;
// std::cout << "bulkload::fEnclosedByChar<" << fEnclosedByChar << '>' <<
// std::endl << "bulkload::fEscapeChar<" << fEscapeChar << '>' <<
// std::endl << "job.fEnclosedByChar<" <<curJob.fEnclosedByChar<< '>' <<
// std::endl << "job.fEscapeChar<" << curJob.fEscapeChar << '>' <<
// std::endl;
if (fEnclosedByChar == '\0')
{
// std::cout << "Using enclosed char from xml file" << std::endl;
fEnclosedByChar = curJob.fEnclosedByChar;
}
if (fEscapeChar == '\0')
{
// std::cout << "Using escape char from xml file" << std::endl;
fEscapeChar = curJob.fEscapeChar;
}
// If EnclosedBy char is given, then we need an escape character.
// We default to '\' if we didn't get one from xml file or cmd line.
if (fEscapeChar == '\0')
{
// std::cout << "Using default escape char" << std::endl;
fEscapeChar = '\\';
}
// std::cout << "bulkload::fEnclosedByChar<" << fEnclosedByChar << '>' <<
// std::endl << "bulkload::fEscapeChar<" << fEscapeChar << '>' << std::endl;
// Bug1315 - check whether DBRoots are RW mounted.
std::vector<std::string> dbRootPathList;
Config::getDBRootPathList(dbRootPathList);
for (unsigned int counter = 0; counter < dbRootPathList.size(); counter++)
{
if (access(dbRootPathList[counter].c_str(), R_OK | W_OK) < 0)
{
rc = ERR_FILE_NOT_EXIST;
ostringstream oss;
oss << "Error accessing DBRoot[" << counter << "] " << dbRootPathList[counter] << "; "
<< strerror(errno);
fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
}
// Init total cumulative run time with time it took to load xml file
double totalRunTime = getTotalRunTime();
fLog.logMsg("PreProcessing check starts", MSGLVL_INFO1);
startTimer();
//--------------------------------------------------------------------------
// Validate that only 1 table is specified for import if using STDIN
//--------------------------------------------------------------------------
if ((fAlternateImportDir == IMPORT_PATH_STDIN) && (curJob.jobTableList.size() > 1))
{
rc = ERR_INVALID_PARAM;
fLog.logMsg("Only 1 table can be imported per job when using STDIN", rc, MSGLVL_ERROR);
return rc;
}
//--------------------------------------------------------------------------
// Validate the existence of the import data files
//--------------------------------------------------------------------------
std::vector<std::shared_ptr<TableInfo>> tables;
for (i = 0; i < curJob.jobTableList.size(); i++)
{
std::shared_ptr<TableInfo> tableInfo(new TableInfo(&fLog, fTxnID, fProcessName, curJob.jobTableList[i].mapOid,
curJob.jobTableList[i].tblName, fKeepRbMetaFiles));
if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
tableInfo->setBulkLoadMode(fBulkMode, fBRMRptFileName);
tableInfo->setErrorDir(string(getErrorDir()));
tableInfo->setTruncationAsError(getTruncationAsError());
rc = manageImportDataFileList(curJob, i, tableInfo.get());
if (rc != NO_ERROR)
{
tableInfo->fBRMReporter.sendErrMsgToFile(tableInfo->fBRMRptFileName);
return rc;
}
tables.push_back(tableInfo);
}
//--------------------------------------------------------------------------
// Before we go any further, we lock all the tables
//--------------------------------------------------------------------------
for (i = 0; i < curJob.jobTableList.size(); i++)
{
rc = tables[i]->acquireTableLock(fDisableTimeOut);
if (rc != NO_ERROR)
{
// Try releasing the table locks we already acquired.
// Note that loop is k<i since tables[i] lock failed.
for (unsigned k = 0; k < i; k++)
{
tables[k]->releaseTableLock(); // ignore return code in this case
}
return rc;
}
// If we have a lock, then init MetaWriter, so that it can delete any
// leftover backup meta data files that collide with the ones we are
// going to create.
rc = tables[i]->initBulkRollbackMetaData();
if (rc != NO_ERROR)
{
// Try releasing the table locks we already acquired.
// Note that loop is k<i= since tables[i] lock worked
for (unsigned k = 0; k <= i; k++)
{
tables[k]->releaseTableLock(); // ignore return code in this case
}
return rc;
}
}
//--------------------------------------------------------------------------
// Perform necessary preprocessing for each table
//--------------------------------------------------------------------------
for (i = 0; i < curJob.jobTableList.size(); i++)
{
// If table already marked as complete then we are skipping the
// table because there were no input files to process.
if (tables[i]->getStatusTI() == WriteEngine::PARSE_COMPLETE)
continue;
rc = preProcess(curJob, i, tables[i]);
if (rc != NO_ERROR)
{
std::string errMsg = "Error in pre-processing the job file for table " + curJob.jobTableList[i].tblName;
tables[i]->fBRMReporter.addToErrMsgEntry(errMsg);
fLog.logMsg(errMsg, rc, MSGLVL_CRITICAL);
// Okay to release the locks for the tables we did not get to
for (unsigned k = i + 1; k < tables.size(); k++)
{
tables[k]->releaseTableLock(); // ignore return code in this case
}
// Okay to release the locks for any tables we preprocessed.
// We will not have done anything to change these tables yet,
// so all we need to do is release the locks.
for (unsigned k = 0; k <= i; k++)
{
tables[k]->deleteMetaDataRollbackFile();
tables[k]->releaseTableLock(); // ignore return code
}
// Ignore the return code for now; more important to base rc on the
// success or failure of the previous work
// BUG 4398: distributed cpimport calls takeSnapshot for modes 1 & 2
if ((fBulkMode != BULK_MODE_REMOTE_SINGLE_SRC) && (fBulkMode != BULK_MODE_REMOTE_MULTIPLE_SRC))
{
BRMWrapper::getInstance()->takeSnapshot();
}
return rc;
}
}
stopTimer();
fLog.logMsg("PreProcessing check completed", MSGLVL_INFO1);
std::ostringstream ossPrepTime;
ossPrepTime << "preProcess completed, run time for this step : " << getTotalRunTime() << " seconds";
fLog.logMsg(ossPrepTime.str(), MSGLVL_INFO1);
totalRunTime += getTotalRunTime();
startTimer();
spawnWorkers();
if (BulkStatus::getJobStatus() == EXIT_FAILURE)
{
rc = ERR_UNKNOWN;
}
// Regardless of JobStatus, we rollback any tables that are left locked
int rollback_rc = rollbackLockedTables();
if ((rc == NO_ERROR) && (rollback_rc != NO_ERROR))
{
rc = rollback_rc;
}
// Ignore the return code for now; more important to base rc on the
// success or failure of the previous work
// BUG 4398: distributed cpimport now calls takeSnapshot for modes 1 & 2
if ((fBulkMode != BULK_MODE_REMOTE_SINGLE_SRC) && (fBulkMode != BULK_MODE_REMOTE_MULTIPLE_SRC))
{
BRMWrapper::getInstance()->takeSnapshot();
}
stopTimer();
totalRunTime += getTotalRunTime();
std::ostringstream ossTotalRunTime;
ossTotalRunTime << "Bulk load completed, total run time : " << totalRunTime << " seconds" << std::endl;
fLog.logMsg(ossTotalRunTime.str(), MSGLVL_INFO1);
#ifdef PROFILE
Stats::printProfilingResults();
#endif
return rc;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Deconstruct the list of 1 or more import files for the specified table,
// and validate the existence of the specified files.
// PARAMETERS:
// job - current job
// tableNo - table no
// tableInfo - TableInfo object corresponding to tableNo table.
// RETURN:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::manageImportDataFileList(Job& job, int tableNo, TableInfo* tableInfo)
{
std::vector<std::string> loadFilesList;
bool bUseStdin = false;
// Check if all the import files are the same type.
const auto& fileNameA = (fCmdLineImportFiles.empty()) ? "" : fCmdLineImportFiles.front();
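// Note: the test below treats the files as having the same type when every
// subsequent name contains the first name as a substring (rfind() != npos).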
bool allFilesHaveSameType =
!fCmdLineImportFiles.empty() &&
std::all_of(std::next(fCmdLineImportFiles.begin()), fCmdLineImportFiles.end(),
[&fileNameA](auto& fileName) { return fileName.rfind(fileNameA) != std::string::npos; });
if (!fCmdLineImportFiles.empty() && !allFilesHaveSameType)
{
ostringstream oss;
oss << "Input files have different types.";
fLog.logMsg(oss.str(), ERR_FILE_TYPE_DIFF, MSGLVL_ERROR);
return ERR_FILE_TYPE_DIFF;
}
const bool isParquet = allFilesHaveSameType && fileNameA.rfind(".parquet") != std::string::npos;
if (isParquet)
{
setImportDataMode(IMPORT_DATA_PARQUET);
}
// Take loadFileName from command line argument override "if" one exists,
// else we take from the Job xml file
std::string loadFileName;
if (fCmdLineImportFiles.size() > (unsigned)tableNo)
loadFileName = fCmdLineImportFiles[tableNo];
else
loadFileName = job.jobTableList[tableNo].loadFileName;
if (fAlternateImportDir == IMPORT_PATH_STDIN)
{
bUseStdin = true;
fLog.logMsg("Using STDIN for input data", MSGLVL_INFO2);
int rc = buildImportDataFileList(std::string(), loadFileName, loadFilesList);
if (rc != NO_ERROR)
{
return rc;
}
// BUG 4737 - in Mode 1, all data coming from STDIN, ignore input files
if ((loadFilesList.size() > 1) && (fBulkMode != BULK_MODE_REMOTE_SINGLE_SRC))
{
ostringstream oss;
oss << "Table " << tableInfo->getTableName()
<< " specifies multiple "
"load files; This is not allowed when using STDIN";
fLog.logMsg(oss.str(), ERR_INVALID_PARAM, MSGLVL_ERROR);
tableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
return ERR_INVALID_PARAM;
}
}
else
{
std::string importDir;
if (!fS3Key.empty())
{
loadFilesList.push_back(loadFileName);
}
else
{
if (fAlternateImportDir == IMPORT_PATH_CWD) // current working dir
{
char cwdBuf[4096];
const char* cwd = ::getcwd(cwdBuf, sizeof(cwdBuf));
importDir = (cwd != nullptr) ? cwd : IMPORT_PATH_CWD; // guard against getcwd() failure
importDir += '/';
}
else if (fAlternateImportDir.size() > 0) // -f path
{
importDir = fAlternateImportDir;
}
else // <BULKROOT>/data/import
{
importDir = fRootDir;
importDir += DIR_BULK_IMPORT;
}
// Break down loadFileName into vector of file names in case load-
// FileName contains a list of files or 1 or more wildcards.
int rc = buildImportDataFileList(importDir, loadFileName, loadFilesList);
if (rc != NO_ERROR)
{
return rc;
}
}
// No filenames is considered a fatal error, except for remote mode2.
// For remote mode2 we just mark the table as complete since we will
// have no data to load, but we don't consider this as an error.
if (loadFilesList.size() == 0)
{
ostringstream oss;
oss << "No import files found. "
<< "default dir: " << importDir << " importFileName: " << loadFileName;
if (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC)
{
tableInfo->setLoadFilesInput(bUseStdin, (!fS3Key.empty()), loadFilesList, fS3Host, fS3Key, fS3Secret,
fS3Bucket, fS3Region);
tableInfo->markTableComplete();
fLog.logMsg(oss.str(), MSGLVL_INFO1);
return NO_ERROR;
}
else
{
fLog.logMsg(oss.str(), ERR_FILE_NOT_EXIST, MSGLVL_ERROR);
return ERR_FILE_NOT_EXIST;
}
}
// Verify that input data files exist.
// We also used to check to make sure the input file is not empty, and
// if it were, we threw an error at this point, but we removed that
// check. With shared-nothing, an empty file is now acceptable.
if (fS3Key.empty())
{
for (unsigned ndx = 0; ndx < loadFilesList.size(); ndx++)
{
// In addition to being more portable due to the use of boost, this change
// also fixes an inherent bug with cpimport reading from a named pipe.
// Only the first open() call gets any data passed through the pipe, so the
// code here that used to open the file to test for existence meant cpimport
// would never get data from the pipe.
boost::filesystem::path pathFile(loadFilesList[ndx]);
if (!boost::filesystem::exists(pathFile))
{
ostringstream oss;
oss << "input data file " << loadFilesList[ndx] << " does not exist";
fLog.logMsg(oss.str(), ERR_FILE_NOT_EXIST, MSGLVL_ERROR);
tableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
return ERR_FILE_NOT_EXIST;
}
else
{
ostringstream oss;
oss << "input data file " << loadFilesList[ndx];
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
}
}
}
tableInfo->setLoadFilesInput(bUseStdin, (!fS3Key.empty()), loadFilesList, fS3Host, fS3Key, fS3Secret,
fS3Bucket, fS3Region);
return NO_ERROR;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Break up the filename string (which may contain a list of file names)
// into a vector of strings, with each non-fully-qualified string being
// prefixed by the path specified by "location".
// PARAMETERS:
// location - path prefix
// filename - list of file names
// loadFiles- vector of file names extracted from filename string
// RETURN:
// NO_ERROR if success
//------------------------------------------------------------------------------
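// Example (sketch): with location = "/data/import/" and
// filename = "a.tbl,b.tbl /abs/c.tbl", loadFiles becomes
//   { "/data/import/a.tbl", "/data/import/b.tbl", "/abs/c.tbl" };
// in mode 2, a token such as "*.tbl" is further expanded via glob().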
int BulkLoad::buildImportDataFileList(const std::string& location, const std::string& filename,
std::vector<std::string>& loadFiles)
{
char* filenames = new char[filename.size() + 1];
strcpy(filenames, filename.c_str());
char* str;
char* token;
for (str = filenames;; str = NULL)
{
token = strtok(str, ", |");
if (token == NULL)
break;
// If the token (filename) is fully qualified, then use the filename
// as-is, else prepend the location (path prefix)
boost::filesystem::path p(token);
std::string fullPath;
if (p.has_root_path())
{
fullPath = token;
}
else
{
fullPath = location;
fullPath += token;
}
// If running mode2, then support a filename with wildcards
if (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC)
{
bool bExpandFileName = false;
size_t fpos = fullPath.find_first_of("[*?");
if (fpos != std::string::npos)
{
bExpandFileName = true;
}
else // expand a directory name
{
struct stat curStat;
if ((stat(fullPath.c_str(), &curStat) == 0) && (S_ISDIR(curStat.st_mode)))
{
bExpandFileName = true;
fullPath += "/*";
}
}
// If wildcard(s) present use glob() function to expand into a list
if (bExpandFileName)
{
glob_t globBuf;
memset(&globBuf, 0, sizeof(globBuf));
int globFlags = GLOB_ERR | GLOB_MARK;
int rc = glob(fullPath.c_str(), globFlags, 0, &globBuf);
if (rc != 0)
{
if (rc == GLOB_NOMATCH)
{
continue;
}
else
{
ostringstream oss;
oss << "Error expanding filename " << fullPath;
if (rc == GLOB_NOSPACE)
oss << "; out of memory";
else if (rc == GLOB_ABORTED)
oss << "; error reading directory";
else if (rc == GLOB_NOSYS)
oss << "; globbing not implemented";
else
oss << "; rc-" << rc;
fLog.logMsg(oss.str(), ERR_FILE_GLOBBING, MSGLVL_ERROR);
delete[] filenames;
return ERR_FILE_GLOBBING;
}
}
// Include all non-directory files in the import file list
std::string fullPath2;
for (unsigned int k = 0; k < globBuf.gl_pathc; k++)
{
fullPath2 = globBuf.gl_pathv[k];
if (!fullPath2.empty())
{
if (fullPath2[fullPath2.length() - 1] != '/')
{
loadFiles.push_back(fullPath2);
}
}
}
} // wild card present
else
{
loadFiles.push_back(fullPath);
}
} // mode2
else
{
loadFiles.push_back(fullPath);
} // not mode2
} // loop through filename tokens
delete[] filenames;
return NO_ERROR;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Clear table locks, and rollback any tables that are
// still locked through session manager.
// PARAMETERS:
// none
// RETURN:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::rollbackLockedTables()
{
int rc = NO_ERROR;
// See if there are any DB tables that were left in a locked state
bool lockedTableFound = false;
for (unsigned i = 0; i < fTableInfo.size(); i++)
{
if (fTableInfo[i]->isTableLocked())
{
lockedTableFound = true;
break;
}
}
// If 1 or more tables failed to load, then report the lock
// state of each table we were importing.
if (lockedTableFound)
{
// Report the tables that were successfully loaded
for (unsigned i = 0; i < fTableInfo.size(); i++)
{
if (!fTableInfo[i]->isTableLocked())
{
ostringstream oss;
oss << "Table " << fTableInfo[i]->getTableName() << " was successfully loaded. ";
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
}
// Report the tables that were not successfully loaded
for (unsigned i = 0; i < fTableInfo.size(); i++)
{
if (fTableInfo[i]->isTableLocked())
{
if (fTableInfo[i]->hasProcessingBegun())
{
ostringstream oss;
oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")"
<< " was not successfully loaded. Rolling back.";
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
else
{
ostringstream oss;
oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")"
<< " did not start loading. No rollback necessary.";
fLog.logMsg(oss.str(), MSGLVL_INFO1);
}
rc = rollbackLockedTable(*fTableInfo[i]);
if (rc != NO_ERROR)
{
break;
}
}
}
}
return rc;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Clear table lock, and rollback the specified table that is still locked.
// This function only comes into play for mode 3, since the table lock and
// bulk rollbacks are managed by the parent (cpimport file splitter) process
// in the case of mode 1 and mode 2 bulk loads.
// PARAMETERS:
// tableInfo - the table to be released and rolled back
// RETURN:
// NO_ERROR if success
// other if fail
//------------------------------------------------------------------------------
int BulkLoad::rollbackLockedTable(TableInfo& tableInfo)
{
return tableInfo.rollbackWork();
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Update next autoincrement value for specified column OID.
// PARAMETERS:
// columnOid - column OID of interest
// nextAutoIncVal - next autoincrement value to assign to tableOID
// RETURN:
// 0 if success
// other if fail
//------------------------------------------------------------------------------
/* static */
int BulkLoad::updateNextValue(OID columnOid, uint64_t nextAutoIncVal)
{
// The odds of us ever having 2 updateNextValue() calls going on in parallel
// are slim and none. But it's theoretically possible if we had an import
// job for 2 tables; so we put a mutex here just in case the DDLClient code
// won't work well with 2 competing WE_DDLCommandClient objects in the same
// process (ex: if there is any static data in WE_DDLCommandClient).
boost::mutex::scoped_lock lock(*fDDLMutex);
WE_DDLCommandClient ddlCommandClt;
unsigned int rc = ddlCommandClt.UpdateSyscolumnNextval(columnOid, nextAutoIncVal);
return (int)rc;
}
//------------------------------------------------------------------------------
bool BulkLoad::addErrorMsg2BrmUpdater(const std::string& tablename, const ostringstream& oss)
{
int size = fTableInfo.size();
if (size == 0)
return false;
for (int tableId = 0; tableId < size; tableId++)
{
if (fTableInfo[tableId]->getTableName() == tablename)
{
fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
return true;
}
}
return false;
}
//------------------------------------------------------------------------------
// DESCRIPTION:
// Set job UUID. Used by Query Telemetry to identify a unique import
// job across PMs
// PARAMETERS:
// jobUUID - the job UUID
// RETURN:
// void
//------------------------------------------------------------------------------
void BulkLoad::setJobUUID(const std::string& jobUUID)
{
fUUID = boost::uuids::string_generator()(jobUUID);
}
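// e.g. (sketch): setJobUUID("01234567-89ab-cdef-0123-456789abcdef");
// boost::uuids::string_generator parses the canonical 8-4-4-4-12 form.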
void BulkLoad::setDefaultJobUUID()
{
if (fUUID.is_nil())
fUUID = boost::uuids::random_generator()();
}
} // namespace WriteEngine