1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

Feature/mcol 4882 cpimport skip rows (#3594)

* feat(cpimport): MCOL-4882 add a parameter to skip header rows

* chore(cpimport): MCOL-4882 Use boost::program_options to arguments parsing

* feat(cpimport.bin): MCOL-4882 Add missing changes

* add test

* fix clang

* add missing cmdline argument

* fix bug

* Fix double lines skipping

* Fix incorrect --silent (-N) parsing

* fix default --max-errors processing

* fix overwriting default username

* move initialization to members declaration
This commit is contained in:
Alexey Antipovsky
2025-07-11 21:35:43 +02:00
committed by GitHub
parent 1c8d5ec04e
commit 78c1b5034d
30 changed files with 1379 additions and 1469 deletions

View File

@ -9,6 +9,7 @@ set(we_bulk_STAT_SRCS
we_bulkload.cpp
we_bulkloadbuffer.cpp
we_bulkstatus.cpp
we_cmdargs.cpp
we_colopbulk.cpp
we_colbuf.cpp
we_colbufcompressed.cpp
@ -28,7 +29,7 @@ set(we_bulk_STAT_SRCS
add_definitions(-D_FILE_OFFSET_BITS=64)
columnstore_static_library(we_bulk ${we_bulk_STAT_SRCS})
columnstore_link(we_bulk ${NETSNMP_LIBRARIES} loggingcpp)
columnstore_link(we_bulk ${NETSNMP_LIBRARIES} loggingcpp boost_program_options)
remove_definitions(-D_FILE_OFFSET_BITS=64)

View File

@ -49,6 +49,7 @@
#include "dataconvert.h"
#include "mcsconfig.h"
#include "mariadb_my_sys.h"
#include "we_cmdargs.h"
using namespace std;
using namespace WriteEngine;
@ -56,8 +57,8 @@ using namespace execplan;
namespace
{
char* pgmName = 0;
const std::string IMPORT_PATH_CWD(".");
unique_ptr<WECmdArgs> cmdArgs;
bool bDebug = false;
uint32_t cpimportJobId = 0;
@ -88,103 +89,6 @@ const char* taskLabels[] = {"",
"processing data"};
} // namespace
//------------------------------------------------------------------------------
// Print command line usage
//------------------------------------------------------------------------------
void printUsage()
{
cout << endl
<< "Simple usage using positional parameters "
"(no XML job file):"
<< endl
<< " cpimport.bin dbName tblName [loadFile] [-j jobID] " << endl
<< " [-h] [-r readers] [-w parsers] [-s c] [-f path] [-b readBufs] " << endl
<< " [-c readBufSize] [-e maxErrs] [-B libBufSize] [-n NullOption] " << endl
<< " [-E encloseChar] [-C escapeChar] [-I binaryOpt] [-S] "
"[-d debugLevel] [-i] "
<< endl
<< " [-D] [-N] [-L rejectDir] [-T timeZone]" << endl
<< " [-U username]" << endl
<< endl;
cout << endl
<< "Traditional usage without positional parameters "
"(XML job file required):"
<< endl
<< " cpimport.bin -j jobID " << endl
<< " [-h] [-r readers] [-w parsers] [-s c] [-f path] [-b readBufs] " << endl
<< " [-c readBufSize] [-e maxErrs] [-B libBufSize] [-n NullOption] " << endl
<< " [-E encloseChar] [-C escapeChar] [-I binaryOpt] [-S] "
"[-d debugLevel] [-i] "
<< endl
<< " [-p path] [-l loadFile]" << endl
<< " [-D] [-N] [-L rejectDir] [-T timeZone]" << endl
<< " [-U username]" << endl
<< endl;
cout << " Positional parameters:" << endl
<< " dbName Name of database to load" << endl
<< " tblName Name of table to load" << endl
<< " loadFile Optional input file name in current directory, " << "unless a fully" << endl
<< " qualified name is given. If not given, " << "input read from stdin." << endl
<< endl;
cout << " Options:" << endl
<< " -b Number of read buffers" << endl
<< " -c Application read buffer size (in bytes)" << endl
<< " -d Print different level (1-3) debug message " << endl
<< " -e Maximum number of allowable errors per table" << endl
<< " -f Data file directory path; " << endl
<< " In simple usage:" << endl
<< " Default is current working directory." << endl
<< " -f option only applies if loadFile is specified." << endl
<< " In traditional usage: " << endl
<< " Default is <BulkRoot>/data/import." << endl
<< " 'STDIN' (all caps) redirects input from stdin." << endl
<< " -h Print this message" << endl
<< " -i Print extended info to console, else this info only goes "
"to log file."
<< endl
<< " -j Job id. In simple usage, default is the table OID." << endl
<< " -l Name of input file to be loaded, relative to -f path," << endl
<< " unless a fully qualified input file name is given." << endl
<< " -n NullOption (0-treat the string NULL as data (default);" << endl
<< " 1-treat the string NULL as a NULL value)" << endl
<< " -p Path for XML job description file" << endl
<< " -r Number of readers" << endl
<< " -s 'c' is the delimiter between column values" << endl
<< " -w Number of parsers" << endl
<< " -B I/O library read buffer size (in bytes)" << endl
<< " -E Enclosed by character if field values are enclosed" << endl
<< " -C Escape character used in conjunction with 'enclosed by' " << "character," << endl
<< " or as part of NULL escape sequence ('\\N'); default is '\\'" << endl
<< " -I Binary import; binaryOpt 1-import NULL values" << endl
<< " 2-saturate NULL values" << endl
<< " -S Treat string truncations as errors" << endl
<< " -D Disable timeout when waiting for table lock" << endl
<< " -N Disable console output" << endl
<< " -L send *.err and *.bad (reject) files here" << endl
<< " -T Timezone used for TIMESTAMP datatype" << endl
<< " Possible values: \"SYSTEM\" (default)" << endl
<< " : Offset in the form +/-HH:MM" << endl
<< endl
<< " -y S3 Authentication Key (for S3 imports)" << endl
<< " -K S3 Authentication Secret (for S3 imports)" << endl
<< " -t S3 Bucket (for S3 imports)" << endl
<< " -H S3 Hostname (for S3 imports, Amazon's S3 default)" << endl
<< " -g S3 Regions (for S3 imports)" << endl
<< " -U username of new data files owner. Default is mysql" << endl;
cout << " Example1:" << endl
<< " cpimport.bin -j 1234" << endl
<< " Example2: Some column values are enclosed within double quotes." << endl
<< " cpimport.bin -j 3000 -E '\"'" << endl
<< " Example3: Import a nation table without a Job XML file" << endl
<< " cpimport.bin -j 301 tpch nation nation.tbl" << endl;
exit(EXIT_SUCCESS);
}
//------------------------------------------------------------------------------
// Signal handler to catch SIGTERM signal to terminate the process
//------------------------------------------------------------------------------
@ -227,40 +131,6 @@ void handleSigAbrt(int /*i*/)
BulkStatus::setJobStatus(EXIT_FAILURE);
}
//------------------------------------------------------------------------------
// If error occurs during startup, this function is called to log the specified
// message and terminate the process.
//------------------------------------------------------------------------------
void startupError(const std::string& errMsg, bool showHint)
{
BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId);
// Log to console
if (!BulkLoad::disableConsoleOutput())
cerr << errMsg << endl;
if (showHint)
{
std::ostringstream oss;
oss << "Try '" << pgmName << " -h' for more information.";
if (!BulkLoad::disableConsoleOutput())
cerr << oss.str() << endl;
}
// Log to syslog
logging::Message::Args errMsgArgs;
errMsgArgs.add(errMsg);
SimpleSysLog::instance()->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0087);
std::string jobIdStr("0");
logging::Message::Args endMsgArgs;
endMsgArgs.add(jobIdStr);
endMsgArgs.add("FAILED");
SimpleSysLog::instance()->logMsg(endMsgArgs, logging::LOG_TYPE_INFO, logging::M0082);
exit(EXIT_FAILURE);
}
//------------------------------------------------------------------------------
// Initialize signal handling
//------------------------------------------------------------------------------
@ -307,540 +177,6 @@ void setupSignalHandlers()
sigaction(SIGABRT, &act, 0);
}
//------------------------------------------------------------------------------
// Parse the command line arguments
//------------------------------------------------------------------------------
void parseCmdLineArgs(int argc, char** argv, BulkLoad& curJob, std::string& sJobIdStr,
std::string& sXMLJobDir, std::string& sModuleIDandPID, bool& bLogInfo2ToConsole,
std::string& xmlGenSchema, std::string& xmlGenTable, bool& bValidateColumnList)
{
std::string importPath;
std::string rptFileName;
int option;
bool bImportFileArg = false;
BulkModeType bulkMode = BULK_MODE_LOCAL;
std::string jobUUID;
while ((option = getopt(argc, argv, "b:c:d:e:f:hij:kl:m:n:p:r:s:u:w:B:C:DE:I:P:R:ST:X:NL:y:K:t:H:g:U:")) !=
EOF)
{
switch (option)
{
case 'b': // -b: no. of read buffers
{
errno = 0;
long lValue = strtol(optarg, 0, 10);
if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
{
startupError(std::string("Option -b is invalid or out of range."), true);
}
int noOfReadBuffers = lValue;
curJob.setReadBufferCount(noOfReadBuffers);
break;
}
case 'c': // -c: read buffer size
{
errno = 0;
long lValue = strtol(optarg, 0, 10);
if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
{
startupError(std::string("Option -c is invalid or out of range."), true);
}
int readBufferSize = lValue;
curJob.setReadBufferSize(readBufferSize);
break;
}
case 'd': // -d: debug level
{
errno = 0;
long lValue = strtol(optarg, 0, 10);
if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
{
startupError(std::string("Option -d is invalid or out of range."), true);
}
int debugLevel = lValue;
if (debugLevel > 0 && debugLevel <= 3)
{
bDebug = true;
curJob.setAllDebug((DebugLevel)debugLevel);
if (!BulkLoad::disableConsoleOutput())
cout << "\nDebug level is set to " << debugLevel << endl;
}
break;
}
case 'e': // -e: max allowed errors
{
errno = 0;
long lValue = strtol(optarg, 0, 10);
if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX))
{
startupError(std::string("Option -e is invalid or out of range."), true);
}
int maxErrors = lValue;
curJob.setMaxErrorCount(maxErrors);
break;
}
case 'f': // -f: import path
{
importPath = optarg;
std::string setAltErrMsg;
if (curJob.setAlternateImportDir(importPath, setAltErrMsg) != NO_ERROR)
startupError(setAltErrMsg, false);
break;
}
case 'h': // -h: help
{
printUsage();
break;
}
case 'i': // -i: log info to console
{
bLogInfo2ToConsole = true;
break;
}
case 'j': // -j: jobID
{
errno = 0;
long lValue = strtol(optarg, 0, 10);
if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX))
{
startupError(std::string("Option -j is invalid or out of range."), true);
}
sJobIdStr = optarg;
break;
}
case 'k': // -k: hidden option to keep (not delete)
{
// bulk rollback meta-data files
curJob.setKeepRbMetaFiles(true);
break;
}
case 'l': // -l: import load file(s)
{
bImportFileArg = true;
curJob.addToCmdLineImportFileList(std::string(optarg));
break;
}
case 'm': // -m: bulk load mode
{
bulkMode = (BulkModeType)atoi(optarg);
if ((bulkMode != BULK_MODE_REMOTE_SINGLE_SRC) && (bulkMode != BULK_MODE_REMOTE_MULTIPLE_SRC) &&
(bulkMode != BULK_MODE_LOCAL))
{
startupError(std::string("Invalid bulk mode; can be 1,2, or 3"), true);
}
break;
}
case 'n': // -n: treat "NULL" as null
{
int nullStringMode = atoi(optarg);
if ((nullStringMode != 0) && (nullStringMode != 1))
{
startupError(std::string("Invalid NULL option; value can be 0 or 1"), true);
}
if (nullStringMode)
curJob.setNullStringMode(true);
else
curJob.setNullStringMode(false);
break;
}
case 'p': // -p: Job XML path
{
sXMLJobDir = optarg;
break;
}
case 'r': // -r: num read threads
{
errno = 0;
long lValue = strtol(optarg, 0, 10);
if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
{
startupError(std::string("Option -r is invalid or out of range."), true);
}
int numOfReaders = lValue;
#if !defined(__LP64__) && !defined(_MSC_VER)
if (numOfReaders > 1)
{
cerr << "Note: resetting number of read threads to maximum" << endl;
numOfReaders = 1;
}
#endif
curJob.setNoOfReadThreads(numOfReaders);
if (!BulkLoad::disableConsoleOutput())
cout << "number of read threads : " << numOfReaders << endl;
break;
}
case 's': // -s: column delimiter
{
char delim;
if (!strcmp(optarg, "\\t"))
{
delim = '\t';
if (!BulkLoad::disableConsoleOutput())
cout << "Column delimiter : " << "\\t" << endl;
}
else
{
delim = optarg[0];
if (delim == '\t') // special case to print a <TAB>
{
if (!BulkLoad::disableConsoleOutput())
cout << "Column delimiter : '\\t'" << endl;
}
else
{
if (!BulkLoad::disableConsoleOutput())
cout << "Column delimiter : " << delim << endl;
}
}
curJob.setColDelimiter(delim);
break;
}
case 'u': // -u: import job UUID
{
jobUUID = optarg;
curJob.setJobUUID(jobUUID);
break;
}
case 'w': // -w: num parse threads
{
errno = 0;
long lValue = strtol(optarg, 0, 10);
if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
{
startupError(std::string("Option -w is invalid or out of range."), true);
}
int numOfParser = lValue;
#if !defined(__LP64__) && !defined(_MSC_VER)
if (numOfParser > 3)
{
cerr << "Note: resetting number of parse threads to maximum" << endl;
numOfParser = 3;
}
#endif
curJob.setNoOfParseThreads(numOfParser);
if (!BulkLoad::disableConsoleOutput())
cout << "number of parse threads : " << numOfParser << endl;
break;
}
case 'B': // -B: setvbuf read size
{
errno = 0;
long lValue = strtol(optarg, 0, 10);
if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX))
{
startupError(std::string("Option -B is invalid or out of range."), true);
}
int vbufReadSize = lValue;
curJob.setVbufReadSize(vbufReadSize);
break;
}
case 'C': // -C: enclosed escape char
{
curJob.setEscapeChar(optarg[0]);
if (!BulkLoad::disableConsoleOutput())
cout << "Escape Character : " << optarg[0] << endl;
break;
}
case 'E': // -E: enclosed by char
{
curJob.setEnclosedByChar(optarg[0]);
if (!BulkLoad::disableConsoleOutput())
cout << "Enclosed by Character : " << optarg[0] << endl;
break;
}
case 'I': // -I: Binary import mode
{
ImportDataMode importMode = (ImportDataMode)atoi(optarg);
if ((importMode != IMPORT_DATA_BIN_ACCEPT_NULL) && (importMode != IMPORT_DATA_BIN_SAT_NULL))
{
startupError(std::string("Invalid binary import option; value can be 1"
"(accept NULL values) or 2(saturate NULL values)"),
true);
}
curJob.setImportDataMode(importMode);
break;
}
case 'L': // -L: Error log directory
{
curJob.setErrorDir(optarg);
break;
}
case 'P': // -P: Calling moduleid
{
// and PID
sModuleIDandPID = optarg;
break;
}
case 'R': // -R: distributed mode
{
// report file
rptFileName = optarg;
break;
}
case 'S': // -S: Char & VarChar data
{
// greater than col def
curJob.setTruncationAsError(true); // are reported as err
break;
}
case 'T':
{
std::string timeZone = optarg;
long offset;
if (dataconvert::timeZoneToOffset(timeZone.c_str(), timeZone.size(), &offset))
{
startupError(std::string("Value for option -T is invalid"), true);
}
curJob.setTimeZone(offset);
break;
}
case 'X': // Hidden extra options
{
if (!strcmp(optarg, "AllowMissingColumn"))
bValidateColumnList = false;
break;
}
case 'D': // disable table lock waiting timeout
{
curJob.disableTimeOut(true);
break;
}
case 'N': // silent the output to console
{
BulkLoad::disableConsoleOutput(true);
break;
}
case 'y':
{
curJob.setS3Key(optarg);
break;
}
case 'K':
{
curJob.setS3Secret(optarg);
break;
}
case 't':
{
curJob.setS3Bucket(optarg);
break;
}
case 'H':
{
curJob.setS3Host(optarg);
break;
}
case 'g':
{
curJob.setS3Region(optarg);
break;
}
case 'U':
{
curJob.setUsername(optarg);
break;
}
default:
{
ostringstream oss;
oss << "Unrecognized command line option (" << option << ")";
startupError(oss.str(), true);
}
}
}
curJob.setDefaultJobUUID();
// Inconsistent to specify -f STDIN with -l importFile
if ((bImportFileArg) && (importPath == "STDIN"))
{
startupError(std::string("-f STDIN is invalid with -l importFile."), true);
}
// If distributed mode, make sure report filename is specified and that we
// can create the file using the specified path.
if ((bulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (bulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC))
{
if (rptFileName.empty())
{
startupError(std::string("Bulk modes 1 and 2 require -R rptFileName."), true);
}
else
{
std::ofstream rptFile(rptFileName.c_str());
if (rptFile.fail())
{
std::ostringstream oss;
oss << "Unable to open report file " << rptFileName;
startupError(oss.str(), false);
}
rptFile.close();
}
curJob.setBulkLoadMode(bulkMode, rptFileName);
}
// Get positional arguments, User can provide:
// 1. no positional parameters
// 2. Two positional parameters (schema and table names)
// 3. Three positional parameters (schema, table, and import file name)
if (optind < argc) // see if db schema name is given
{
xmlGenSchema = argv[optind]; // 1st pos parm
optind++;
if (optind < argc) // see if table name is given
{
// Validate invalid options in conjunction with 2-3 positional
// parameter mode, which means we are using temp Job XML file.
if (bImportFileArg)
{
startupError(std::string("-l importFile is invalid with positional parameters"), true);
}
if (!sXMLJobDir.empty())
{
startupError(std::string("-p path is invalid with positional parameters."), true);
}
if (importPath == "STDIN")
{
startupError(std::string("-f STDIN is invalid with positional parameters."), true);
}
xmlGenTable = argv[optind]; // 2nd pos parm
optind++;
if (optind < argc) // see if input file name is given
{
// 3rd pos parm
curJob.addToCmdLineImportFileList(std::string(argv[optind]));
// Default to CWD if loadfile name given w/o -f path
if (importPath.empty())
{
std::string setAltErrMsg;
if (curJob.setAlternateImportDir(std::string("."), setAltErrMsg) != NO_ERROR)
startupError(setAltErrMsg, false);
}
}
else
{
// Invalid to specify -f if no load file name given
if (!importPath.empty())
{
startupError(std::string("-f requires 3rd positional parameter (load file name)."), true);
}
// Default to STDIN if no import file name given
std::string setAltErrMsg;
if (curJob.setAlternateImportDir(std::string("STDIN"), setAltErrMsg) != NO_ERROR)
startupError(setAltErrMsg, false);
}
}
else
{
startupError(std::string("No table name specified with schema."), true);
}
}
else
{
// JobID is a required parameter with no positional parm mode,
// because we need the jobid to identify the input job xml file.
if (sJobIdStr.empty())
{
startupError(std::string("No JobID specified."), true);
}
}
}
//------------------------------------------------------------------------------
// Print the path of the input load file(s), and the name of the job xml file.
//------------------------------------------------------------------------------
@ -857,8 +193,7 @@ void printInputSource(const std::string& alternateImportDir, const std::string&
if (alternateImportDir == IMPORT_PATH_CWD)
{
char cwdBuf[4096];
char* bufPtr = &cwdBuf[0];
bufPtr = ::getcwd(cwdBuf, sizeof(cwdBuf));
char* bufPtr = ::getcwd(cwdBuf, sizeof(cwdBuf));
if (!(BulkLoad::disableConsoleOutput()))
cout << "Input file(s) will be read from : " << bufPtr << endl;
@ -900,14 +235,14 @@ void getTableOID(const std::string& xmlGenSchema, const std::string& xmlGenTable
std::ostringstream oss;
oss << "Unable to set default JobID; " << "Error getting OID for table " << tbl.schema << '.' << tbl.table
<< ": " << ex.what();
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
catch (...)
{
std::ostringstream oss;
oss << "Unable to set default JobID; " << "Unknown error getting OID for table " << tbl.schema << '.'
<< tbl.table;
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
std::ostringstream oss;
@ -950,7 +285,7 @@ void constructTempXmlFile(const std::string& tempJobDir, const std::string& sJob
{
std::ostringstream oss;
oss << "cpimport.bin error creating temporary Job XML file name: " << xmlErrMsg;
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
printInputSource(alternateImportDir, sFileName.string(), S3Bucket);
@ -970,7 +305,7 @@ void constructTempXmlFile(const std::string& tempJobDir, const std::string& sJob
{
std::ostringstream oss;
oss << "No columns for " << xmlGenSchema << '.' << xmlGenTable;
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
}
catch (runtime_error& ex)
@ -979,7 +314,7 @@ void constructTempXmlFile(const std::string& tempJobDir, const std::string& sJob
oss << "cpimport.bin runtime exception constructing temporary "
"Job XML file: "
<< ex.what();
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
catch (exception& ex)
{
@ -987,13 +322,13 @@ void constructTempXmlFile(const std::string& tempJobDir, const std::string& sJob
oss << "cpimport.bin exception constructing temporary "
"Job XML file: "
<< ex.what();
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
catch (...)
{
startupError(std::string("cpimport.bin "
"unknown exception constructing temporary Job XML file"),
false);
cmdArgs->startupError(std::string("cpimport.bin "
"unknown exception constructing temporary Job XML file"),
false);
}
genProc.writeXMLFile(sFileName.string());
@ -1009,9 +344,9 @@ void verifyNode()
// Validate running on a PM
if (localModuleType != "pm")
{
startupError(std::string("Exiting, "
"cpimport.bin can only be run on a PM node"),
true);
cmdArgs->startupError(std::string("Exiting, "
"cpimport.bin can only be run on a PM node"),
true);
}
}
@ -1049,34 +384,22 @@ int main(int argc, char** argv)
setlocale(LC_NUMERIC, "C");
// Initialize singleton instance of syslogging
if (argc > 0)
pgmName = argv[0];
logging::IDBErrorInfo::instance();
SimpleSysLog::instance()->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_BULK));
// Log job initiation unless user is asking for help
cmdArgs = make_unique<WECmdArgs>(argc, argv);
std::ostringstream ossArgList;
bool bHelpFlag = false;
for (int m = 1; m < argc; m++)
{
if (strcmp(argv[m], "-h") == 0)
{
bHelpFlag = true;
break;
}
if (!strcmp(argv[m], "\t")) // special case to print a <TAB>
ossArgList << "'\\t'" << ' ';
else
ossArgList << argv[m] << ' ';
}
if (!bHelpFlag)
{
logInitiateMsg(ossArgList.str().c_str());
}
logInitiateMsg(ossArgList.str().c_str());
BulkLoad curJob;
string sJobIdStr;
@ -1099,8 +422,8 @@ int main(int argc, char** argv)
task = TASK_CMD_LINE_PARSING;
string xmlGenSchema;
string xmlGenTable;
parseCmdLineArgs(argc, argv, curJob, sJobIdStr, sXMLJobDir, sModuleIDandPID, bLogInfo2ToConsole,
xmlGenSchema, xmlGenTable, bValidateColumnList);
cmdArgs->fillParams(curJob, sJobIdStr, sXMLJobDir, sModuleIDandPID, bLogInfo2ToConsole, xmlGenSchema,
xmlGenTable, bValidateColumnList);
//--------------------------------------------------------------------------
// Save basename portion of program path from argv[0]
@ -1154,9 +477,9 @@ int main(int argc, char** argv)
if (!BRMWrapper::getInstance()->isSystemReady())
{
startupError(std::string("System is not ready. Verify that ColumnStore is up and ready "
"before running cpimport."),
false);
cmdArgs->startupError(std::string("System is not ready. Verify that ColumnStore is up and ready "
"before running cpimport."),
false);
}
if (bDebug)
@ -1173,7 +496,7 @@ int main(int argc, char** argv)
WErrorCodes ec;
std::ostringstream oss;
oss << ec.errorString(brmReadWriteStatus) << " cpimport.bin is terminating.";
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
if (bDebug)
@ -1190,7 +513,7 @@ int main(int argc, char** argv)
WErrorCodes ec;
std::ostringstream oss;
oss << ec.errorString(brmShutdownPending) << " cpimport.bin is terminating.";
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
if (bDebug)
@ -1207,7 +530,7 @@ int main(int argc, char** argv)
WErrorCodes ec;
std::ostringstream oss;
oss << ec.errorString(brmSuspendPending) << " cpimport.bin is terminating.";
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
if (bDebug)
@ -1268,7 +591,7 @@ int main(int argc, char** argv)
{
std::ostringstream oss;
oss << "cpimport.bin error creating Job XML file name: " << xmlErrMsg;
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
printInputSource(curJob.getAlternateImportDir(), sFileName.string(), curJob.getS3Bucket());
@ -1300,13 +623,14 @@ int main(int argc, char** argv)
}
rc = BRMWrapper::getInstance()->newCpimportJob(cpimportJobId);
// TODO kemm: pass cpimportJobId to WECmdArgs
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "Error in creating new cpimport job on Controller node; " << ec.errorString(rc)
<< "; cpimport is terminating.";
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
//--------------------------------------------------------------------------
@ -1321,7 +645,7 @@ int main(int argc, char** argv)
WErrorCodes ec;
std::ostringstream oss;
oss << "Error in loading job information; " << ec.errorString(rc) << "; cpimport.bin is terminating.";
startupError(oss.str(), false);
cmdArgs->startupError(oss.str(), false);
}
if (bDebug)
@ -1353,7 +677,7 @@ int main(int argc, char** argv)
if (task != TASK_PROCESS_DATA)
{
startupError(exceptionMsg, false);
cmdArgs->startupError(exceptionMsg, false);
}
rc = ERR_UNKNOWN;
@ -1379,7 +703,7 @@ int main(int argc, char** argv)
failMsg += exceptionMsg;
}
endMsgArgs.add(failMsg.c_str());
endMsgArgs.add(failMsg);
}
else
{

View File

@ -72,7 +72,7 @@ const std::string ERR_LOG_SUFFIX = ".err"; // Job err log file suffix
namespace WriteEngine
{
/* static */ std::vector<std::shared_ptr<TableInfo>> BulkLoad::fTableInfo;
/* static */ boost::mutex* BulkLoad::fDDLMutex = 0;
/* static */ boost::mutex* BulkLoad::fDDLMutex = new boost::mutex();
/* static */ const std::string BulkLoad::DIR_BULK_JOB("job");
/* static */ const std::string BulkLoad::DIR_BULK_TEMP_JOB("tmpjob");
@ -140,35 +140,8 @@ struct CancellationThread
// Constructor
//------------------------------------------------------------------------------
BulkLoad::BulkLoad()
: fColOp(new ColumnOpBulk())
, fColDelim('\0')
, fNoOfBuffers(-1)
, fBufferSize(-1)
, fFileVbufSize(-1)
, fMaxErrors(-1)
, fNoOfParseThreads(3)
, fNoOfReadThreads(1)
, fKeepRbMetaFiles(false)
, fNullStringMode(false)
, fEnclosedByChar('\0')
, // not enabled unless user overrides enclosed by char
fEscapeChar('\0')
, fTotalTime(0.0)
, fBulkMode(BULK_MODE_LOCAL)
, fbTruncationAsError(false)
, fImportDataMode(IMPORT_DATA_TEXT)
, fbContinue(false)
, fDisableTimeOut(false)
, fUUID(boost::uuids::nil_generator()())
, fTimeZone(dataconvert::systemTimeZoneOffset())
, fUsername("mysql") // MCOL-4328 default file owner
{
fTableInfo.clear();
setDebugLevel(DEBUG_0);
fDDLMutex = new boost::mutex();
memset(&fStartTime, 0, sizeof(timeval));
memset(&fEndTime, 0, sizeof(timeval));
}
//------------------------------------------------------------------------------
@ -540,6 +513,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, std::shared_ptr<TableInfo>& tabl
tableInfo->setImportDataMode(fImportDataMode);
tableInfo->setTimeZone(fTimeZone);
tableInfo->setJobUUID(fUUID);
tableInfo->setSkipRows(fSkipRows);
// MCOL-4328 Get username gid and uid if they are set
// We inject uid and gid into TableInfo and All ColumnInfo-s later.
@ -1002,6 +976,11 @@ int BulkLoad::processJob()
fEscapeChar = '\\';
}
if (fSkipRows == 0)
{
fSkipRows = curJob.fSkipRows;
}
// std::cout << "bulkload::fEnclosedByChar<" << fEnclosedByChar << '>' <<
// std::endl << "bulkload::fEscapeChar<" << fEscapeChar << '>' << std::endl;

View File

@ -29,7 +29,7 @@
#include <sys/time.h>
#include <we_log.h>
#include <we_colop.h>
#include <we_colopbulk.h>
#include <we_xmljob.h>
#include <we_convertor.h>
#include <writeengine.h>
@ -48,12 +48,7 @@
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/uuid/uuid.hpp>
#if 0 // defined(_MSC_VER) && defined(WE_BULKLOAD_DLLEXPORT)
#define EXPORT __declspec(dllexport)
#else
#define EXPORT
#endif
#include <boost/uuid/nil_generator.hpp>
/** Namespace WriteEngine */
namespace WriteEngine
@ -65,18 +60,18 @@ class BulkLoad : public FileOp
/**
* @brief BulkLoad constructor
*/
EXPORT BulkLoad();
BulkLoad();
/**
* @brief BulkLoad destructor
*/
EXPORT ~BulkLoad() override;
~BulkLoad() override;
/**
* @brief Load job information
*/
EXPORT int loadJobInfo(const std::string& fullFileName, bool bUseTempJobFile, int argc, char** argv,
bool bLogInfo2ToConsole, bool bValidateColumnList);
int loadJobInfo(const std::string& fullFileName, bool bUseTempJobFile, int argc, char** argv,
bool bLogInfo2ToConsole, bool bValidateColumnList);
/**
* @brief Pre process jobs to validate and assign values to the job structure
@ -91,7 +86,7 @@ class BulkLoad : public FileOp
/**
* @brief Process job
*/
EXPORT int processJob();
int processJob();
/**
* @brief Set Debug level for this BulkLoad object and any data members
@ -126,12 +121,13 @@ class BulkLoad : public FileOp
return fUUID;
}
EXPORT int setAlternateImportDir(const std::string& loadDir, std::string& errMsg);
int setAlternateImportDir(const std::string& loadDir, std::string& errMsg);
void setImportDataMode(ImportDataMode importMode);
void setColDelimiter(char delim);
void setBulkLoadMode(BulkModeType bulkMode, const std::string& rptFileName);
void setEnclosedByChar(char enChar);
void setEscapeChar(char esChar);
void setSkipRows(size_t skipRows);
void setKeepRbMetaFiles(bool keepMeta);
void setMaxErrorCount(unsigned int maxErrors);
void setNoOfParseThreads(int parseThreads);
@ -181,7 +177,7 @@ class BulkLoad : public FileOp
//--------------------------------------------------------------------------
XMLJob fJobInfo; // current job information
boost::scoped_ptr<ColumnOp> fColOp; // column operation
boost::scoped_ptr<ColumnOp> fColOp{new ColumnOpBulk()}; // column operation
std::string fRootDir; // job process root directory
std::string fJobFileName; // job description file name
@ -189,49 +185,50 @@ class BulkLoad : public FileOp
Log fLog; // logger
int fNumOfParser; // total number of parser
char fColDelim; // delimits col values within a row
char fColDelim{0}; // delimits col values within a row
int fNoOfBuffers; // Number of read buffers
int fBufferSize; // Read buffer size
int fFileVbufSize; // Internal file system buffer size
long long fMaxErrors; // Max allowable errors per job
int fNoOfBuffers{-1}; // Number of read buffers
int fBufferSize{-1}; // Read buffer size
int fFileVbufSize{-1}; // Internal file system buffer size
long long fMaxErrors{-1}; // Max allowable errors per job
std::string fAlternateImportDir; // Alternate bulk import directory
std::string fErrorDir; // Opt. where error records record
std::string fProcessName; // Application process name
static std::vector<std::shared_ptr<TableInfo>> fTableInfo; // Vector of Table information
int fNoOfParseThreads; // Number of parse threads
int fNoOfReadThreads; // Number of read threads
int fNoOfParseThreads{3}; // Number of parse threads
int fNoOfReadThreads{1}; // Number of read threads
boost::thread_group fReadThreads; // Read thread group
boost::thread_group fParseThreads; // Parse thread group
boost::mutex fReadMutex; // Manages table selection by each
// read thread
// read thread
boost::mutex fParseMutex; // Manages table/buffer/column
// selection by each parsing thread
BRM::TxnID fTxnID; // TransID acquired from SessionMgr
bool fKeepRbMetaFiles; // Keep/delete bulkRB metadata files
bool fNullStringMode; // Treat "NULL" as NULL value
char fEnclosedByChar; // Char used to enclose column value
char fEscapeChar; // Escape char within enclosed value
timeval fStartTime; // job start time
timeval fEndTime; // job end time
double fTotalTime; // elapsed time for current phase
std::vector<std::string> fCmdLineImportFiles; // Import Files from cmd line
BulkModeType fBulkMode; // Distributed bulk mode (1,2, or 3)
std::string fBRMRptFileName; // Name of distributed mode rpt file
bool fbTruncationAsError; // Treat string truncation as error
ImportDataMode fImportDataMode; // Importing text or binary data
bool fbContinue; // true when read and parse r running
// selection by each parsing thread
BRM::TxnID fTxnID; // TransID acquired from SessionMgr
bool fKeepRbMetaFiles{false}; // Keep/delete bulkRB metadata files
bool fNullStringMode{false}; // Treat "NULL" as NULL value
char fEnclosedByChar{0}; // Char used to enclose column value
char fEscapeChar{0}; // Escape char within enclosed value
size_t fSkipRows{0}; // Header rows to skip
timeval fStartTime{0, 0}; // job start time
timeval fEndTime{0, 0}; // job end time
double fTotalTime{0.0}; // elapsed time for current phase
std::vector<std::string> fCmdLineImportFiles; // Import Files from cmd line
BulkModeType fBulkMode{BULK_MODE_LOCAL}; // Distributed bulk mode (1,2, or 3)
std::string fBRMRptFileName; // Name of distributed mode rpt file
bool fbTruncationAsError{false}; // Treat string truncation as error
ImportDataMode fImportDataMode{IMPORT_DATA_TEXT}; // Importing text or binary data
bool fbContinue{false}; // true when read and parse r running
//
static boost::mutex* fDDLMutex; // Insure only 1 DDL op at a time
EXPORT static const std::string DIR_BULK_JOB; // Bulk job directory
EXPORT static const std::string DIR_BULK_TEMP_JOB; // Dir for tmp job files
static const std::string DIR_BULK_JOB; // Bulk job directory
static const std::string DIR_BULK_TEMP_JOB; // Dir for tmp job files
static const std::string DIR_BULK_IMPORT; // Bulk job import dir
static const std::string DIR_BULK_LOG; // Bulk job log directory
bool fDisableTimeOut; // disable timeout when waiting for table lock
boost::uuids::uuid fUUID; // job UUID
bool fDisableTimeOut{false}; // disable timeout when waiting for table lock
boost::uuids::uuid fUUID{boost::uuids::nil_generator()()}; // job UUID
static bool fNoConsoleOutput; // disable output to console
long fTimeZone; // Timezone offset (in seconds) relative to UTC,
long fTimeZone{dataconvert::systemTimeZoneOffset()};// Timezone offset (in seconds) relative to UTC,
// to use for TIMESTAMP data type. For example,
// for EST which is UTC-5:00, offset will be -18000s.
std::string fS3Key; // S3 Key
@ -239,7 +236,7 @@ class BulkLoad : public FileOp
std::string fS3Host; // S3 Host
std::string fS3Bucket; // S3 Bucket
std::string fS3Region; // S3 Region
std::string fUsername; // data files owner name mysql by default
std::string fUsername{"mysql"}; // data files owner name mysql by default
//--------------------------------------------------------------------------
// Private Functions
@ -417,6 +414,11 @@ inline void BulkLoad::setEscapeChar(char esChar)
fEscapeChar = esChar;
}
inline void BulkLoad::setSkipRows(size_t skipRows)
{
fSkipRows = skipRows;
}
inline void BulkLoad::setImportDataMode(ImportDataMode importMode)
{
fImportDataMode = importMode;

View File

@ -2047,8 +2047,8 @@ int BulkLoadBuffer::parseDictSection(ColumnInfo& columnInfo, int tokenPos, RID s
}
int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length,
size_t* parse_length, RID& totalReadRows, RID& correctTotalRows,
const boost::ptr_vector<ColumnInfo>& columnsInfo,
size_t* parse_length, size_t& skipRows, RID& totalReadRows,
RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
unsigned int allowedErrCntThisCall)
{
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
@ -2119,7 +2119,7 @@ int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const ch
if (fImportDataMode == IMPORT_DATA_TEXT)
{
tokenize(columnsInfo, allowedErrCntThisCall);
tokenize(columnsInfo, allowedErrCntThisCall, skipRows);
}
else
{
@ -2150,8 +2150,9 @@ int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const ch
// correctTotalRows (input/output) - total valid row count from tokenize()
// (cumulative)
//------------------------------------------------------------------------------
int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, RID& totalReadRows,
RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, size_t& skipRows,
RID& totalReadRows, RID& correctTotalRows,
const boost::ptr_vector<ColumnInfo>& columnsInfo,
unsigned int allowedErrCntThisCall)
{
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
@ -2164,10 +2165,10 @@ int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* hand
{
memcpy(fData, fOverflowBuf, fOverflowSize);
if (fOverflowBuf != NULL)
if (fOverflowBuf != nullptr)
{
delete[] fOverflowBuf;
fOverflowBuf = NULL;
fOverflowBuf = nullptr;
}
}
@ -2219,7 +2220,7 @@ int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* hand
if (fImportDataMode == IMPORT_DATA_TEXT)
{
tokenize(columnsInfo, allowedErrCntThisCall);
tokenize(columnsInfo, allowedErrCntThisCall, skipRows);
}
else
{
@ -2276,7 +2277,7 @@ int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* hand
// depending on whether the user has enabled the "enclosed by" feature.
//------------------------------------------------------------------------------
void BulkLoadBuffer::tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
unsigned int allowedErrCntThisCall)
unsigned int allowedErrCntThisCall, size_t& skipRows)
{
unsigned offset = 0; // length of field
unsigned curCol = 0; // dest db column counter within a row
@ -2334,6 +2335,15 @@ void BulkLoadBuffer::tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
while (p < pEndOfData)
{
c = *p;
if (UNLIKELY(skipRows > 0))
{
if (c == NEWLINE_CHAR)
{
--skipRows;
}
++p;
continue;
}
// If we have stripped "enclosed" characters, then save raw data
if (rawDataRowLength > 0)

View File

@ -215,7 +215,8 @@ class BulkLoadBuffer
/** @brief tokenize the buffer contents and fill up the token array.
*/
void tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo, unsigned int allowedErrCntThisCall);
void tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo, unsigned int allowedErrCntThisCall,
size_t& skipRows);
/** @brief Binary tokenization of the buffer, and fill up the token array.
*/
@ -273,13 +274,14 @@ class BulkLoadBuffer
bool tryAndLockColumn(const int& columnId, const int& id);
int fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length,
size_t* parse_length, RID& totalReadRows, RID& correctTotalRows,
size_t* parse_length, size_t& skipRows, RID& totalReadRows, RID& correctTotalRows,
const boost::ptr_vector<ColumnInfo>& columnsInfo, unsigned int allowedErrCntThisCall);
/** @brief Read the table data into the buffer
*/
int fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, RID& totalRows, RID& correctTotalRows,
const boost::ptr_vector<ColumnInfo>& columnsInfo, unsigned int allowedErrCntThisCall);
int fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, size_t& skipRows, RID& totalRows,
RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
unsigned int allowedErrCntThisCall);
/** @brief Get the overflow size
*/

View File

@ -0,0 +1,559 @@
/* Copyright (C) 2014 InfiniDB, Inc.
Copyright (C) 2016 MariaDB Corporation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include "we_simplesyslog.h"
#include <unistd.h>
#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <vector>
#include <string>
#include <sstream>
#include <iostream>
#include <exception>
#include <stdexcept>
#include <cerrno>
#include <boost/program_options.hpp>
namespace po = boost::program_options;
using namespace std;
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/filesystem.hpp>
#include "dataconvert.h"
#include "liboamcpp.h"
using namespace oam;
#include "we_cmdargs.h"
#include "mcsconfig.h"
namespace WriteEngine
{
//----------------------------------------------------------------------
//----------------------------------------------------------------------
WECmdArgs::WECmdArgs(int argc, char** argv)
{
try
{
fOptions = std::make_unique<po::options_description>();
fVisibleOptions = std::make_unique<po::options_description>();
#define DECLARE_INT_ARG(name, stor, min, max, desc) \
(name,\
po::value<int>(&stor)\
->notifier([this](auto&& value) { checkIntArg(name, min, max, value); }),\
desc)
fVisibleOptions->add_options()
("help,h", "Print this message.")
DECLARE_INT_ARG("read-buffer,b", fIOReadBufSize, 1, INT_MAX, "Number of read buffers.")
DECLARE_INT_ARG("read-buffer-size,c", fReadBufSize, 1, INT_MAX,
"Application read buffer size (in bytes)")
DECLARE_INT_ARG("debug,d", fDebugLvl, 1, 3, "Print different level(1-3) debug message")
DECLARE_INT_ARG("max-errors,e", fMaxErrors, 0, INT_MAX,
"Maximum number of allowable error per table per PM")
("file-path,f", po::value<string>(&fPmFilePath),
"Data file directory path. Default is current working directory.\n"
"\tIn Mode 1, represents the local input file path.\n"
"\tIn Mode 2, represents the PM based input file path.\n"
"\tIn Mode 3, represents the local input file path.")
DECLARE_INT_ARG("mode,m", fArgMode, 1, 3,
"\t1 - rows will be loaded in a distributed manner acress PMs.\n"
"\t2 - PM based input files loaded into their respective PM.\n"
"\t3 - input files will be loaded on the local PM.")
("filename,l", po::value<string>(&fPmFile),
"Name of import file to be loaded, relative to 'file-path'")
("console-log,i", po::bool_switch(&fConsoleLog),
"Print extended info to console in Mode 3.")
("job-id,j", po::value<string>(),
"Job ID. In simple usage, default is the table OID unless a fully qualified input "
"file name is given.")
("null-strings,n", po::value(&fNullStrMode)->implicit_value(true),
"NullOption (0-treat the string NULL as data (default);\n"
"1-treat the string NULL as a NULL value)")
("xml-job-path,p", po::value<string>(&fJobPath), "Path for the XML job description file.")
DECLARE_INT_ARG("readers,r", fNoOfReadThrds, 1, INT_MAX, "Number of readers.")
("separator,s", po::value<string>(), "Delimiter between column values.")
DECLARE_INT_ARG("io-buffer-size,B", fSetBufSize, 1, INT_MAX,
"I/O library read buffer size (in bytes)")
DECLARE_INT_ARG("writers,w", fNoOfWriteThrds, 1, INT_MAX, "Number of parsers.")
("enclosed-by,E", po::value<char>(&fEnclosedChar),
"Enclosed by character if field values are enclosed.")
("escape-char,C", po::value<char>(&fEscChar)->default_value('\\'),
"Escape character used in conjunction with 'enclosed-by'"
"character, or as a part of NULL escape sequence ('\\N');\n"
"default is '\\'")
("headers,O",
po::value<int>(&fSkipRows)->implicit_value(1)
->notifier([this](auto&& value) { checkIntArg("headers,O", 0, INT_MAX, value); }),
"Number of header rows to skip.")
("binary-mode,I", po::value<int>(),
"Import binary data; how to treat NULL values:\n"
"\t1 - import NULL values\n"
"\t2 - saturate NULL values\n")
("calling-module,P", po::value<string>(&fModuleIDandPID), "Calling module ID and PID.")
("truncation-as-error,S", po::bool_switch(&fbTruncationAsError),
"Treat string truncations as errors.")
("tz,T", po::value<string>(),
"Timezone used for TIMESTAMP datatype. Possible values:\n"
"\t\"SYSTEM\" (default)\n"
"\tOffset in the form +/-HH:MM")
("disable-tablelock-timeout,D", po::bool_switch(&fDisableTableLockTimeOut),
"Disable timeout when waiting for table lock.")
("silent,N", po::bool_switch(&fSilent), "Disable console output.")
("s3-key,y", po::value<string>(&fS3Key),
"S3 Authentication Key (for S3 imports)")
("s3-secret,K", po::value<string>(&fS3Secret),
"S3 Authentication Secret (for S3 imports)")
("s3-bucket,t", po::value<string>(&fS3Bucket),
"S3 Bucket (for S3 imports)")
("s3-hostname,H", po::value<string>(&fS3Host),
"S3 Hostname (for S3 imports, Amazon's S3 default)")
("s3-region,g", po::value<string>(&fS3Region),
"S3 Region (for S3 imports)")
("errors-dir,L", po::value<string>(&fErrorDir)->default_value(MCSLOGDIR),
"Directory for the output .err and .bad files")
("job-uuid,u", po::value<string>(&fUUID), "import job UUID")
("username,U", po::value<string>(&fUsername), "Username of the files owner.")
("dbname", po::value<string>(), "Name of the database to load")
("table", po::value<string>(), "Name of table to load")
("load-file", po::value<string>(),
"Optional input file name in current directory, "
"unless a fully qualified name is given. If not given, input read from STDIN.");
po::options_description hidden("Hidden options");
hidden.add_options()
("keep-rollback-metadata,k", po::bool_switch(&fKeepRollbackMetaData),
"Keep rollback metadata.")
("report-file,R", po::value<string>(&fReportFilename), "Report file name.")
("allow-missing-columns,X", po::value<string>(), "Allow missing columns.");
fOptions->add(*fVisibleOptions).add(hidden);
#undef DECLARE_INT_ARG
parseCmdLineArgs(argc, argv);
}
catch (std::exception& exp)
{
startupError(exp.what(), true);
}
}
WECmdArgs::~WECmdArgs() = default;
//----------------------------------------------------------------------
void WECmdArgs::checkIntArg(const std::string& name, long min, long max, int value) const
{
if (value < min || value > max)
{
ostringstream oss;
oss << "Argument " << name << " is out of range [" << min << ", " << max << "]";
startupError(oss.str(), true);
}
}
//----------------------------------------------------------------------
void WECmdArgs::usage() const
{
cout << endl
<< "Simple usage using positional parameters "
"(no XML job file):"
<< endl
<< " " << fPrgmName << " dbName tblName [loadFile] [-j jobID] " << endl
<< " [-h] [-r readers] [-w parsers] [-s c] [-f path] [-b readBufs] " << endl
<< " [-c readBufSize] [-e maxErrs] [-B libBufSize] [-n NullOption] " << endl
<< " [-E encloseChar] [-C escapeChar] [-I binaryOpt] [-S] "
"[-d debugLevel] [-i] "
<< endl
<< " [-D] [-N] [-L rejectDir] [-T timeZone]" << endl
<< " [-U username]" << endl
<< endl;
cout << endl
<< "Traditional usage without positional parameters "
"(XML job file required):"
<< endl
<< " " << fPrgmName << " -j jobID " << endl
<< " [-h] [-r readers] [-w parsers] [-s c] [-f path] [-b readBufs] " << endl
<< " [-c readBufSize] [-e maxErrs] [-B libBufSize] [-n NullOption] " << endl
<< " [-E encloseChar] [-C escapeChar] [-I binaryOpt] [-S] "
"[-d debugLevel] [-i] "
<< endl
<< " [-p path] [-l loadFile]" << endl
<< " [-D] [-N] [-L rejectDir] [-T timeZone]" << endl
<< " [-U username]" << endl
<< endl;
cout << "\n\n" << (*fVisibleOptions) << endl;
cout << " Example1:" << endl
<< " " << fPrgmName << " -j 1234" << endl
<< " Example2: Some column values are enclosed within double quotes." << endl
<< " " << fPrgmName << " -j 3000 -E '\"'" << endl
<< " Example3: Import a nation table without a Job XML file" << endl
<< " " << fPrgmName << " -j 301 tpch nation nation.tbl" << endl;
exit(1);
}
//-----------------------------------------------------------------------------
void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
{
std::string importPath;
if (argc > 0)
fPrgmName = string(MCSBINDIR) + "/" + "cpimport.bin"; // argv[0] is splitter but we need cpimport
po::positional_options_description pos_opt;
pos_opt.add("dbname", 1)
.add("table", 1)
.add("load-file", 1);
po::variables_map vm;
po::store(po::command_line_parser(argc, argv).options(*fOptions).positional(pos_opt).run(), vm);
po::notify(vm);
if (vm.contains("help"))
{
fHelp = true;
usage();
return;
}
if (vm.contains("separator"))
{
auto value = vm["separator"].as<std::string>();
if (value == "\\t")
{
fColDelim = '\t';
}
else
{
fColDelim = value[0];
}
}
if (vm.contains("binary-mode"))
{
int value = vm["binary-mode"].as<int>();
if (value == 1)
{
fImportDataMode = IMPORT_DATA_BIN_ACCEPT_NULL;
}
else if (value == 2)
{
fImportDataMode = IMPORT_DATA_BIN_SAT_NULL;
}
else
{
startupError("Invalid Binary mode; value can be 1 or 2");
}
}
if (vm.contains("tz"))
{
auto tz = vm["tz"].as<std::string>();
long offset;
if (tz != "SYSTEM" && dataconvert::timeZoneToOffset(tz.c_str(), tz.size(), &offset))
{
startupError("Value for option --tz/-T is invalid");
}
fTimeZone = tz;
}
if (vm.contains("job-id"))
{
errno = 0;
string optarg = vm["job-id"].as<std::string>();
long lValue = strtol(optarg.c_str(), nullptr, 10);
if (errno != 0 || lValue < 0 || lValue > INT_MAX)
{
startupError("Option --job-id/-j is invalid or outof range");
}
fJobId = optarg;
fOrigJobId = fJobId;
if (0 == fJobId.length())
{
startupError("Wrong JobID Value");
}
}
if (vm.contains("allow-missing-columns"))
{
if (vm["allow-missing-columns"].as<string>() == "AllowMissingColumn")
{
fAllowMissingColumn = true;
}
}
if (fArgMode != -1)
fMode = fArgMode; // BUG 4210
if (2 == fArgMode && fPmFilePath.empty())
throw runtime_error("-f option is mandatory with mode 2.");
if (vm.contains("dbname"))
{
fSchema = vm["dbname"].as<std::string>();
}
if (vm.contains("table"))
{
fTable = vm["table"].as<std::string>();
}
if (vm.contains("load-file"))
{
fLocFile = vm["load-file"].as<std::string>();
}
}
void WECmdArgs::fillParams(BulkLoad& curJob, std::string& sJobIdStr, std::string& sXMLJobDir,
std::string& sModuleIDandPID, bool& bLogInfo2ToConsole, std::string& xmlGenSchema,
std::string& xmlGenTable, bool& bValidateColumnList)
{
std::string importPath;
std::string rptFileName;
bool bImportFileArg = false;
BulkModeType bulkMode = BULK_MODE_LOCAL;
std::string jobUUID;
curJob.setReadBufferCount(fIOReadBufSize);
curJob.setReadBufferSize(fReadBufSize);
if (fMaxErrors >= 0)
{
curJob.setMaxErrorCount(fMaxErrors);
}
if (!fPmFilePath.empty())
{
importPath = fPmFilePath;
string setAltErrMsg;
if (curJob.setAlternateImportDir(importPath, setAltErrMsg) != NO_ERROR)
{
startupError(setAltErrMsg, false);
}
}
bLogInfo2ToConsole = fConsoleLog;
sJobIdStr = fJobId;
curJob.setKeepRbMetaFiles(fKeepRollbackMetaData);
bulkMode = static_cast<BulkModeType>(fMode);
curJob.setNullStringMode(fNullStrMode);
sXMLJobDir = fJobPath;
curJob.setNoOfReadThreads(fNoOfReadThrds);
curJob.setColDelimiter(fColDelim);
curJob.setJobUUID(fUUID);
curJob.setNoOfParseThreads(fNoOfWriteThrds);
curJob.setVbufReadSize(fReadBufSize);
if (fEscChar != -1)
{
curJob.setEscapeChar(fEscChar);
}
if (fEnclosedChar != -1)
{
curJob.setEnclosedByChar(fEnclosedChar);
}
curJob.setImportDataMode(fImportDataMode);
curJob.setErrorDir(fErrorDir);
sModuleIDandPID = fModuleIDandPID;
rptFileName = fReportFilename;
curJob.setTruncationAsError(fbTruncationAsError);
if (!fTimeZone.empty())
{
long offset;
if (dataconvert::timeZoneToOffset(fTimeZone.c_str(), fTimeZone.size(), &offset))
{
startupError("Invalid timezone specified");
}
curJob.setTimeZone(offset);
}
bValidateColumnList = !fAllowMissingColumn;
curJob.disableTimeOut(fDisableTableLockTimeOut);
curJob.disableConsoleOutput(fSilent);
curJob.setS3Key(fS3Key);
curJob.setS3Bucket(fS3Bucket);
curJob.setS3Secret(fS3Secret);
curJob.setS3Region(fS3Region);
curJob.setS3Host(fS3Host);
if (!fUsername.empty())
{
curJob.setUsername(fUsername);
}
curJob.setSkipRows(fSkipRows);
curJob.setDefaultJobUUID();
// Inconsistent to specify -f STDIN with -l importFile
if (bImportFileArg && importPath == "STDIN")
{
startupError(std::string("-f STDIN is invalid with -l importFile."), true);
}
// If distributed mode, make sure report filename is specified and that we
// can create the file using the specified path.
if (bulkMode == BULK_MODE_REMOTE_SINGLE_SRC || bulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC)
{
if (rptFileName.empty())
{
startupError(std::string("Bulk modes 1 and 2 require -R rptFileName."), true);
}
else
{
std::ofstream rptFile(rptFileName.c_str());
if (rptFile.fail())
{
std::ostringstream oss;
oss << "Unable to open report file " << rptFileName;
startupError(oss.str(), false);
}
rptFile.close();
}
curJob.setBulkLoadMode(bulkMode, rptFileName);
}
// Get positional arguments, User can provide:
// 1. no positional parameters
// 2. Two positional parameters (schema and table names)
// 3. Three positional parameters (schema, table, and import file name)
if (!fSchema.empty())
{
xmlGenSchema = fSchema;
if (!fTable.empty())
{
// Validate invalid options in conjunction with 2-3 positional
// parameter mode, which means we are using temp Job XML file.
if (bImportFileArg)
{
startupError(std::string("-l importFile is invalid with positional parameters"), true);
}
if (!sXMLJobDir.empty())
{
startupError(std::string("-p path is invalid with positional parameters."), true);
}
if (importPath == "STDIN")
{
startupError(std::string("-f STDIN is invalid with positional parameters."), true);
}
xmlGenTable = fTable;
if (!fLocFile.empty())
{
// 3rd pos parm
curJob.addToCmdLineImportFileList(fLocFile);
// Default to CWD if loadfile name given w/o -f path
if (importPath.empty())
{
std::string setAltErrMsg;
if (curJob.setAlternateImportDir(std::string("."), setAltErrMsg) != NO_ERROR)
startupError(setAltErrMsg, false);
}
}
else
{
// Invalid to specify -f if no load file name given
if (!importPath.empty())
{
startupError(std::string("-f requires 3rd positional parameter (load file name)."), true);
}
// Default to STDIN if no import file name given
std::string setAltErrMsg;
if (curJob.setAlternateImportDir(std::string("STDIN"), setAltErrMsg) != NO_ERROR)
startupError(setAltErrMsg, false);
}
}
else
{
startupError(std::string("No table name specified with schema."), true);
}
}
else
{
// JobID is a required parameter with no positional parm mode,
// because we need the jobid to identify the input job xml file.
if (sJobIdStr.empty())
{
startupError(std::string("No JobID specified."), true);
}
}
// Dump some configuration info
if (!fSilent)
{
if (fDebugLvl != 0)
{
cout << "Debug level is set to " << fDebugLvl << endl;
}
if (fNoOfReadThrds != 0)
{
cout << "number of read threads : " << fNoOfReadThrds << endl;
}
cout << "Column delimiter : " << (fColDelim == '\t' ? "\\t" : string{fColDelim}) << endl;
if (fNoOfWriteThrds != 0)
{
cout << "number of parse threads : " << fNoOfWriteThrds << endl;
}
if (fEscChar != 0)
{
cout << "Escape Character : " << fEscChar << endl;
}
if (fEnclosedChar != 0)
{
cout << "Enclosed by Character : " << fEnclosedChar << endl;
}
}
}
void WECmdArgs::startupError(const std::string& errMsg, bool showHint) const
{
BRMWrapper::getInstance()->finishCpimportJob(fCpimportJobId);
// Log to console
if (!BulkLoad::disableConsoleOutput())
cerr << errMsg << endl;
if (showHint && !fSilent)
{
cerr << "Try '" << fPrgmName << " -h' for more information." << endl;
}
// Log to syslog
logging::Message::Args errMsgArgs;
errMsgArgs.add(errMsg);
SimpleSysLog::instance()->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0087);
std::string jobIdStr("0");
logging::Message::Args endMsgArgs;
endMsgArgs.add(jobIdStr);
endMsgArgs.add("FAILED");
SimpleSysLog::instance()->logMsg(endMsgArgs, logging::LOG_TYPE_INFO, logging::M0082);
exit(EXIT_FAILURE);
}
} /* namespace WriteEngine */

View File

@ -0,0 +1,130 @@
/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*******************************************************************************
* $Id$
*
*******************************************************************************/
#pragma once
#include <boost/uuid/nil_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include "we_bulkload.h"
#include "we_type.h"
namespace boost::program_options
{
class options_description;
}
namespace WriteEngine
{
class WECmdArgs
{
public:
WECmdArgs(int argc, char** argv);
~WECmdArgs();
using VecInts = std::vector<unsigned int>;
using VecArgs = std::vector<std::string>;
void parseCmdLineArgs(int argc, char** argv);
void usage() const;
bool checkForCornerCases();
void startupError(const std::string& errMsg, bool showHint = false) const;
void fillParams(BulkLoad& curJob, std::string& sJobIdStr,
std::string& sXMLJobDir, std::string& sModuleIDandPID, bool& bLogInfo2ToConsole,
std::string& xmlGenSchema, std::string& xmlGenTable, bool& bValidateColumnList);
void setCpimportJobId(uint32_t cpimportJobId)
{
fCpimportJobId = cpimportJobId;
}
private:
void checkIntArg(const std::string& name, long min, long max, int value) const;
VecArgs fVecArgs;
VecInts fPmVec;
VecArgs fVecJobFiles; // JobFiles splitter from master JobFile
int fMultiTableCount{0}; // MultiTable count
VecArgs fColFldsFromJobFile; // List of columns from any job file, that
// represent fields in the import data
std::string fJobId; // JobID
std::string fOrigJobId; // Original JobID, in case we have to split it
bool fJobLogOnly{false}; // Job number is only for log filename only
bool fHelp{false}; // Help mode
int fMode{BULK_MODE_LOCAL}; // splitter Mode
int fArgMode{-1}; // Argument mode, dep. on this fMode is decided.
bool fQuiteMode{true}; // in quite mode or not
bool fConsoleLog{false}; // Log everything to console - w.r.t cpimport
std::string fPmFile; // FileName at PM
std::string fPmFilePath; // Path of input file in PM
std::string fLocFile; // Local file name
std::string fBrmRptFile; // BRM report file
std::string fJobPath; // Path to Job File
std::string fTmpFileDir; // Temp file directory.
std::string fBulkRoot; // Bulk Root path
std::string fJobFile; // Job File Name
std::string fS3Key; // S3 key
std::string fS3Secret; // S3 Secret
std::string fS3Bucket; // S3 Bucket
std::string fS3Host; // S3 Host
std::string fS3Region; // S3 Region
int fNoOfReadThrds{1}; // No. of read buffers
int fDebugLvl{0}; // Debug level
int fMaxErrors{-1}; // Max allowable errors
int fReadBufSize{-1}; // Read buffer size
int fIOReadBufSize{-1}; // I/O read buffer size
int fSetBufSize{0}; // Buff size w/setvbuf
char fColDelim{0}; // column delimiter
char fEnclosedChar{0}; // enclosed by char
char fEscChar{0}; // esc char
int fSkipRows{0}; // skip header
int fNoOfWriteThrds{3}; // No. of write threads
bool fNullStrMode{false}; // set null string mode - treat null as null
ImportDataMode fImportDataMode{IMPORT_DATA_TEXT}; // Importing text or binary data
std::string fPrgmName; // argv[0]
std::string fSchema; // Schema name - positional parmater
std::string fTable; // Table name - table name parameter
bool fBlockMode3{false}; // Do not allow Mode 3
bool fbTruncationAsError{false}; // Treat string truncation as error
std::string fUUID{boost::uuids::to_string(boost::uuids::nil_generator()())};
bool fConsoleOutput{true}; // If false, no output to console.
std::string fTimeZone{"SYSTEM"}; // Timezone to use for TIMESTAMP datatype
std::string fUsername; // Username of the data files owner
std::string fErrorDir{MCSLOGDIR "/cpimport"};
bool fDisableTableLockTimeOut{false};
bool fSilent{false};
std::string fModuleIDandPID;
std::string fReportFilename;
bool fKeepRollbackMetaData{false};
bool fAllowMissingColumn{false};
uint32_t fCpimportJobId{};
std::unique_ptr<boost::program_options::options_description> fOptions;
std::unique_ptr<boost::program_options::options_description> fVisibleOptions;
};
} // namespace WriteEngine

View File

@ -145,6 +145,8 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processN
, fNullStringMode(false)
, fEnclosedByChar('\0')
, fEscapeChar('\\')
, fSkipRows(0)
, fSkipRowsCur(0)
, fProcessingBegun(false)
, fBulkMode(BULK_MODE_LOCAL)
, fBRMReporter(logger, tableName)
@ -269,7 +271,7 @@ int TableInfo::readTableData()
int fileCounter = 0;
unsigned long long qtSentAt = 0;
if (fHandle == NULL)
if (fHandle == nullptr)
{
fFileName = fLoadFileList[fileCounter];
int rc = openTableFile();
@ -421,13 +423,14 @@ int TableInfo::readTableData()
if (fReadFromS3)
{
readRc = fBuffers[readBufNo].fillFromMemory(fBuffers[prevReadBuf], fFileBuffer, fS3ReadLength,
&fS3ParseLength, totalRowsPerInputFile, validTotalRows,
fColumns, allowedErrCntThisCall);
&fS3ParseLength, fSkipRowsCur, totalRowsPerInputFile,
validTotalRows, fColumns, allowedErrCntThisCall);
}
else
{
readRc = fBuffers[readBufNo].fillFromFile(fBuffers[prevReadBuf], fHandle, totalRowsPerInputFile,
validTotalRows, fColumns, allowedErrCntThisCall);
readRc = fBuffers[readBufNo].fillFromFile(fBuffers[prevReadBuf], fHandle, fSkipRowsCur,
totalRowsPerInputFile, validTotalRows, fColumns,
allowedErrCntThisCall);
}
if (readRc != NO_ERROR)
@ -1208,7 +1211,6 @@ bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const
int TableInfo::initializeBuffers(int noOfBuffers, const JobFieldRefList& jobFieldRefList,
unsigned int fixedBinaryRecLen)
{
fReadBufCount = noOfBuffers;
// initialize and populate the buffer vector.
@ -1258,7 +1260,7 @@ void TableInfo::addColumn(ColumnInfo* info)
//------------------------------------------------------------------------------
int TableInfo::openTableFile()
{
if (fHandle != NULL)
if (fHandle != nullptr)
return NO_ERROR;
if (fReadFromStdin)
@ -1322,6 +1324,8 @@ int TableInfo::openTableFile()
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
fSkipRowsCur = fSkipRows;
return NO_ERROR;
}

View File

@ -148,8 +148,9 @@ class TableInfo : public WeUIDGID
size_t fS3ParseLength;
bool fNullStringMode; // Treat "NULL" as a null value
char fEnclosedByChar; // Character to enclose col values
char fEscapeChar; // Escape character used in conjunc-
// tion with fEnclosedByChar
char fEscapeChar; // Escape character used in conjunction with fEnclosedByChar
size_t fSkipRows; // Header rows to skip
size_t fSkipRowsCur; // Header rows left oto skip in the current file
bool fProcessingBegun; // Has processing begun on this tbl
BulkModeType fBulkMode; // Distributed bulk mode (1,2, or 3)
std::string fBRMRptFileName; // Name of distributed mode rpt file
@ -334,6 +335,10 @@ class TableInfo : public WeUIDGID
*/
void setEscapeChar(char esChar);
/** @brief Set how many header rows should be skipped.
*/
void setSkipRows(size_t skipRows);
/** @brief Has processing begun for this table.
*/
bool hasProcessingBegun();
@ -579,6 +584,12 @@ inline void TableInfo::setEscapeChar(char esChar)
fEscapeChar = esChar;
}
inline void TableInfo::setSkipRows(size_t skipRows)
{
fSkipRows = skipRows;
}
inline void TableInfo::setFileBufferSize(const int fileBufSize)
{
fFileBufSize = fileBufSize;