/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*******************************************************************************
 * $Id: we_bulkload.cpp 4730 2013-08-08 21:41:13Z chao $
 *
 *******************************************************************************/
/** @file */

#define WE_BULKLOAD_DLLEXPORT
#include "we_bulkload.h"
#undef WE_BULKLOAD_DLLEXPORT

#include <cmath>
#include <cstdlib>
#include <climits>
#include <glob.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <vector>
#include <sstream>

#include <boost/filesystem.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>

#include <pwd.h>

#include "we_bulkstatus.h"
#include "we_rbmetawriter.h"
#include "we_colopbulk.h"
#include "we_columninfocompressed.h"
#include "we_config.h"
#include "we_dbrootextenttracker.h"
#include "writeengine.h"
#include "sys/time.h"
#include "sys/types.h"
#include "dataconvert.h"
#include "idbcompress.h"
#include "calpontsystemcatalog.h"
#include "we_ddlcommandclient.h"
#include "mcsconfig.h"

using namespace std;
using namespace boost;
using namespace dataconvert;

namespace
{
const std::string IMPORT_PATH_STDIN("STDIN");
const std::string IMPORT_PATH_CWD(".");
const std::string LOG_SUFFIX = ".log";      // Job log file suffix
const std::string ERR_LOG_SUFFIX = ".err";  // Job err log file suffix
}  // namespace

// extern WriteEngine::BRMWrapper* brmWrapperPtr;
namespace WriteEngine
{
/* static */ std::vector<std::shared_ptr<TableInfo>> BulkLoad::fTableInfo;
/* static */ boost::mutex* BulkLoad::fDDLMutex = 0;

/* static */ const std::string BulkLoad::DIR_BULK_JOB("job");
/* static */ const std::string BulkLoad::DIR_BULK_TEMP_JOB("tmpjob");
/* static */ const std::string BulkLoad::DIR_BULK_IMPORT("/data/import/");
/* static */ bool BulkLoad::fNoConsoleOutput = false;

//------------------------------------------------------------------------------
// A thread to periodically call dbrm to see if a user is
// shutting down the system or has put the system into write
// suspend mode. DBRM has 2 flags to check in this case, the
// ROLLBACK flag, and the FORCE flag. These flags will be
// reported when we ask for the Shutdown Pending flag (which we
// ignore at this point). Even if the user is putting the system
// into write suspend mode, this call will return the flags we
// are interested in. If ROLLBACK is set, we cancel normally.
// If FORCE is set, we can't rollback.
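// The functor below is run on a thread started from spawnWorkers() (local
// mode-3 imports only) and exits once BulkLoad::fbContinue is cleared after
// the read and parse threads have been joined.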
struct CancellationThread
{
  CancellationThread(BulkLoad* pBulkLoad) : fpBulkLoad(pBulkLoad)
  {
  }
  BulkLoad* fpBulkLoad;
  void operator()()
  {
    bool bRollback = false;
    bool bForce = false;
    int iShutdown;

    while (fpBulkLoad->getContinue())
    {
      usleep(1000000);  // 1 second
      // Check to see if someone has ordered a shutdown or suspend with
      // rollback or force.
      iShutdown = BRMWrapper::getInstance()->isShutdownPending(bRollback, bForce);

      if (iShutdown != ERR_BRM_GET_SHUTDOWN)
      {
        if (bRollback)
        {
          if (iShutdown == ERR_BRM_SHUTDOWN)
          {
            if (!BulkLoad::disableConsoleOutput())
              cout << "System stop has been ordered. Rollback" << endl;
          }
          else
          {
            if (!BulkLoad::disableConsoleOutput())
              cout << "Database writes have been suspended. Rollback" << endl;
          }

          BulkStatus::setJobStatus(EXIT_FAILURE);
        }
        else if (bForce)
        {
          if (!BulkLoad::disableConsoleOutput())
            cout << "Immediate system stop has been ordered. "
                 << "No rollback" << endl;
        }
      }
    }
  }
};

//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
BulkLoad::BulkLoad()
 : fColOp(new ColumnOpBulk())
 , fColDelim('\0')
 , fNoOfBuffers(-1)
 , fBufferSize(-1)
 , fFileVbufSize(-1)
 , fMaxErrors(-1)
 , fNoOfParseThreads(3)
 , fNoOfReadThreads(1)
 , fKeepRbMetaFiles(false)
 , fNullStringMode(false)
 , fEnclosedByChar('\0')  // not enabled unless user overrides enclosed by char
 , fEscapeChar('\0')
 , fTotalTime(0.0)
 , fBulkMode(BULK_MODE_LOCAL)
 , fbTruncationAsError(false)
 , fImportDataMode(IMPORT_DATA_TEXT)
 , fbContinue(false)
 , fDisableTimeOut(false)
 , fUUID(boost::uuids::nil_generator()())
 , fTimeZone(dataconvert::systemTimeZoneOffset())
 , fUsername("mysql")  // MCOL-4328 default file owner
{
  fTableInfo.clear();
  setDebugLevel(DEBUG_0);

  fDDLMutex = new boost::mutex();
  memset(&fStartTime, 0, sizeof(timeval));
  memset(&fEndTime, 0, sizeof(timeval));
}

//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
BulkLoad::~BulkLoad()
{
  fTableInfo.clear();
  delete fDDLMutex;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Set alternate directory path for import data files. If the specified
//    path is "STDIN", then the import data will be read from stdin.
//    Note that we check for read "and" write access to the import directory
//    path so that we can not only read the input files, but also write the
//    *.bad and *.err files to that directory.
// PARAMETERS:
//    loadDir - import directory path
//    errMsg  - return error msg if failed return code is returned
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::setAlternateImportDir(const std::string& loadDir, std::string& errMsg)
{
  if (loadDir == IMPORT_PATH_STDIN)
  {
    fAlternateImportDir = loadDir;
  }
  else
  {
    if (access(loadDir.c_str(), R_OK | W_OK) < 0)
    {
      int errnum = errno;
      ostringstream oss;
      oss << "Error gaining r/w access to import path " << loadDir << ": " << strerror(errnum);
      errMsg = oss.str();
      return ERR_FILE_OPEN;
    }

    if (loadDir == IMPORT_PATH_CWD)
    {
      fAlternateImportDir = loadDir;
    }
    else
    {
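      // Normalize to a trailing slash, e.g. "/data/import" becomes "/data/import/".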
      if (loadDir.c_str()[loadDir.size() - 1] == '/')
        fAlternateImportDir = loadDir;
      else
        fAlternateImportDir = loadDir + "/";
    }
  }

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Load job information.
// PARAMETERS:
//    fullName           - full filename for job description file
//    bUseTempJobFile    - are we using a temporary job XML file
//    argc               - command line arg count
//    argv               - command line arguments
//    bLogInfo2ToConsole - Log info2 msgs to the console
//    bValidateColumnList- Validate that all the columns for each table have
//                         a corresponding <Column> or <DefaultColumn> tag.
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::loadJobInfo(const string& fullName, bool bUseTempJobFile, int argc, char** argv,
                          bool bLogInfo2ToConsole, bool bValidateColumnList)
{
  fJobFileName = fullName;
  fRootDir = Config::getBulkRoot();
  fJobInfo.setTimeZone(fTimeZone);

  if (!exists(fullName.c_str()))
  {
    fLog.logMsg(" file " + fullName + " does not exist", ERR_FILE_NOT_EXIST, MSGLVL_ERROR);
    return ERR_FILE_NOT_EXIST;
  }

  std::string errMsg;
  int rc = fJobInfo.loadJobXmlFile(fullName, bUseTempJobFile, bValidateColumnList, errMsg);

  if (rc != NO_ERROR)
  {
    std::ostringstream oss;
    oss << "Error loading job file " << fullName << "; " << errMsg;
    fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
    return rc;
  }

  Job& curJob = fJobInfo.getJob();
  string logFile, errlogFile;
  logFile = std::string(MCSLOGDIR) + "/cpimport/" + "Job_" + Convertor::int2Str(curJob.id) + LOG_SUFFIX;
  errlogFile =
      std::string(MCSLOGDIR) + "/cpimport/" + "Job_" + Convertor::int2Str(curJob.id) + ERR_LOG_SUFFIX;

  if (disableConsoleOutput())
    fLog.setLogFileName(logFile.c_str(), errlogFile.c_str(), false);
  else
    fLog.setLogFileName(logFile.c_str(), errlogFile.c_str(), (int)bLogInfo2ToConsole);

  if (!(disableConsoleOutput()))
  {
    cout << "Log file for this job: " << logFile << std::endl;

    fLog.logMsg("successfully loaded job file " + fullName, MSGLVL_INFO1);
  }

  if (argc > 1)
  {
    std::ostringstream oss;
    oss << "Command line options: ";

    for (int k = 1; k < argc; k++)
    {
      if (!strcmp(argv[k], "\t"))  // special case to print a <TAB>
        oss << "'\\t'" << " ";
      else
        oss << argv[k] << " ";
    }

    fLog.logMsg(oss.str(), MSGLVL_INFO2);
  }

  // Validate that each table has 1 or more columns referenced in the xml file
  for (unsigned i = 0; i < curJob.jobTableList.size(); i++)
  {
    if (curJob.jobTableList[i].colList.size() == 0)
    {
      rc = ERR_INVALID_PARAM;
      fLog.logMsg("No column definitions in job description file for table " +
                      curJob.jobTableList[i].tblName,
                  rc, MSGLVL_ERROR);
      return rc;
    }

    // MCOL-5021
    execplan::CalpontSystemCatalog::OID tableAUXColOid;
    std::string tblName;
    std::string curTblName = curJob.jobTableList[i].tblName;

    // Parse out <tablename> from [<schemaname>.]<tablename> string
    string::size_type startName = curTblName.rfind('.');
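    // e.g. "tpch.orders" yields tblName "orders"; a bare "orders" is used as-is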
    if (startName == std::string::npos)
      tblName.assign(curTblName);
    else
      tblName.assign(curTblName.substr(startName + 1));

    execplan::CalpontSystemCatalog::TableName table(curJob.schema, tblName);

    try
    {
      boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
          execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID);
      tableAUXColOid = cat->tableAUXColumnOID(table);
    }
    catch (logging::IDBExcept& ie)
    {
      rc = ERR_UNKNOWN;
      std::ostringstream oss;

      if (ie.errorCode() == logging::ERR_TABLE_NOT_IN_CATALOG)
      {
        oss << "Table " << table.toString();
        oss << " does not exist in the system catalog.";
      }
      else
      {
        oss << "Error getting AUX column OID for table " << table.toString();
        oss << " due to: " << ie.what();
      }

      fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }
    catch (std::exception& ex)
    {
      rc = ERR_UNKNOWN;
      std::ostringstream oss;
      oss << "Error getting AUX column OID for table " << table.toString();
      oss << " due to: " << ex.what();
      fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }
    catch (...)
    {
      rc = ERR_UNKNOWN;
      std::ostringstream oss;
      oss << "Error getting AUX column OID for table " << table.toString();
      fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }

    // MCOL-5021 Valid AUX column OID for a table is > 3000.
    // Tables that were created before this feature was added will have
    // tableAUXColOid = 0.
    if (tableAUXColOid > 3000)
    {
      JobColumn curColumn("aux", tableAUXColOid, execplan::AUX_COL_DATATYPE_STRING,
                          execplan::AUX_COL_WIDTH, execplan::AUX_COL_WIDTH,
                          execplan::AUX_COL_COMPRESSION_TYPE, execplan::AUX_COL_COMPRESSION_TYPE,
                          execplan::AUX_COL_MINVALUE, execplan::AUX_COL_MAXVALUE, true, 1);
      curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_DEFAULT;
      curJob.jobTableList[i].colList.push_back(curColumn);
      JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_DEFAULT, curJob.jobTableList[i].colList.size() - 1);
      curJob.jobTableList[i].fFldRefs.push_back(fieldRef);
    }
  }

  // Validate that the user's xml file has been regenerated since the
  // required tblOid attribute was added to the Table tag for table locking.
  for (unsigned i = 0; i < curJob.jobTableList.size(); i++)
  {
    if (curJob.jobTableList[i].mapOid == 0)
    {
      rc = ERR_XML_PARSE;
      fLog.logMsg("Outdated job file " + fullName + "; missing required 'tblOid' table attribute." +
                      " Please regenerate this xml file.",
                  rc, MSGLVL_ERROR);
      return rc;
    }
  }

  for (unsigned kT = 0; kT < curJob.jobTableList.size(); kT++)
  {
    for (unsigned kC = 0; kC < curJob.jobTableList[kT].colList.size(); kC++)
    {
      if (!compress::CompressInterface::isCompressionAvail(
              curJob.jobTableList[kT].colList[kC].compressionType))
      {
        std::ostringstream oss;
        oss << "Specified compression type (" << curJob.jobTableList[kT].colList[kC].compressionType
            << ") for table " << curJob.jobTableList[kT].tblName << " and column "
            << curJob.jobTableList[kT].colList[kC].colName << " is not available for use.";
        rc = ERR_COMP_UNAVAIL_TYPE;
        fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
        return rc;
      }
    }
  }

  // If binary import, do not allow <IgnoreField> tags in the Job file
  if ((fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL) || (fImportDataMode == IMPORT_DATA_BIN_SAT_NULL))
  {
    for (unsigned kT = 0; kT < curJob.jobTableList.size(); kT++)
    {
      if (curJob.jobTableList[kT].fIgnoredFields.size() > 0)
      {
        std::ostringstream oss;
        oss << "<IgnoreField> tag present in Job file for table " << curJob.jobTableList[kT].tblName
            << "; this is not allowed for binary imports.";
        rc = ERR_BULK_BINARY_IGNORE_FLD;
        fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
        return rc;
      }
    }
  }

  stopTimer();

  std::ostringstream ossXMLTime;
  ossXMLTime << "Job file loaded, run time for this step : " << getTotalRunTime() << " seconds";
  fLog.logMsg(ossXMLTime.str(), MSGLVL_INFO1);

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Spawns and joins the Read and Parsing threads to import the data.
// PARAMETERS:
//    none
// RETURN:
//    none
//------------------------------------------------------------------------------
void BulkLoad::spawnWorkers()
{
  // We're fixin' to launch threads. This lets anybody who cares (i.e.
  // checkCancellation) know that read and parse threads are running.
  fbContinue = true;

  // Spawn a thread to check for user cancellation via calpont console,
  // but only in mode 3 (local mode).
  boost::thread cancelThread;
  CancellationThread cancellationThread(this);

  if (getBulkLoadMode() == BULK_MODE_LOCAL)
  {
    cancelThread = boost::thread(cancellationThread);
  }

  // Spawn read threads
  for (int i = 0; i < fNoOfReadThreads; ++i)
  {
    fReadThreads.create_thread(boost::bind(&BulkLoad::read, this, (int)i));
  }

  fLog.logMsg("No of Read Threads Spawned = " + Convertor::int2Str(fNoOfReadThreads), MSGLVL_INFO1);

  // Spawn parse threads
  for (int i = 0; i < fNoOfParseThreads; ++i)
  {
    fParseThreads.create_thread(boost::bind(&BulkLoad::parse, this, (int)i));
  }

  fLog.logMsg("No of Parse Threads Spawned = " + Convertor::int2Str(fNoOfParseThreads), MSGLVL_INFO1);

  fReadThreads.join_all();
  fParseThreads.join_all();
  fbContinue = false;

  if (getBulkLoadMode() == BULK_MODE_LOCAL)
  {
    cancelThread.join();
  }
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Pre process job. Determine the DBRoot/segment file, HWM, etc. where we
//    are to start adding rows, and create a ColumnInfo object for each column.
//    Create the initial segment file if necessary. This could happen in
//    shared-nothing, where CREATE TABLE only creates the initial segment file
//    on one of the PMs. The first time rows are added on the other PMs, an
//    initial segment file must be created. (This could also happen "if" we
//    ever decide to allow the user to drop all partitions for a DBRoot,
//    including the last partition.)
//    PreProcessing also includes creating the bulk rollback backup files,
//    initializing auto-increment, sanity checking the consistency of the HWM
//    across columns, and opening the starting column and dictionary store
//    files.
// PARAMETERS:
//    job       - current job
//    tableNo   - table no
//    tableInfo - TableInfo object corresponding to tableNo table.
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::preProcess(Job& job, int tableNo, std::shared_ptr<TableInfo>& tableInfo)
{
  int rc = NO_ERROR, minWidth = 9999;  // give a big number
  HWM minHWM = 999999;                 // rp 9/25/07 Bug 473
  ColStruct curColStruct;
  execplan::CalpontSystemCatalog::ColDataType colDataType;

  // Initialize portions of TableInfo object
  tableInfo->setBufferSize(fBufferSize);
  tableInfo->setFileBufferSize(fFileVbufSize);
  tableInfo->setTableId(tableNo);
  tableInfo->setColDelimiter(fColDelim);
  tableInfo->setJobFileName(fJobFileName);
  tableInfo->setJobId(job.id);
  tableInfo->setNullStringMode(fNullStringMode);
  tableInfo->setEnclosedByChar(fEnclosedByChar);
  tableInfo->setEscapeChar(fEscapeChar);
  tableInfo->setImportDataMode(fImportDataMode);
  tableInfo->setTimeZone(fTimeZone);
  tableInfo->setJobUUID(fUUID);

  // MCOL-4328 Get username gid and uid if they are set.
  // We inject uid and gid into TableInfo and all ColumnInfo-s later.
  struct passwd* pwd = nullptr;
  errno = 0;
  if (fUsername.length() && (pwd = getpwnam(fUsername.c_str())) == nullptr)
  {
    std::ostringstream oss;
    oss << "Error getting pwd for " << fUsername << " with errno " << errno;
    fLog.logMsg(oss.str(), MSGLVL_ERROR);
    return ERR_FILE_CHOWN;
  }

  if (pwd)
    tableInfo->setUIDGID(pwd->pw_uid, pwd->pw_gid);

  if (fMaxErrors != -1)
    tableInfo->setMaxErrorRows(fMaxErrors);
  else
    tableInfo->setMaxErrorRows(job.jobTableList[tableNo].maxErrNum);

  // @bug 3929: cpimport.bin error messaging using up too much memory.
  // Validate that max allowed error count is within valid range.
  long long maxErrNum = tableInfo->getMaxErrorRows();

  if (maxErrNum > MAX_ALLOW_ERROR_COUNT)
  {
    ostringstream oss;
    oss << "Max allowed error count specified as " << maxErrNum << " for table "
        << job.jobTableList[tableNo].tblName << "; this exceeds limit of " << MAX_ALLOW_ERROR_COUNT
        << "; resetting to " << MAX_ALLOW_ERROR_COUNT;
    fLog.logMsg(oss.str(), MSGLVL_INFO2);
    maxErrNum = MAX_ALLOW_ERROR_COUNT;
  }

  tableInfo->setMaxErrorRows(maxErrNum);

  //------------------------------------------------------------------------
  // First loop thru the columns for the "tableNo" table in jobTableList[].
  // Get the HWM information for each column.
  //------------------------------------------------------------------------
  std::vector<int> colWidths;
  std::vector<DBRootExtentInfo> segFileInfo;
  std::vector<DBRootExtentTracker*> dbRootExtTrackerVec;
  std::vector<BRM::EmDbRootHWMInfo_v> dbRootHWMInfoColVec(job.jobTableList[tableNo].colList.size());
  DBRootExtentTracker* pRefDBRootExtentTracker = 0;
  bool bNoStartExtentOnThisPM = false;
  bool bEmptyPM = false;

  for (size_t i = 0; i < job.jobTableList[tableNo].colList.size(); i++)
  {
    const JobColumn& curJobCol = job.jobTableList[tableNo].colList[i];

    // convert column data type
    if (curJobCol.typeName.length() > 0 && fColOp->getColDataType(curJobCol.typeName.c_str(), colDataType))
    {
      job.jobTableList[tableNo].colList[i].dataType = curColStruct.colDataType = colDataType;
    }
    else
    {
      ostringstream oss;
      oss << "Column type " << curJobCol.typeName << " is not valid";
      fLog.logMsg(oss.str(), ERR_INVALID_PARAM, MSGLVL_ERROR);
      return ERR_INVALID_PARAM;
    }

    curColStruct.colWidth = curJobCol.width;
    Convertor::convertColType(&curColStruct);

    job.jobTableList[tableNo].colList[i].weType = curColStruct.colType;
    // set width to correct column width
    job.jobTableList[tableNo].colList[i].width = curColStruct.colWidth;
    job.jobTableList[tableNo].colList[i].emptyVal = getEmptyRowValue(
        job.jobTableList[tableNo].colList[i].dataType, job.jobTableList[tableNo].colList[i].width);

    // check HWM for column file
    rc = BRMWrapper::getInstance()->getDbRootHWMInfo(curJobCol.mapOid, dbRootHWMInfoColVec[i]);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "Error getting last DBRoot/HWMs for column file " << curJobCol.mapOid << "; "
          << ec.errorString(rc);
      fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }

    colWidths.push_back(job.jobTableList[tableNo].colList[i].width);
  }  // end of 1st for-loop through the list of columns (get starting HWM)

  //--------------------------------------------------------------------------
  // Second loop thru the columns for the "tableNo" table in jobTableList[].
  // Create DBRootExtentTracker, and select starting DBRoot.
  // Determine the smallest width column(s), and save that as minHWM.
  // We save additional HWM information acquired from BRM, in segFileInfo,
  // for later use.
  //--------------------------------------------------------------------------
  for (size_t i = 0; i < job.jobTableList[tableNo].colList.size(); i++)
  {
    const JobColumn& curJobCol = job.jobTableList[tableNo].colList[i];

    // Find DBRoot/segment file where we want to start adding rows
    DBRootExtentTracker* pDBRootExtentTracker =
        new DBRootExtentTracker(curJobCol.mapOid, colWidths, dbRootHWMInfoColVec, i, &fLog);

    if (i == 0)
      pRefDBRootExtentTracker = pDBRootExtentTracker;

    dbRootExtTrackerVec.push_back(pDBRootExtentTracker);

    // Start adding rows to DBRoot/segment file that is selected
    DBRootExtentInfo dbRootExtent;

    if (i == 0)  // select starting DBRoot/segment for column[0]
    {
      std::string trkErrMsg;
      rc =
          pDBRootExtentTracker->selectFirstSegFile(dbRootExtent, bNoStartExtentOnThisPM, bEmptyPM, trkErrMsg);

      if (rc != NO_ERROR)
      {
        fLog.logMsg(trkErrMsg, rc, MSGLVL_ERROR);
        return rc;
      }
    }
    else  // select starting DBRoot/segment based on column[0] selection,
    {
      // to ensure all columns start with the same DBRoot/segment
      pDBRootExtentTracker->assignFirstSegFile(*pRefDBRootExtentTracker,  // reference column[0] tracker
                                               dbRootExtent);
    }

    if (job.jobTableList[tableNo].colList[i].width < minWidth)
    {
      // save the minimum hwm -- rp 9/25/07 Bug 473
      minWidth = job.jobTableList[tableNo].colList[i].width;
      minHWM = dbRootExtent.fLocalHwm;
    }

    // Save column segment file info for use in subsequent loop
    segFileInfo.push_back(dbRootExtent);
  }

  //--------------------------------------------------------------------------
  // Validate that the starting HWMs for all the columns are in sync
  //--------------------------------------------------------------------------
  rc = tableInfo->validateColumnHWMs(&job.jobTableList[tableNo], segFileInfo, "Starting");

  if (rc != NO_ERROR)
  {
    return rc;
  }

  //--------------------------------------------------------------------------
  // Create bulk rollback meta data file
  //--------------------------------------------------------------------------
  ostringstream oss11;
  oss11 << "Initializing import: "
        << "Table-" << job.jobTableList[tableNo].tblName << "...";
  fLog.logMsg(oss11.str(), MSGLVL_INFO2);

  rc = saveBulkRollbackMetaData(job, tableInfo.get(), segFileInfo, dbRootHWMInfoColVec);

  if (rc != NO_ERROR)
  {
    return rc;
  }

  //--------------------------------------------------------------------------
  // Third loop thru the columns for the "tableNo" table in jobTableList[].
  // In this pass through the columns we create the ColumnInfo object,
  // open the applicable column and dictionary store files, and seek to
  // the block where we will begin adding data.
  //--------------------------------------------------------------------------
  unsigned int fixedBinaryRecLen = 0;

  for (size_t i = 0; i < job.jobTableList[tableNo].colList.size(); i++)
  {
    uint16_t dbRoot = segFileInfo[i].fDbRoot;
    uint32_t partition = segFileInfo[i].fPartition;
    uint16_t segment = segFileInfo[i].fSegment;
    HWM oldHwm = segFileInfo[i].fLocalHwm;

    DBRootExtentTracker* pDBRootExtentTracker = 0;

    if (dbRootExtTrackerVec.size() > 0)
      pDBRootExtentTracker = dbRootExtTrackerVec[i];

    // Create a ColumnInfo for the next column, and add to tableInfo
    ColumnInfo* info = 0;

    if (job.jobTableList[tableNo].colList[i].compressionType)
      info = new ColumnInfoCompressed(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker,
                                      tableInfo.get());
    // tableInfo->rbMetaWriter());
    else
      info = new ColumnInfo(&fLog, i, job.jobTableList[tableNo].colList[i], pDBRootExtentTracker,
                            tableInfo.get());

    if (pwd)
      info->setUIDGID(pwd->pw_uid, pwd->pw_gid);

    // For auto increment column, we need to get the starting value
    if (info->column.autoIncFlag)
    {
      rc = preProcessAutoInc(job.jobTableList[tableNo].tblName, info);

      if (rc != NO_ERROR)
      {
        return rc;
      }
    }

    // For binary input mode, sum up the column widths to get the fixed rec len
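    // (e.g. a 4-byte INT field plus an 8-byte BIGINT field gives a 12-byte record)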
    if ((fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL) || (fImportDataMode == IMPORT_DATA_BIN_SAT_NULL))
    {
      if (job.jobTableList[tableNo].fFldRefs[i].fFldColType == BULK_FLDCOL_COLUMN_FIELD)
      {
        fixedBinaryRecLen += info->column.definedWidth;
      }
    }

    // Skip minimum blocks before starting import; minwidth columns skip to
    // next block. Wider columns skip based on multiple of width. If this
    // skipping of blocks requires a new extent, then we extend the column.
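    // e.g. with minWidth 4 and minHWM 10, an 8-byte column starts at
    // hwm = (10 + 1) * (8 / 4) = 22.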
    HWM hwm = (minHWM + 1) * (info->column.width / minWidth);
    info->relativeColWidthFactor(info->column.width / minWidth);

    if ((bEmptyPM) || (bNoStartExtentOnThisPM))
    {
      // HWM not found in prev loop; can't get LBID. Will create initial
      // extent on this PM later in this job, if we have valid rows to add.
      if (bEmptyPM)
      {
        // No starting DB file on this PM
        ostringstream oss3;
        oss3 << "Currently no extents on dbroot" << dbRoot << " for column OID " << info->column.mapOid
             << "; will create starting extent";
        fLog.logMsg(oss3.str(), MSGLVL_INFO2);
      }
      // Skip to subsequent physical partition if current HWM extent
      // for this "dbroot" is disabled.
      else  // bNoStartExtentOnThisPM is true
      {
        // Starting DB file on this PM is disabled
        ostringstream oss3;
        oss3 << "Current HWM extent is disabled on dbroot" << dbRoot << " for column OID "
             << info->column.mapOid << "; will create starting extent";
        fLog.logMsg(oss3.str(), MSGLVL_INFO2);
      }

      // Pass blocks to be skipped at start of file "if" we decide to
      // employ block skipping for the first extent.
      hwm = info->column.width / minWidth;

      // We don't have a starting DB file on this PM, or the starting HWM
      // extent is disabled. In either case, we will wait and create a
      // new DB file to receive any new rows, only after we make sure we
      // have rows to insert.
      info->setupDelayedFileCreation(dbRoot, partition, segment, hwm, bEmptyPM);
    }
    else
    {
      // Establish starting HWM and LBID for this job.
      // Keep in mind we have initial block skipping to account for.
      bool bSkippedToNewExtent = false;
      BRM::LBID_t lbid;

      RETURN_ON_ERROR(preProcessHwmLbid(info, minWidth, partition, segment, hwm, lbid, bSkippedToNewExtent));

      // Setup import to start loading into starting HWM DB file
      RETURN_ON_ERROR(info->setupInitialColumnExtent(dbRoot, partition, segment,
                                                     job.jobTableList[tableNo].tblName, lbid, oldHwm, hwm,
                                                     bSkippedToNewExtent, bSkippedToNewExtent || oldHwm < 1));
    }

    tableInfo->addColumn(info);

  }  // end of 3rd for-loop through the list of columns

  if ((fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL) || (fImportDataMode == IMPORT_DATA_BIN_SAT_NULL))
  {
    ostringstream oss12;
    oss12 << "Table " << job.jobTableList[tableNo].tblName
          << " will be imported in binary mode with fixed record length: "
          << fixedBinaryRecLen << " bytes; ";

    if (fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL)
      oss12 << "NULL values accepted";
    else
      oss12 << "NULL values saturated";

    fLog.logMsg(oss12.str(), MSGLVL_INFO2);
  }

  // Initialize BulkLoadBuffers after we have added all the columns
  rc = tableInfo->initializeBuffers(fNoOfBuffers, job.jobTableList[tableNo].fFldRefs, fixedBinaryRecLen);
  if (rc)
    return rc;

  fTableInfo.push_back(tableInfo);

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Saves snapshot of extentmap into a bulk rollback meta data file, for
//    use in a bulk rollback, if the current cpimport.bin job should fail.
// PARAMETERS:
//    job         - current job
//    tableInfo   - TableInfo object corresponding to tableNo table.
//    segFileInfo - Vector of File objects carrying starting DBRoot, partition,
//                  etc, for the columns belonging to tableNo.
//    dbRootHWMInfoColVec - Vector of vectors carrying extent/HWM info for each
//                  dbroot for each column.
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::saveBulkRollbackMetaData(Job& job, TableInfo* tableInfo,
                                       const std::vector<DBRootExtentInfo>& segFileInfo,
                                       const std::vector<BRM::EmDbRootHWMInfo_v>& dbRootHWMInfoColVec)
{
  return tableInfo->saveBulkRollbackMetaData(job, segFileInfo, dbRootHWMInfoColVec);
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Initialize auto-increment column for specified schema and table.
// PARAMETERS:
//    fullTableName - Schema and table name separated by a period.
//    colInfo       - ColumnInfo associated with auto-increment column.
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::preProcessAutoInc(const std::string& fullTableName, ColumnInfo* colInfo)
{
  int rc = colInfo->initAutoInc(fullTableName);

  return rc;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Determine starting HWM and LBID, after applying block skipping to HWM.
// PARAMETERS:
//    info      - ColumnInfo of column we are working with
//    minWidth  - minimum width among all columns for this table
//    partition - partition of projected starting HWM
//    segment   - file segment number of projected starting HWM
//    hwm (input/output) - input:  projected starting HWM after block skipping
//                         output: adjusted starting HWM
//    lbid               - output: LBID associated with adjusted HWM
//    bSkippedToNewExtent- output:
//                         false -> normal block skipping use case
//                         true  -> block skipping crossed out of hwm extent
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::preProcessHwmLbid(const ColumnInfo* info, int minWidth, uint32_t partition, uint16_t segment,
                                HWM& hwm,                   // input/output
                                BRM::LBID_t& lbid,          // output
                                bool& bSkippedToNewExtent)  // output
{
  int rc = NO_ERROR;
  bSkippedToNewExtent = false;

  // Get starting LBID for the HWM block; if we can't get the start-
  // ing LBID, it means initial block skipping crossed extent boundary.
  rc = BRMWrapper::getInstance()->getStartLbid(info->column.mapOid, partition, segment, (int)hwm, lbid);

  // If HWM Lbid is missing, take alternative action to see what to do.
  // Block skipping has caused us to advance out of the current HWM extent.
  if (rc != NO_ERROR)
  {
    bSkippedToNewExtent = true;

    lbid = INVALID_LBID;

    int blocksPerExtent = (BRMWrapper::getInstance()->getExtentRows() * info->column.width) / BYTE_PER_BLOCK;

    // Look for LBID associated with block at end of current extent
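    // e.g. assuming the default 8,388,608 rows per extent, a 4-byte column has
    // blocksPerExtent = (8388608 * 4) / 8192 = 4096; a skipped-ahead hwm of
    // 4100 then rounds down to numBlocks = 4096, i.e. hwm 4095, the last
    // block of the current extent.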
    uint32_t numBlocks = (((hwm + 1) / blocksPerExtent) * blocksPerExtent);

    hwm = numBlocks - 1;
    rc = BRMWrapper::getInstance()->getStartLbid(info->column.mapOid, partition, segment, (int)hwm, lbid);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "Error getting HWM start LBID "
             "for previous last extent in column file OID-"
          << info->column.mapOid << "; partition-" << partition << "; segment-" << segment << "; hwm-" << hwm
          << "; " << ec.errorString(rc);
      fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }
  }

  return rc;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Process the bulk load job: apply Job XML defaults, verify the DBRoots
//    are writable, acquire table locks, preprocess each table, then spawn
//    the read and parse threads and roll back any tables left locked.
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::processJob()
{
#ifdef PROFILE
  Stats::enableProfiling(fNoOfReadThreads, fNoOfParseThreads);
#endif
  int rc = NO_ERROR;
  Job curJob;
  size_t i;

  curJob = fJobInfo.getJob();

  // For the following parms, we use the value read from the Job XML file if
  // a cmd line override value was not already assigned by cpimport.cpp.
  if (fNoOfBuffers == -1)
    fNoOfBuffers = curJob.numberOfReadBuffers;

  if (fBufferSize == -1)
    fBufferSize = curJob.readBufferSize;

  if (fFileVbufSize == -1)
    fFileVbufSize = curJob.writeBufferSize;

  if (fColDelim == '\0')
    fColDelim = curJob.fDelimiter;

  // std::cout << "bulkload::fEnclosedByChar<" << fEnclosedByChar << '>' <<
  //     std::endl << "bulkload::fEscapeChar<" << fEscapeChar << '>' <<
  //     std::endl << "job.fEnclosedByChar<" << curJob.fEnclosedByChar << '>' <<
  //     std::endl << "job.fEscapeChar<" << curJob.fEscapeChar << '>' <<
  //     std::endl;
  if (fEnclosedByChar == '\0')
  {
    // std::cout << "Using enclosed char from xml file" << std::endl;
    fEnclosedByChar = curJob.fEnclosedByChar;
  }

  if (fEscapeChar == '\0')
  {
    // std::cout << "Using escape char from xml file" << std::endl;
    fEscapeChar = curJob.fEscapeChar;
  }

  // If EnclosedBy char is given, then we need an escape character.
  // We default to '\' if we didn't get one from xml file or cmd line.
  if (fEscapeChar == '\0')
  {
    // std::cout << "Using default escape char" << std::endl;
    fEscapeChar = '\\';
  }

  // std::cout << "bulkload::fEnclosedByChar<" << fEnclosedByChar << '>' <<
  //     std::endl << "bulkload::fEscapeChar<" << fEscapeChar << '>' << std::endl;

  // Bug1315 - check whether DBRoots are RW mounted.
  std::vector<std::string> dbRootPathList;
  Config::getDBRootPathList(dbRootPathList);

  for (unsigned int counter = 0; counter < dbRootPathList.size(); counter++)
  {
    if (access(dbRootPathList[counter].c_str(), R_OK | W_OK) < 0)
    {
      rc = ERR_FILE_NOT_EXIST;
      ostringstream oss;
      oss << "Error accessing DBRoot[" << counter << "] " << dbRootPathList[counter] << "; "
          << strerror(errno);
      fLog.logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }
  }

  // Init total cumulative run time with time it took to load xml file
  double totalRunTime = getTotalRunTime();
  fLog.logMsg("PreProcessing check starts", MSGLVL_INFO1);
  startTimer();

  //--------------------------------------------------------------------------
  // Validate that only 1 table is specified for import if using STDIN
  //--------------------------------------------------------------------------
  if ((fAlternateImportDir == IMPORT_PATH_STDIN) && (curJob.jobTableList.size() > 1))
  {
    rc = ERR_INVALID_PARAM;
    fLog.logMsg("Only 1 table can be imported per job when using STDIN", rc, MSGLVL_ERROR);
    return rc;
  }

  //--------------------------------------------------------------------------
  // Validate the existence of the import data files
  //--------------------------------------------------------------------------
  std::vector<std::shared_ptr<TableInfo>> tables;

  for (i = 0; i < curJob.jobTableList.size(); i++)
  {
    std::shared_ptr<TableInfo> tableInfo(new TableInfo(&fLog, fTxnID, fProcessName,
                                                       curJob.jobTableList[i].mapOid,
                                                       curJob.jobTableList[i].tblName, fKeepRbMetaFiles));

    if ((fBulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
      tableInfo->setBulkLoadMode(fBulkMode, fBRMRptFileName);

    tableInfo->setErrorDir(string(getErrorDir()));
    tableInfo->setTruncationAsError(getTruncationAsError());
    rc = manageImportDataFileList(curJob, i, tableInfo.get());

    if (rc != NO_ERROR)
    {
      tableInfo->fBRMReporter.sendErrMsgToFile(tableInfo->fBRMRptFileName);
      return rc;
    }

    tables.push_back(tableInfo);
  }

  //--------------------------------------------------------------------------
  // Before we go any further, we lock all the tables
  //--------------------------------------------------------------------------
  for (i = 0; i < curJob.jobTableList.size(); i++)
  {
    rc = tables[i]->acquireTableLock(fDisableTimeOut);

    if (rc != NO_ERROR)
    {
      // Try releasing the table locks we already acquired.
      // Note that loop is k<i since tables[i] lock failed.
      for (unsigned k = 0; k < i; k++)
      {
        tables[k]->releaseTableLock();  // ignore return code in this case
      }

      return rc;
    }

    // If we have a lock, then init MetaWriter, so that it can delete any
    // leftover backup meta data files that collide with the ones we are
    // going to create.
    rc = tables[i]->initBulkRollbackMetaData();

    if (rc != NO_ERROR)
    {
      // Try releasing the table locks we already acquired.
      // Note that loop is k<=i since tables[i] lock worked.
      for (unsigned k = 0; k <= i; k++)
      {
        tables[k]->releaseTableLock();  // ignore return code in this case
      }

      return rc;
    }
  }

  //--------------------------------------------------------------------------
  // Perform necessary preprocessing for each table
  //--------------------------------------------------------------------------
  for (i = 0; i < curJob.jobTableList.size(); i++)
  {
    // If table already marked as complete then we are skipping the
    // table because there were no input files to process.
    if (tables[i]->getStatusTI() == WriteEngine::PARSE_COMPLETE)
      continue;

    rc = preProcess(curJob, i, tables[i]);

    if (rc != NO_ERROR)
    {
      std::string errMsg = "Error in pre-processing the job file for table " + curJob.jobTableList[i].tblName;
      tables[i]->fBRMReporter.addToErrMsgEntry(errMsg);
      fLog.logMsg(errMsg, rc, MSGLVL_CRITICAL);

      // Okay to release the locks for the tables we did not get to
      for (unsigned k = i + 1; k < tables.size(); k++)
      {
        tables[k]->releaseTableLock();  // ignore return code in this case
      }

      // Okay to release the locks for any tables we preprocessed.
      // We will not have done anything to change these tables yet,
      // so all we need to do is release the locks.
      for (unsigned k = 0; k <= i; k++)
      {
        tables[k]->deleteMetaDataRollbackFile();
        tables[k]->releaseTableLock();  // ignore return code
      }

      // Ignore the return code for now; more important to base rc on the
      // success or failure of the previous work.

      // BUG 4398: distributed cpimport calls takeSnapshot for modes 1 & 2
      if ((fBulkMode != BULK_MODE_REMOTE_SINGLE_SRC) && (fBulkMode != BULK_MODE_REMOTE_MULTIPLE_SRC))
      {
        BRMWrapper::getInstance()->takeSnapshot();
      }

      return rc;
    }
  }

  stopTimer();
  fLog.logMsg("PreProcessing check completed", MSGLVL_INFO1);

  std::ostringstream ossPrepTime;
  ossPrepTime << "preProcess completed, run time for this step : " << getTotalRunTime() << " seconds";
  fLog.logMsg(ossPrepTime.str(), MSGLVL_INFO1);
  totalRunTime += getTotalRunTime();

  startTimer();

  spawnWorkers();

  if (BulkStatus::getJobStatus() == EXIT_FAILURE)
  {
    rc = ERR_UNKNOWN;
  }

  // Regardless of JobStatus, we rollback any tables that are left locked
  int rollback_rc = rollbackLockedTables();

  if ((rc == NO_ERROR) && (rollback_rc != NO_ERROR))
  {
    rc = rollback_rc;
  }

  // Ignore the return code for now; more important to base rc on the
  // success or failure of the previous work.

  // BUG 4398: distributed cpimport now calls takeSnapshot for modes 1 & 2
  if ((fBulkMode != BULK_MODE_REMOTE_SINGLE_SRC) && (fBulkMode != BULK_MODE_REMOTE_MULTIPLE_SRC))
  {
    BRMWrapper::getInstance()->takeSnapshot();
  }

  stopTimer();
  totalRunTime += getTotalRunTime();

  std::ostringstream ossTotalRunTime;
  ossTotalRunTime << "Bulk load completed, total run time : " << totalRunTime << " seconds" << std::endl;
  fLog.logMsg(ossTotalRunTime.str(), MSGLVL_INFO1);

#ifdef PROFILE
  Stats::printProfilingResults();
#endif

  return rc;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Deconstruct the list of 1 or more import files for the specified table,
//    and validate the existence of the specified files.
// PARAMETERS:
//    job       - current job
//    tableNo   - table no
//    tableInfo - TableInfo object corresponding to tableNo table.
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::manageImportDataFileList(Job& job, int tableNo, TableInfo* tableInfo)
{
  std::vector<std::string> loadFilesList;
  bool bUseStdin = false;

  // Take loadFileName from command line argument override "if" one exists,
  // else we take it from the Job xml file.
  std::string loadFileName;

  if (fCmdLineImportFiles.size() > (unsigned)tableNo)
    loadFileName = fCmdLineImportFiles[tableNo];
  else
    loadFileName = job.jobTableList[tableNo].loadFileName;

  if (fAlternateImportDir == IMPORT_PATH_STDIN)
  {
    bUseStdin = true;
    fLog.logMsg("Using STDIN for input data", MSGLVL_INFO2);

    int rc = buildImportDataFileList(std::string(), loadFileName, loadFilesList);

    if (rc != NO_ERROR)
    {
      return rc;
    }

    // BUG 4737 - in Mode 1, all data coming from STDIN, ignore input files
    if ((loadFilesList.size() > 1) && (fBulkMode != BULK_MODE_REMOTE_SINGLE_SRC))
    {
      ostringstream oss;
      oss << "Table " << tableInfo->getTableName()
          << " specifies multiple load files; this is not allowed when using STDIN";
      fLog.logMsg(oss.str(), ERR_INVALID_PARAM, MSGLVL_ERROR);
      tableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
      return ERR_INVALID_PARAM;
    }
  }
  else
  {
    std::string importDir;

    if (!fS3Key.empty())
    {
      loadFilesList.push_back(loadFileName);
    }
    else
    {
      if (fAlternateImportDir == IMPORT_PATH_CWD)  // current working dir
      {
        char cwdBuf[4096];
        importDir = ::getcwd(cwdBuf, sizeof(cwdBuf));
        importDir += '/';
      }
      else if (fAlternateImportDir.size() > 0)  // -f path
      {
        importDir = fAlternateImportDir;
      }
      else  // <BULKROOT>/data/import
      {
        importDir = fRootDir;
        importDir += DIR_BULK_IMPORT;
      }

      // Break down loadFileName into vector of file names in case load-
      // FileName contains a list of files or 1 or more wildcards.
      int rc = buildImportDataFileList(importDir, loadFileName, loadFilesList);

      if (rc != NO_ERROR)
      {
        return rc;
      }
    }

    // No filenames is considered a fatal error, except for remote mode2.
    // For remote mode2 we just mark the table as complete since we will
    // have no data to load, but we don't consider this as an error.
    if (loadFilesList.size() == 0)
    {
      ostringstream oss;
      oss << "No import files found. "
          << "default dir: " << importDir << " importFileName: " << loadFileName;

      if (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC)
      {
        tableInfo->setLoadFilesInput(bUseStdin, (!fS3Key.empty()), loadFilesList, fS3Host, fS3Key, fS3Secret,
                                     fS3Bucket, fS3Region);
        tableInfo->markTableComplete();
        fLog.logMsg(oss.str(), MSGLVL_INFO1);
        return NO_ERROR;
      }
      else
      {
        fLog.logMsg(oss.str(), ERR_FILE_NOT_EXIST, MSGLVL_ERROR);
        return ERR_FILE_NOT_EXIST;
      }
    }

    // Verify that input data files exist.
    // We also used to check to make sure the input file is not empty, and
    // if it were, we threw an error at this point, but we removed that
    // check. With shared-nothing, an empty file is now acceptable.
    if (fS3Key.empty())
    {
      for (unsigned ndx = 0; ndx < loadFilesList.size(); ndx++)
      {
        // In addition to being more portable due to the use of boost, this
        // change actually fixes an inherent bug with cpimport reading from a
        // named pipe. Only the first open call gets any data passed through
        // the pipe, so the code here that used to do an open to test for
        // existence meant cpimport would never get data from the pipe.
        boost::filesystem::path pathFile(loadFilesList[ndx]);

        if (!boost::filesystem::exists(pathFile))
        {
          ostringstream oss;
          oss << "input data file " << loadFilesList[ndx] << " does not exist";
          fLog.logMsg(oss.str(), ERR_FILE_NOT_EXIST, MSGLVL_ERROR);
          tableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
          return ERR_FILE_NOT_EXIST;
        }
        else
        {
          ostringstream oss;
          oss << "input data file " << loadFilesList[ndx];
          fLog.logMsg(oss.str(), MSGLVL_INFO1);
        }
      }
    }
  }

  tableInfo->setLoadFilesInput(bUseStdin, (!fS3Key.empty()), loadFilesList, fS3Host, fS3Key, fS3Secret,
                               fS3Bucket, fS3Region);

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Break up the filename string (which may contain a list of file names)
//    into a vector of strings, with each non-fully-qualified string being
//    prefixed by the path specified by "location".
// PARAMETERS:
//    location  - path prefix
//    filename  - list of file names
//    loadFiles - vector of file names extracted from filename string
// RETURN:
//    NO_ERROR if success
//------------------------------------------------------------------------------
int BulkLoad::buildImportDataFileList(const std::string& location, const std::string& filename,
                                      std::vector<std::string>& loadFiles)
{
  char* filenames = new char[filename.size() + 1];
  strcpy(filenames, filename.c_str());
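  // strtok() below splits the list on comma, space, or '|',
  // e.g. "file1.tbl,file2.tbl".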

  char* str;
  char* token;

  for (str = filenames;; str = NULL)
  {
    token = strtok(str, ", |");

    if (token == NULL)
      break;

    // If the token (filename) is fully qualified, then use the filename
    // as-is, else prepend the location (path prefix)
    boost::filesystem::path p(token);
    std::string fullPath;

    if (p.has_root_path())
    {
      fullPath = token;
    }
    else
    {
      fullPath = location;
      fullPath += token;
    }

    // If running mode2, then support a filename with wildcards
    if (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC)
    {
      bool bExpandFileName = false;

      size_t fpos = fullPath.find_first_of("[*?");

      if (fpos != std::string::npos)
      {
        bExpandFileName = true;
      }
      else  // expand a directory name
      {
        struct stat curStat;

        if ((stat(fullPath.c_str(), &curStat) == 0) && (S_ISDIR(curStat.st_mode)))
        {
          bExpandFileName = true;
          fullPath += "/*";
        }
      }

      // If wildcard(s) present use glob() function to expand into a list
      if (bExpandFileName)
      {
        glob_t globBuf;
        memset(&globBuf, 0, sizeof(globBuf));
        int globFlags = GLOB_ERR | GLOB_MARK;
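        // (GLOB_MARK appends '/' to directory matches so they can be
        //  filtered out below)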
        int rc = glob(fullPath.c_str(), globFlags, 0, &globBuf);

        if (rc != 0)
        {
          if (rc == GLOB_NOMATCH)
          {
            continue;
          }
          else
          {
            ostringstream oss;
            oss << "Error expanding filename " << fullPath;

            if (rc == GLOB_NOSPACE)
              oss << "; out of memory";
            else if (rc == GLOB_ABORTED)
              oss << "; error reading directory";
            else if (rc == GLOB_NOSYS)
              oss << "; globbing not implemented";
            else
              oss << "; rc-" << rc;

            fLog.logMsg(oss.str(), ERR_FILE_GLOBBING, MSGLVL_ERROR);

            delete[] filenames;
            return ERR_FILE_GLOBBING;
          }
        }

        // Include all non-directory files in the import file list
        std::string fullPath2;

        for (unsigned int k = 0; k < globBuf.gl_pathc; k++)
        {
          fullPath2 = globBuf.gl_pathv[k];

          if (!fullPath2.empty())
          {
            if (fullPath2[fullPath2.length() - 1] != '/')
            {
              loadFiles.push_back(fullPath2);
            }
          }
        }
      }  // wild card present
      else
      {
        loadFiles.push_back(fullPath);
      }
    }  // mode2
    else
    {
      loadFiles.push_back(fullPath);
    }  // not mode2

  }  // loop through filename tokens

  delete[] filenames;

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Clear table locks, and rollback any tables that are
//    still locked through session manager.
// PARAMETERS:
//    none
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::rollbackLockedTables()
{
  int rc = NO_ERROR;

  // See if there are any DB tables that were left in a locked state
  bool lockedTableFound = false;

  for (unsigned i = 0; i < fTableInfo.size(); i++)
  {
    if (fTableInfo[i]->isTableLocked())
    {
      lockedTableFound = true;
      break;
    }
  }

  // If 1 or more tables failed to load, then report the lock
  // state of each table we were importing.
  if (lockedTableFound)
  {
    // Report the tables that were successfully loaded
    for (unsigned i = 0; i < fTableInfo.size(); i++)
    {
      if (!fTableInfo[i]->isTableLocked())
      {
        ostringstream oss;
        oss << "Table " << fTableInfo[i]->getTableName() << " was successfully loaded.";
        fLog.logMsg(oss.str(), MSGLVL_INFO1);
      }
    }

    // Report the tables that were not successfully loaded
    for (unsigned i = 0; i < fTableInfo.size(); i++)
    {
      if (fTableInfo[i]->isTableLocked())
      {
        if (fTableInfo[i]->hasProcessingBegun())
        {
          ostringstream oss;
          oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")"
              << " was not successfully loaded. Rolling back.";
          fLog.logMsg(oss.str(), MSGLVL_INFO1);
        }
        else
        {
          ostringstream oss;
          oss << "Table " << fTableInfo[i]->getTableName() << " (OID-" << fTableInfo[i]->getTableOID() << ")"
              << " did not start loading. No rollback necessary.";
          fLog.logMsg(oss.str(), MSGLVL_INFO1);
        }

        rc = rollbackLockedTable(*fTableInfo[i]);

        if (rc != NO_ERROR)
        {
          break;
        }
      }
    }
  }

  return rc;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Clear table lock, and rollback the specified table that is still locked.
//    This function only comes into play for a mode3, since the tablelock and
//    bulk rollbacks are managed by the parent (cpimport file splitter) process
//    in the case of mode1 and mode2 bulk loads.
// PARAMETERS:
//    tableInfo - the table to be released and rolled back
// RETURN:
//    NO_ERROR if success
//    other    if fail
//------------------------------------------------------------------------------
int BulkLoad::rollbackLockedTable(TableInfo& tableInfo)
{
  return tableInfo.rollbackWork();
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Update next autoincrement value for specified column OID.
// PARAMETERS:
//    columnOid      - column OID of interest
//    nextAutoIncVal - next autoincrement value to assign to tableOID
// RETURN:
//    0     if success
//    other if fail
//------------------------------------------------------------------------------
/* static */
int BulkLoad::updateNextValue(OID columnOid, uint64_t nextAutoIncVal)
{
  // The odds of us ever having 2 updateNextValue() calls going on in parallel
  // are slim and none. But it's theoretically possible if we had an import
  // job for 2 tables; so we put a mutex here just in case the DDLClient code
  // won't work well with 2 competing WE_DDLCommandClient objects in the same
  // process (ex: if there is any static data in WE_DDLCommandClient).
  boost::mutex::scoped_lock lock(*fDDLMutex);
  WE_DDLCommandClient ddlCommandClt;
  unsigned int rc = ddlCommandClt.UpdateSyscolumnNextval(columnOid, nextAutoIncVal);

  return (int)rc;
}

//------------------------------------------------------------------------------

bool BulkLoad::addErrorMsg2BrmUpdater(const std::string& tablename, const ostringstream& oss)
{
  int size = fTableInfo.size();

  if (size == 0)
    return false;

  for (int tableId = 0; tableId < size; tableId++)
  {
    if (fTableInfo[tableId]->getTableName() == tablename)
    {
      fTableInfo[tableId]->fBRMReporter.addToErrMsgEntry(oss.str());
      return true;
    }
  }

  return false;
}

//------------------------------------------------------------------------------
// DESCRIPTION:
//    Set job UUID. Used by Query Telemetry to identify a unique import
//    job across PMs.
// PARAMETERS:
//    jobUUID - the job UUID
// RETURN:
//    void
//------------------------------------------------------------------------------
void BulkLoad::setJobUUID(const std::string& jobUUID)
{
  fUUID = boost::uuids::string_generator()(jobUUID);
}

void BulkLoad::setDefaultJobUUID()
{
  if (fUUID.is_nil())
    fUUID = boost::uuids::random_generator()();
}

}  // namespace WriteEngine