From 78c1b5034d3ce55a3da79ca1638d8fdb503c2ad5 Mon Sep 17 00:00:00 2001 From: Alexey Antipovsky Date: Fri, 11 Jul 2025 21:35:43 +0200 Subject: [PATCH] Feature/mcol 4882 cpimport skip rows (#3594) * feat(cpimport): MCOL-4882 add a parameter to skip header rows * chore(cpimport): MCOL-4882 Use boost::program_options to arguments parsing * feat(cpimport.bin): MCOL-4882 Add missing changes * add test * fix clang * add missing cmdline argument * fix bug * Fix double lines skipping * Fix incorrect --silent (-N) parsing * fix default --max-errors processing * fix overwriting default username * move initialization to members declaration --- .../r/MCOL-4882-cpimport-skip-headers.result | 22 + .../t/MCOL-4882-cpimport-skip-headers.test | 43 + writeengine/bulk/CMakeLists.txt | 3 +- writeengine/bulk/cpimport.cpp | 738 +----------------- writeengine/bulk/we_bulkload.cpp | 35 +- writeengine/bulk/we_bulkload.h | 88 ++- writeengine/bulk/we_bulkloadbuffer.cpp | 28 +- writeengine/bulk/we_bulkloadbuffer.h | 10 +- writeengine/bulk/we_cmdargs.cpp | 559 +++++++++++++ writeengine/bulk/we_cmdargs.h | 130 +++ writeengine/bulk/we_tableinfo.cpp | 18 +- writeengine/bulk/we_tableinfo.h | 15 +- writeengine/server/we_dataloader.cpp | 5 +- writeengine/shared/we_type.h | 2 + writeengine/splitter/CMakeLists.txt | 1 + writeengine/splitter/we_cmdargs.cpp | 682 ++++++---------- writeengine/splitter/we_cmdargs.h | 120 +-- writeengine/splitter/we_filereadthread.cpp | 48 +- writeengine/splitter/we_filereadthread.h | 51 +- writeengine/splitter/we_sdhandler.cpp | 15 +- writeengine/splitter/we_sdhandler.h | 9 +- writeengine/splitter/we_splclient.h | 22 +- writeengine/splitter/we_splitterapp.cpp | 37 +- writeengine/splitter/we_xmlgetter.cpp | 81 +- writeengine/splitter/we_xmlgetter.h | 14 +- writeengine/xml/we_xmlgendata.cpp | 44 +- writeengine/xml/we_xmlgendata.h | 7 +- writeengine/xml/we_xmlgenproc.cpp | 5 + writeengine/xml/we_xmljob.cpp | 12 +- writeengine/xml/we_xmltag.h | 4 +- 30 files changed, 1379 
insertions(+), 1469 deletions(-) create mode 100644 mysql-test/columnstore/basic/r/MCOL-4882-cpimport-skip-headers.result create mode 100644 mysql-test/columnstore/basic/t/MCOL-4882-cpimport-skip-headers.test create mode 100644 writeengine/bulk/we_cmdargs.cpp create mode 100644 writeengine/bulk/we_cmdargs.h diff --git a/mysql-test/columnstore/basic/r/MCOL-4882-cpimport-skip-headers.result b/mysql-test/columnstore/basic/r/MCOL-4882-cpimport-skip-headers.result new file mode 100644 index 000000000..3b209e3bd --- /dev/null +++ b/mysql-test/columnstore/basic/r/MCOL-4882-cpimport-skip-headers.result @@ -0,0 +1,22 @@ +DROP DATABASE IF EXISTS mcol4882; +CREATE DATABASE mcol4882; +USE mcol4882; +CREATE TABLE t1(col1 INT, col2 VARCHAR(64)) ENGINE=Columnstore; +SELECT * FROM t1; +col1 col2 +1 test1 +2 test2 +3 test3 +TRUNCATE t1; +SELECT * FROM t1; +col1 col2 +2 test2 +3 test3 +TRUNCATE t1; +SELECT * FROM t1; +col1 col2 +3 test3 +TRUNCATE t1; +SELECT * FROM t1; +col1 col2 +DROP DATABASE mcol4882; diff --git a/mysql-test/columnstore/basic/t/MCOL-4882-cpimport-skip-headers.test b/mysql-test/columnstore/basic/t/MCOL-4882-cpimport-skip-headers.test new file mode 100644 index 000000000..167bb8f65 --- /dev/null +++ b/mysql-test/columnstore/basic/t/MCOL-4882-cpimport-skip-headers.test @@ -0,0 +1,43 @@ +if (!$MYSQL_TEST_ROOT){ + skip Should be run by root to execute cpimport; +} + +-- source ../include/have_columnstore.inc + +--disable_warnings +DROP DATABASE IF EXISTS mcol4882; +--enable_warnings + +CREATE DATABASE mcol4882; +USE mcol4882; + +CREATE TABLE t1(col1 INT, col2 VARCHAR(64)) ENGINE=Columnstore; + +--exec printf '1,test1\n2,test2\n3,test3\n' > /tmp/mcol4882.csv + +--disable_result_log +--exec $MCS_CPIMPORT -s , mcol4882 t1 /tmp/mcol4882.csv +--enable_result_log +SELECT * FROM t1; +TRUNCATE t1; + +--disable_result_log +--exec $MCS_CPIMPORT -s , --headers -- mcol4882 t1 /tmp/mcol4882.csv +--enable_result_log +SELECT * FROM t1; +TRUNCATE t1; + +--disable_result_log +--exec 
$MCS_CPIMPORT -s , --headers 2 mcol4882 t1 /tmp/mcol4882.csv +--enable_result_log +SELECT * FROM t1; +TRUNCATE t1; + +--disable_result_log +--exec $MCS_CPIMPORT -s , --headers 5 mcol4882 t1 /tmp/mcol4882.csv +--enable_result_log +SELECT * FROM t1; + +# Clean UP +--exec rm -f /tmp/mcol4882.csv +DROP DATABASE mcol4882; diff --git a/writeengine/bulk/CMakeLists.txt b/writeengine/bulk/CMakeLists.txt index 15068d037..0bb90a586 100644 --- a/writeengine/bulk/CMakeLists.txt +++ b/writeengine/bulk/CMakeLists.txt @@ -9,6 +9,7 @@ set(we_bulk_STAT_SRCS we_bulkload.cpp we_bulkloadbuffer.cpp we_bulkstatus.cpp + we_cmdargs.cpp we_colopbulk.cpp we_colbuf.cpp we_colbufcompressed.cpp @@ -28,7 +29,7 @@ set(we_bulk_STAT_SRCS add_definitions(-D_FILE_OFFSET_BITS=64) columnstore_static_library(we_bulk ${we_bulk_STAT_SRCS}) -columnstore_link(we_bulk ${NETSNMP_LIBRARIES} loggingcpp) +columnstore_link(we_bulk ${NETSNMP_LIBRARIES} loggingcpp boost_program_options) remove_definitions(-D_FILE_OFFSET_BITS=64) diff --git a/writeengine/bulk/cpimport.cpp b/writeengine/bulk/cpimport.cpp index ddf07a83d..3715c658d 100644 --- a/writeengine/bulk/cpimport.cpp +++ b/writeengine/bulk/cpimport.cpp @@ -49,6 +49,7 @@ #include "dataconvert.h" #include "mcsconfig.h" #include "mariadb_my_sys.h" +#include "we_cmdargs.h" using namespace std; using namespace WriteEngine; @@ -56,8 +57,8 @@ using namespace execplan; namespace { -char* pgmName = 0; const std::string IMPORT_PATH_CWD("."); +unique_ptr cmdArgs; bool bDebug = false; uint32_t cpimportJobId = 0; @@ -88,103 +89,6 @@ const char* taskLabels[] = {"", "processing data"}; } // namespace -//------------------------------------------------------------------------------ -// Print command line usage -//------------------------------------------------------------------------------ -void printUsage() -{ - cout << endl - << "Simple usage using positional parameters " - "(no XML job file):" - << endl - << " cpimport.bin dbName tblName [loadFile] [-j jobID] " << endl - 
<< " [-h] [-r readers] [-w parsers] [-s c] [-f path] [-b readBufs] " << endl - << " [-c readBufSize] [-e maxErrs] [-B libBufSize] [-n NullOption] " << endl - << " [-E encloseChar] [-C escapeChar] [-I binaryOpt] [-S] " - "[-d debugLevel] [-i] " - << endl - << " [-D] [-N] [-L rejectDir] [-T timeZone]" << endl - << " [-U username]" << endl - << endl; - - cout << endl - << "Traditional usage without positional parameters " - "(XML job file required):" - << endl - << " cpimport.bin -j jobID " << endl - << " [-h] [-r readers] [-w parsers] [-s c] [-f path] [-b readBufs] " << endl - << " [-c readBufSize] [-e maxErrs] [-B libBufSize] [-n NullOption] " << endl - << " [-E encloseChar] [-C escapeChar] [-I binaryOpt] [-S] " - "[-d debugLevel] [-i] " - << endl - << " [-p path] [-l loadFile]" << endl - << " [-D] [-N] [-L rejectDir] [-T timeZone]" << endl - << " [-U username]" << endl - << endl; - - cout << " Positional parameters:" << endl - << " dbName Name of database to load" << endl - << " tblName Name of table to load" << endl - << " loadFile Optional input file name in current directory, " << "unless a fully" << endl - << " qualified name is given. If not given, " << "input read from stdin." << endl - << endl; - - cout << " Options:" << endl - << " -b Number of read buffers" << endl - << " -c Application read buffer size (in bytes)" << endl - << " -d Print different level (1-3) debug message " << endl - << " -e Maximum number of allowable errors per table" << endl - << " -f Data file directory path; " << endl - << " In simple usage:" << endl - << " Default is current working directory." << endl - << " -f option only applies if loadFile is specified." << endl - << " In traditional usage: " << endl - << " Default is /data/import." << endl - << " 'STDIN' (all caps) redirects input from stdin." << endl - << " -h Print this message" << endl - << " -i Print extended info to console, else this info only goes " - "to log file." - << endl - << " -j Job id. 
In simple usage, default is the table OID." << endl - << " -l Name of input file to be loaded, relative to -f path," << endl - << " unless a fully qualified input file name is given." << endl - << " -n NullOption (0-treat the string NULL as data (default);" << endl - << " 1-treat the string NULL as a NULL value)" << endl - << " -p Path for XML job description file" << endl - << " -r Number of readers" << endl - << " -s 'c' is the delimiter between column values" << endl - << " -w Number of parsers" << endl - << " -B I/O library read buffer size (in bytes)" << endl - << " -E Enclosed by character if field values are enclosed" << endl - << " -C Escape character used in conjunction with 'enclosed by' " << "character," << endl - << " or as part of NULL escape sequence ('\\N'); default is '\\'" << endl - << " -I Binary import; binaryOpt 1-import NULL values" << endl - << " 2-saturate NULL values" << endl - << " -S Treat string truncations as errors" << endl - << " -D Disable timeout when waiting for table lock" << endl - << " -N Disable console output" << endl - << " -L send *.err and *.bad (reject) files here" << endl - << " -T Timezone used for TIMESTAMP datatype" << endl - << " Possible values: \"SYSTEM\" (default)" << endl - << " : Offset in the form +/-HH:MM" << endl - << endl - << " -y S3 Authentication Key (for S3 imports)" << endl - << " -K S3 Authentication Secret (for S3 imports)" << endl - << " -t S3 Bucket (for S3 imports)" << endl - << " -H S3 Hostname (for S3 imports, Amazon's S3 default)" << endl - << " -g S3 Regions (for S3 imports)" << endl - << " -U username of new data files owner. Default is mysql" << endl; - - cout << " Example1:" << endl - << " cpimport.bin -j 1234" << endl - << " Example2: Some column values are enclosed within double quotes." 
<< endl - << " cpimport.bin -j 3000 -E '\"'" << endl - << " Example3: Import a nation table without a Job XML file" << endl - << " cpimport.bin -j 301 tpch nation nation.tbl" << endl; - - exit(EXIT_SUCCESS); -} - //------------------------------------------------------------------------------ // Signal handler to catch SIGTERM signal to terminate the process //------------------------------------------------------------------------------ @@ -227,40 +131,6 @@ void handleSigAbrt(int /*i*/) BulkStatus::setJobStatus(EXIT_FAILURE); } -//------------------------------------------------------------------------------ -// If error occurs during startup, this function is called to log the specified -// message and terminate the process. -//------------------------------------------------------------------------------ -void startupError(const std::string& errMsg, bool showHint) -{ - BRMWrapper::getInstance()->finishCpimportJob(cpimportJobId); - // Log to console - if (!BulkLoad::disableConsoleOutput()) - cerr << errMsg << endl; - - if (showHint) - { - std::ostringstream oss; - oss << "Try '" << pgmName << " -h' for more information."; - - if (!BulkLoad::disableConsoleOutput()) - cerr << oss.str() << endl; - } - - // Log to syslog - logging::Message::Args errMsgArgs; - errMsgArgs.add(errMsg); - SimpleSysLog::instance()->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0087); - - std::string jobIdStr("0"); - logging::Message::Args endMsgArgs; - endMsgArgs.add(jobIdStr); - endMsgArgs.add("FAILED"); - SimpleSysLog::instance()->logMsg(endMsgArgs, logging::LOG_TYPE_INFO, logging::M0082); - - exit(EXIT_FAILURE); -} - //------------------------------------------------------------------------------ // Initialize signal handling //------------------------------------------------------------------------------ @@ -307,540 +177,6 @@ void setupSignalHandlers() sigaction(SIGABRT, &act, 0); } -//------------------------------------------------------------------------------ -// Parse the 
command line arguments -//------------------------------------------------------------------------------ -void parseCmdLineArgs(int argc, char** argv, BulkLoad& curJob, std::string& sJobIdStr, - std::string& sXMLJobDir, std::string& sModuleIDandPID, bool& bLogInfo2ToConsole, - std::string& xmlGenSchema, std::string& xmlGenTable, bool& bValidateColumnList) -{ - std::string importPath; - std::string rptFileName; - int option; - bool bImportFileArg = false; - BulkModeType bulkMode = BULK_MODE_LOCAL; - std::string jobUUID; - - while ((option = getopt(argc, argv, "b:c:d:e:f:hij:kl:m:n:p:r:s:u:w:B:C:DE:I:P:R:ST:X:NL:y:K:t:H:g:U:")) != - EOF) - { - switch (option) - { - case 'b': // -b: no. of read buffers - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - { - startupError(std::string("Option -b is invalid or out of range."), true); - } - - int noOfReadBuffers = lValue; - curJob.setReadBufferCount(noOfReadBuffers); - break; - } - - case 'c': // -c: read buffer size - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - { - startupError(std::string("Option -c is invalid or out of range."), true); - } - - int readBufferSize = lValue; - curJob.setReadBufferSize(readBufferSize); - break; - } - - case 'd': // -d: debug level - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - { - startupError(std::string("Option -d is invalid or out of range."), true); - } - - int debugLevel = lValue; - - if (debugLevel > 0 && debugLevel <= 3) - { - bDebug = true; - curJob.setAllDebug((DebugLevel)debugLevel); - - if (!BulkLoad::disableConsoleOutput()) - cout << "\nDebug level is set to " << debugLevel << endl; - } - - break; - } - - case 'e': // -e: max allowed errors - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX)) - { - 
startupError(std::string("Option -e is invalid or out of range."), true); - } - - int maxErrors = lValue; - curJob.setMaxErrorCount(maxErrors); - break; - } - - case 'f': // -f: import path - { - importPath = optarg; - std::string setAltErrMsg; - - if (curJob.setAlternateImportDir(importPath, setAltErrMsg) != NO_ERROR) - startupError(setAltErrMsg, false); - - break; - } - - case 'h': // -h: help - { - printUsage(); - break; - } - - case 'i': // -i: log info to console - { - bLogInfo2ToConsole = true; - break; - } - - case 'j': // -j: jobID - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX)) - { - startupError(std::string("Option -j is invalid or out of range."), true); - } - - sJobIdStr = optarg; - break; - } - - case 'k': // -k: hidden option to keep (not delete) - { - // bulk rollback meta-data files - curJob.setKeepRbMetaFiles(true); - break; - } - - case 'l': // -l: import load file(s) - { - bImportFileArg = true; - curJob.addToCmdLineImportFileList(std::string(optarg)); - break; - } - - case 'm': // -m: bulk load mode - { - bulkMode = (BulkModeType)atoi(optarg); - - if ((bulkMode != BULK_MODE_REMOTE_SINGLE_SRC) && (bulkMode != BULK_MODE_REMOTE_MULTIPLE_SRC) && - (bulkMode != BULK_MODE_LOCAL)) - { - startupError(std::string("Invalid bulk mode; can be 1,2, or 3"), true); - } - - break; - } - - case 'n': // -n: treat "NULL" as null - { - int nullStringMode = atoi(optarg); - - if ((nullStringMode != 0) && (nullStringMode != 1)) - { - startupError(std::string("Invalid NULL option; value can be 0 or 1"), true); - } - - if (nullStringMode) - curJob.setNullStringMode(true); - else - curJob.setNullStringMode(false); - - break; - } - - case 'p': // -p: Job XML path - { - sXMLJobDir = optarg; - break; - } - - case 'r': // -r: num read threads - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - { - startupError(std::string("Option -r is invalid 
or out of range."), true); - } - - int numOfReaders = lValue; -#if !defined(__LP64__) && !defined(_MSC_VER) - - if (numOfReaders > 1) - { - cerr << "Note: resetting number of read threads to maximum" << endl; - numOfReaders = 1; - } - -#endif - curJob.setNoOfReadThreads(numOfReaders); - - if (!BulkLoad::disableConsoleOutput()) - cout << "number of read threads : " << numOfReaders << endl; - - break; - } - - case 's': // -s: column delimiter - { - char delim; - - if (!strcmp(optarg, "\\t")) - { - delim = '\t'; - - if (!BulkLoad::disableConsoleOutput()) - cout << "Column delimiter : " << "\\t" << endl; - } - else - { - delim = optarg[0]; - - if (delim == '\t') // special case to print a - { - if (!BulkLoad::disableConsoleOutput()) - cout << "Column delimiter : '\\t'" << endl; - } - else - { - if (!BulkLoad::disableConsoleOutput()) - cout << "Column delimiter : " << delim << endl; - } - } - - curJob.setColDelimiter(delim); - break; - } - - case 'u': // -u: import job UUID - { - jobUUID = optarg; - curJob.setJobUUID(jobUUID); - break; - } - - case 'w': // -w: num parse threads - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - { - startupError(std::string("Option -w is invalid or out of range."), true); - } - - int numOfParser = lValue; -#if !defined(__LP64__) && !defined(_MSC_VER) - - if (numOfParser > 3) - { - cerr << "Note: resetting number of parse threads to maximum" << endl; - numOfParser = 3; - } - -#endif - curJob.setNoOfParseThreads(numOfParser); - - if (!BulkLoad::disableConsoleOutput()) - cout << "number of parse threads : " << numOfParser << endl; - - break; - } - - case 'B': // -B: setvbuf read size - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - { - startupError(std::string("Option -B is invalid or out of range."), true); - } - - int vbufReadSize = lValue; - curJob.setVbufReadSize(vbufReadSize); - break; - } - - case 
'C': // -C: enclosed escape char - { - curJob.setEscapeChar(optarg[0]); - - if (!BulkLoad::disableConsoleOutput()) - cout << "Escape Character : " << optarg[0] << endl; - - break; - } - - case 'E': // -E: enclosed by char - { - curJob.setEnclosedByChar(optarg[0]); - - if (!BulkLoad::disableConsoleOutput()) - cout << "Enclosed by Character : " << optarg[0] << endl; - - break; - } - - case 'I': // -I: Binary import mode - { - ImportDataMode importMode = (ImportDataMode)atoi(optarg); - - if ((importMode != IMPORT_DATA_BIN_ACCEPT_NULL) && (importMode != IMPORT_DATA_BIN_SAT_NULL)) - { - startupError(std::string("Invalid binary import option; value can be 1" - "(accept NULL values) or 2(saturate NULL values)"), - true); - } - - curJob.setImportDataMode(importMode); - break; - } - - case 'L': // -L: Error log directory - { - curJob.setErrorDir(optarg); - break; - } - - case 'P': // -P: Calling moduleid - { - // and PID - sModuleIDandPID = optarg; - break; - } - - case 'R': // -R: distributed mode - { - // report file - rptFileName = optarg; - break; - } - - case 'S': // -S: Char & VarChar data - { - // greater than col def - curJob.setTruncationAsError(true); // are reported as err - break; - } - - case 'T': - { - std::string timeZone = optarg; - long offset; - - if (dataconvert::timeZoneToOffset(timeZone.c_str(), timeZone.size(), &offset)) - { - startupError(std::string("Value for option -T is invalid"), true); - } - - curJob.setTimeZone(offset); - break; - } - - case 'X': // Hidden extra options - { - if (!strcmp(optarg, "AllowMissingColumn")) - bValidateColumnList = false; - - break; - } - - case 'D': // disable table lock waiting timeout - { - curJob.disableTimeOut(true); - break; - } - - case 'N': // silent the output to console - { - BulkLoad::disableConsoleOutput(true); - break; - } - - case 'y': - { - curJob.setS3Key(optarg); - break; - } - - case 'K': - { - curJob.setS3Secret(optarg); - break; - } - - case 't': - { - curJob.setS3Bucket(optarg); - break; - } - - 
case 'H': - { - curJob.setS3Host(optarg); - break; - } - - case 'g': - { - curJob.setS3Region(optarg); - break; - } - - case 'U': - { - curJob.setUsername(optarg); - break; - } - - default: - { - ostringstream oss; - oss << "Unrecognized command line option (" << option << ")"; - startupError(oss.str(), true); - } - } - } - - curJob.setDefaultJobUUID(); - - // Inconsistent to specify -f STDIN with -l importFile - if ((bImportFileArg) && (importPath == "STDIN")) - { - startupError(std::string("-f STDIN is invalid with -l importFile."), true); - } - - // If distributed mode, make sure report filename is specified and that we - // can create the file using the specified path. - if ((bulkMode == BULK_MODE_REMOTE_SINGLE_SRC) || (bulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC)) - { - if (rptFileName.empty()) - { - startupError(std::string("Bulk modes 1 and 2 require -R rptFileName."), true); - } - else - { - std::ofstream rptFile(rptFileName.c_str()); - - if (rptFile.fail()) - { - std::ostringstream oss; - oss << "Unable to open report file " << rptFileName; - startupError(oss.str(), false); - } - - rptFile.close(); - } - - curJob.setBulkLoadMode(bulkMode, rptFileName); - } - - // Get positional arguments, User can provide: - // 1. no positional parameters - // 2. Two positional parameters (schema and table names) - // 3. Three positional parameters (schema, table, and import file name) - if (optind < argc) // see if db schema name is given - { - xmlGenSchema = argv[optind]; // 1st pos parm - optind++; - - if (optind < argc) // see if table name is given - { - // Validate invalid options in conjunction with 2-3 positional - // parameter mode, which means we are using temp Job XML file. 
- if (bImportFileArg) - { - startupError(std::string("-l importFile is invalid with positional parameters"), true); - } - - if (!sXMLJobDir.empty()) - { - startupError(std::string("-p path is invalid with positional parameters."), true); - } - - if (importPath == "STDIN") - { - startupError(std::string("-f STDIN is invalid with positional parameters."), true); - } - - xmlGenTable = argv[optind]; // 2nd pos parm - optind++; - - if (optind < argc) // see if input file name is given - { - // 3rd pos parm - curJob.addToCmdLineImportFileList(std::string(argv[optind])); - - // Default to CWD if loadfile name given w/o -f path - if (importPath.empty()) - { - std::string setAltErrMsg; - - if (curJob.setAlternateImportDir(std::string("."), setAltErrMsg) != NO_ERROR) - startupError(setAltErrMsg, false); - } - } - else - { - // Invalid to specify -f if no load file name given - if (!importPath.empty()) - { - startupError(std::string("-f requires 3rd positional parameter (load file name)."), true); - } - - // Default to STDIN if no import file name given - std::string setAltErrMsg; - - if (curJob.setAlternateImportDir(std::string("STDIN"), setAltErrMsg) != NO_ERROR) - startupError(setAltErrMsg, false); - } - } - else - { - startupError(std::string("No table name specified with schema."), true); - } - } - else - { - // JobID is a required parameter with no positional parm mode, - // because we need the jobid to identify the input job xml file. - if (sJobIdStr.empty()) - { - startupError(std::string("No JobID specified."), true); - } - } -} - //------------------------------------------------------------------------------ // Print the path of the input load file(s), and the name of the job xml file. 
//------------------------------------------------------------------------------ @@ -857,8 +193,7 @@ void printInputSource(const std::string& alternateImportDir, const std::string& if (alternateImportDir == IMPORT_PATH_CWD) { char cwdBuf[4096]; - char* bufPtr = &cwdBuf[0]; - bufPtr = ::getcwd(cwdBuf, sizeof(cwdBuf)); + char* bufPtr = ::getcwd(cwdBuf, sizeof(cwdBuf)); if (!(BulkLoad::disableConsoleOutput())) cout << "Input file(s) will be read from : " << bufPtr << endl; @@ -900,14 +235,14 @@ void getTableOID(const std::string& xmlGenSchema, const std::string& xmlGenTable std::ostringstream oss; oss << "Unable to set default JobID; " << "Error getting OID for table " << tbl.schema << '.' << tbl.table << ": " << ex.what(); - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } catch (...) { std::ostringstream oss; oss << "Unable to set default JobID; " << "Unknown error getting OID for table " << tbl.schema << '.' << tbl.table; - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } std::ostringstream oss; @@ -950,7 +285,7 @@ void constructTempXmlFile(const std::string& tempJobDir, const std::string& sJob { std::ostringstream oss; oss << "cpimport.bin error creating temporary Job XML file name: " << xmlErrMsg; - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } printInputSource(alternateImportDir, sFileName.string(), S3Bucket); @@ -970,7 +305,7 @@ void constructTempXmlFile(const std::string& tempJobDir, const std::string& sJob { std::ostringstream oss; oss << "No columns for " << xmlGenSchema << '.' 
<< xmlGenTable; - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } } catch (runtime_error& ex) @@ -979,7 +314,7 @@ void constructTempXmlFile(const std::string& tempJobDir, const std::string& sJob oss << "cpimport.bin runtime exception constructing temporary " "Job XML file: " << ex.what(); - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } catch (exception& ex) { @@ -987,13 +322,13 @@ void constructTempXmlFile(const std::string& tempJobDir, const std::string& sJob oss << "cpimport.bin exception constructing temporary " "Job XML file: " << ex.what(); - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } catch (...) { - startupError(std::string("cpimport.bin " - "unknown exception constructing temporary Job XML file"), - false); + cmdArgs->startupError(std::string("cpimport.bin " + "unknown exception constructing temporary Job XML file"), + false); } genProc.writeXMLFile(sFileName.string()); @@ -1009,9 +344,9 @@ void verifyNode() // Validate running on a PM if (localModuleType != "pm") { - startupError(std::string("Exiting, " - "cpimport.bin can only be run on a PM node"), - true); + cmdArgs->startupError(std::string("Exiting, " + "cpimport.bin can only be run on a PM node"), + true); } } @@ -1049,34 +384,22 @@ int main(int argc, char** argv) setlocale(LC_NUMERIC, "C"); // Initialize singleton instance of syslogging - if (argc > 0) - pgmName = argv[0]; - logging::IDBErrorInfo::instance(); SimpleSysLog::instance()->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_BULK)); // Log job initiation unless user is asking for help + cmdArgs = make_unique(argc, argv); std::ostringstream ossArgList; - bool bHelpFlag = false; for (int m = 1; m < argc; m++) { - if (strcmp(argv[m], "-h") == 0) - { - bHelpFlag = true; - break; - } - if (!strcmp(argv[m], "\t")) // special case to print a ossArgList << "'\\t'" << ' '; else ossArgList << argv[m] << ' '; } - if (!bHelpFlag) - { - 
logInitiateMsg(ossArgList.str().c_str()); - } + logInitiateMsg(ossArgList.str().c_str()); BulkLoad curJob; string sJobIdStr; @@ -1099,8 +422,8 @@ int main(int argc, char** argv) task = TASK_CMD_LINE_PARSING; string xmlGenSchema; string xmlGenTable; - parseCmdLineArgs(argc, argv, curJob, sJobIdStr, sXMLJobDir, sModuleIDandPID, bLogInfo2ToConsole, - xmlGenSchema, xmlGenTable, bValidateColumnList); + cmdArgs->fillParams(curJob, sJobIdStr, sXMLJobDir, sModuleIDandPID, bLogInfo2ToConsole, xmlGenSchema, + xmlGenTable, bValidateColumnList); //-------------------------------------------------------------------------- // Save basename portion of program path from argv[0] @@ -1154,9 +477,9 @@ int main(int argc, char** argv) if (!BRMWrapper::getInstance()->isSystemReady()) { - startupError(std::string("System is not ready. Verify that ColumnStore is up and ready " - "before running cpimport."), - false); + cmdArgs->startupError(std::string("System is not ready. Verify that ColumnStore is up and ready " + "before running cpimport."), + false); } if (bDebug) @@ -1173,7 +496,7 @@ int main(int argc, char** argv) WErrorCodes ec; std::ostringstream oss; oss << ec.errorString(brmReadWriteStatus) << " cpimport.bin is terminating."; - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } if (bDebug) @@ -1190,7 +513,7 @@ int main(int argc, char** argv) WErrorCodes ec; std::ostringstream oss; oss << ec.errorString(brmShutdownPending) << " cpimport.bin is terminating."; - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } if (bDebug) @@ -1207,7 +530,7 @@ int main(int argc, char** argv) WErrorCodes ec; std::ostringstream oss; oss << ec.errorString(brmSuspendPending) << " cpimport.bin is terminating."; - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } if (bDebug) @@ -1268,7 +591,7 @@ int main(int argc, char** argv) { std::ostringstream oss; oss << "cpimport.bin error creating Job XML file name: " << xmlErrMsg; - 
startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } printInputSource(curJob.getAlternateImportDir(), sFileName.string(), curJob.getS3Bucket()); @@ -1300,13 +623,14 @@ int main(int argc, char** argv) } rc = BRMWrapper::getInstance()->newCpimportJob(cpimportJobId); + // TODO kemm: pass cpimportJobId to WECmdArgs if (rc != NO_ERROR) { WErrorCodes ec; std::ostringstream oss; oss << "Error in creating new cpimport job on Controller node; " << ec.errorString(rc) << "; cpimport is terminating."; - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } //-------------------------------------------------------------------------- @@ -1321,7 +645,7 @@ int main(int argc, char** argv) WErrorCodes ec; std::ostringstream oss; oss << "Error in loading job information; " << ec.errorString(rc) << "; cpimport.bin is terminating."; - startupError(oss.str(), false); + cmdArgs->startupError(oss.str(), false); } if (bDebug) @@ -1353,7 +677,7 @@ int main(int argc, char** argv) if (task != TASK_PROCESS_DATA) { - startupError(exceptionMsg, false); + cmdArgs->startupError(exceptionMsg, false); } rc = ERR_UNKNOWN; @@ -1379,7 +703,7 @@ int main(int argc, char** argv) failMsg += exceptionMsg; } - endMsgArgs.add(failMsg.c_str()); + endMsgArgs.add(failMsg); } else { diff --git a/writeengine/bulk/we_bulkload.cpp b/writeengine/bulk/we_bulkload.cpp index 9d259abc1..2b4f1ca1f 100644 --- a/writeengine/bulk/we_bulkload.cpp +++ b/writeengine/bulk/we_bulkload.cpp @@ -72,7 +72,7 @@ const std::string ERR_LOG_SUFFIX = ".err"; // Job err log file suffix namespace WriteEngine { /* static */ std::vector> BulkLoad::fTableInfo; -/* static */ boost::mutex* BulkLoad::fDDLMutex = 0; +/* static */ boost::mutex* BulkLoad::fDDLMutex = new boost::mutex(); /* static */ const std::string BulkLoad::DIR_BULK_JOB("job"); /* static */ const std::string BulkLoad::DIR_BULK_TEMP_JOB("tmpjob"); @@ -140,35 +140,8 @@ struct CancellationThread // Constructor 
//------------------------------------------------------------------------------ BulkLoad::BulkLoad() - : fColOp(new ColumnOpBulk()) - , fColDelim('\0') - , fNoOfBuffers(-1) - , fBufferSize(-1) - , fFileVbufSize(-1) - , fMaxErrors(-1) - , fNoOfParseThreads(3) - , fNoOfReadThreads(1) - , fKeepRbMetaFiles(false) - , fNullStringMode(false) - , fEnclosedByChar('\0') - , // not enabled unless user overrides enclosed by char - fEscapeChar('\0') - , fTotalTime(0.0) - , fBulkMode(BULK_MODE_LOCAL) - , fbTruncationAsError(false) - , fImportDataMode(IMPORT_DATA_TEXT) - , fbContinue(false) - , fDisableTimeOut(false) - , fUUID(boost::uuids::nil_generator()()) - , fTimeZone(dataconvert::systemTimeZoneOffset()) - , fUsername("mysql") // MCOL-4328 default file owner { - fTableInfo.clear(); setDebugLevel(DEBUG_0); - - fDDLMutex = new boost::mutex(); - memset(&fStartTime, 0, sizeof(timeval)); - memset(&fEndTime, 0, sizeof(timeval)); } //------------------------------------------------------------------------------ @@ -540,6 +513,7 @@ int BulkLoad::preProcess(Job& job, int tableNo, std::shared_ptr& tabl tableInfo->setImportDataMode(fImportDataMode); tableInfo->setTimeZone(fTimeZone); tableInfo->setJobUUID(fUUID); + tableInfo->setSkipRows(fSkipRows); // MCOL-4328 Get username gid and uid if they are set // We inject uid and gid into TableInfo and All ColumnInfo-s later. 
@@ -1002,6 +976,11 @@ int BulkLoad::processJob() fEscapeChar = '\\'; } + if (fSkipRows == 0) + { + fSkipRows = curJob.fSkipRows; + } + // std::cout << "bulkload::fEnclosedByChar<" << fEnclosedByChar << '>' << // std::endl << "bulkload::fEscapeChar<" << fEscapeChar << '>' << std::endl; diff --git a/writeengine/bulk/we_bulkload.h b/writeengine/bulk/we_bulkload.h index 91be178c8..f9ab26ffd 100644 --- a/writeengine/bulk/we_bulkload.h +++ b/writeengine/bulk/we_bulkload.h @@ -29,7 +29,7 @@ #include #include -#include +#include #include #include #include @@ -48,12 +48,7 @@ #include #include #include - -#if 0 // defined(_MSC_VER) && defined(WE_BULKLOAD_DLLEXPORT) -#define EXPORT __declspec(dllexport) -#else -#define EXPORT -#endif +#include /** Namespace WriteEngine */ namespace WriteEngine @@ -65,18 +60,18 @@ class BulkLoad : public FileOp /** * @brief BulkLoad constructor */ - EXPORT BulkLoad(); + BulkLoad(); /** * @brief BulkLoad destructor */ - EXPORT ~BulkLoad() override; + ~BulkLoad() override; /** * @brief Load job information */ - EXPORT int loadJobInfo(const std::string& fullFileName, bool bUseTempJobFile, int argc, char** argv, - bool bLogInfo2ToConsole, bool bValidateColumnList); + int loadJobInfo(const std::string& fullFileName, bool bUseTempJobFile, int argc, char** argv, + bool bLogInfo2ToConsole, bool bValidateColumnList); /** * @brief Pre process jobs to validate and assign values to the job structure @@ -91,7 +86,7 @@ class BulkLoad : public FileOp /** * @brief Process job */ - EXPORT int processJob(); + int processJob(); /** * @brief Set Debug level for this BulkLoad object and any data members @@ -126,12 +121,13 @@ class BulkLoad : public FileOp return fUUID; } - EXPORT int setAlternateImportDir(const std::string& loadDir, std::string& errMsg); + int setAlternateImportDir(const std::string& loadDir, std::string& errMsg); void setImportDataMode(ImportDataMode importMode); void setColDelimiter(char delim); void setBulkLoadMode(BulkModeType bulkMode, const 
std::string& rptFileName); void setEnclosedByChar(char enChar); void setEscapeChar(char esChar); + void setSkipRows(size_t skipRows); void setKeepRbMetaFiles(bool keepMeta); void setMaxErrorCount(unsigned int maxErrors); void setNoOfParseThreads(int parseThreads); @@ -181,7 +177,7 @@ class BulkLoad : public FileOp //-------------------------------------------------------------------------- XMLJob fJobInfo; // current job information - boost::scoped_ptr fColOp; // column operation + boost::scoped_ptr fColOp{new ColumnOpBulk()}; // column operation std::string fRootDir; // job process root directory std::string fJobFileName; // job description file name @@ -189,49 +185,50 @@ class BulkLoad : public FileOp Log fLog; // logger int fNumOfParser; // total number of parser - char fColDelim; // delimits col values within a row + char fColDelim{0}; // delimits col values within a row - int fNoOfBuffers; // Number of read buffers - int fBufferSize; // Read buffer size - int fFileVbufSize; // Internal file system buffer size - long long fMaxErrors; // Max allowable errors per job + int fNoOfBuffers{-1}; // Number of read buffers + int fBufferSize{-1}; // Read buffer size + int fFileVbufSize{-1}; // Internal file system buffer size + long long fMaxErrors{-1}; // Max allowable errors per job std::string fAlternateImportDir; // Alternate bulk import directory std::string fErrorDir; // Opt. 
where error records record std::string fProcessName; // Application process name static std::vector> fTableInfo; // Vector of Table information - int fNoOfParseThreads; // Number of parse threads - int fNoOfReadThreads; // Number of read threads + int fNoOfParseThreads{3}; // Number of parse threads + int fNoOfReadThreads{1}; // Number of read threads boost::thread_group fReadThreads; // Read thread group boost::thread_group fParseThreads; // Parse thread group boost::mutex fReadMutex; // Manages table selection by each - // read thread + // read thread boost::mutex fParseMutex; // Manages table/buffer/column - // selection by each parsing thread - BRM::TxnID fTxnID; // TransID acquired from SessionMgr - bool fKeepRbMetaFiles; // Keep/delete bulkRB metadata files - bool fNullStringMode; // Treat "NULL" as NULL value - char fEnclosedByChar; // Char used to enclose column value - char fEscapeChar; // Escape char within enclosed value - timeval fStartTime; // job start time - timeval fEndTime; // job end time - double fTotalTime; // elapsed time for current phase - std::vector fCmdLineImportFiles; // Import Files from cmd line - BulkModeType fBulkMode; // Distributed bulk mode (1,2, or 3) - std::string fBRMRptFileName; // Name of distributed mode rpt file - bool fbTruncationAsError; // Treat string truncation as error - ImportDataMode fImportDataMode; // Importing text or binary data - bool fbContinue; // true when read and parse r running + // selection by each parsing thread + BRM::TxnID fTxnID; // TransID acquired from SessionMgr + bool fKeepRbMetaFiles{false}; // Keep/delete bulkRB metadata files + bool fNullStringMode{false}; // Treat "NULL" as NULL value + char fEnclosedByChar{0}; // Char used to enclose column value + char fEscapeChar{0}; // Escape char within enclosed value + size_t fSkipRows{0}; // Header rows to skip + timeval fStartTime{0, 0}; // job start time + timeval fEndTime{0, 0}; // job end time + double fTotalTime{0.0}; // elapsed time for current 
phase + std::vector fCmdLineImportFiles; // Import Files from cmd line + BulkModeType fBulkMode{BULK_MODE_LOCAL}; // Distributed bulk mode (1,2, or 3) + std::string fBRMRptFileName; // Name of distributed mode rpt file + bool fbTruncationAsError{false}; // Treat string truncation as error + ImportDataMode fImportDataMode{IMPORT_DATA_TEXT}; // Importing text or binary data + bool fbContinue{false}; // true when read and parse r running // static boost::mutex* fDDLMutex; // Insure only 1 DDL op at a time - EXPORT static const std::string DIR_BULK_JOB; // Bulk job directory - EXPORT static const std::string DIR_BULK_TEMP_JOB; // Dir for tmp job files + static const std::string DIR_BULK_JOB; // Bulk job directory + static const std::string DIR_BULK_TEMP_JOB; // Dir for tmp job files static const std::string DIR_BULK_IMPORT; // Bulk job import dir static const std::string DIR_BULK_LOG; // Bulk job log directory - bool fDisableTimeOut; // disable timeout when waiting for table lock - boost::uuids::uuid fUUID; // job UUID + bool fDisableTimeOut{false}; // disable timeout when waiting for table lock + boost::uuids::uuid fUUID{boost::uuids::nil_generator()()}; // job UUID static bool fNoConsoleOutput; // disable output to console - long fTimeZone; // Timezone offset (in seconds) relative to UTC, + long fTimeZone{dataconvert::systemTimeZoneOffset()};// Timezone offset (in seconds) relative to UTC, // to use for TIMESTAMP data type. For example, // for EST which is UTC-5:00, offset will be -18000s. 
std::string fS3Key; // S3 Key @@ -239,7 +236,7 @@ class BulkLoad : public FileOp std::string fS3Host; // S3 Host std::string fS3Bucket; // S3 Bucket std::string fS3Region; // S3 Region - std::string fUsername; // data files owner name mysql by default + std::string fUsername{"mysql"}; // data files owner name mysql by default //-------------------------------------------------------------------------- // Private Functions @@ -417,6 +414,11 @@ inline void BulkLoad::setEscapeChar(char esChar) fEscapeChar = esChar; } +inline void BulkLoad::setSkipRows(size_t skipRows) +{ + fSkipRows = skipRows; +} + inline void BulkLoad::setImportDataMode(ImportDataMode importMode) { fImportDataMode = importMode; diff --git a/writeengine/bulk/we_bulkloadbuffer.cpp b/writeengine/bulk/we_bulkloadbuffer.cpp index 8b74b8c15..cd43d6cd0 100644 --- a/writeengine/bulk/we_bulkloadbuffer.cpp +++ b/writeengine/bulk/we_bulkloadbuffer.cpp @@ -2047,8 +2047,8 @@ int BulkLoadBuffer::parseDictSection(ColumnInfo& columnInfo, int tokenPos, RID s } int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length, - size_t* parse_length, RID& totalReadRows, RID& correctTotalRows, - const boost::ptr_vector& columnsInfo, + size_t* parse_length, size_t& skipRows, RID& totalReadRows, + RID& correctTotalRows, const boost::ptr_vector& columnsInfo, unsigned int allowedErrCntThisCall) { boost::mutex::scoped_lock lock(fSyncUpdatesBLB); @@ -2119,7 +2119,7 @@ int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const ch if (fImportDataMode == IMPORT_DATA_TEXT) { - tokenize(columnsInfo, allowedErrCntThisCall); + tokenize(columnsInfo, allowedErrCntThisCall, skipRows); } else { @@ -2150,8 +2150,9 @@ int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const ch // correctTotalRows (input/output) - total valid row count from tokenize() // (cumulative) //------------------------------------------------------------------------------ -int 
BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, RID& totalReadRows, - RID& correctTotalRows, const boost::ptr_vector& columnsInfo, +int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, size_t& skipRows, + RID& totalReadRows, RID& correctTotalRows, + const boost::ptr_vector& columnsInfo, unsigned int allowedErrCntThisCall) { boost::mutex::scoped_lock lock(fSyncUpdatesBLB); @@ -2164,10 +2165,10 @@ int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* hand { memcpy(fData, fOverflowBuf, fOverflowSize); - if (fOverflowBuf != NULL) + if (fOverflowBuf != nullptr) { delete[] fOverflowBuf; - fOverflowBuf = NULL; + fOverflowBuf = nullptr; } } @@ -2219,7 +2220,7 @@ int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* hand if (fImportDataMode == IMPORT_DATA_TEXT) { - tokenize(columnsInfo, allowedErrCntThisCall); + tokenize(columnsInfo, allowedErrCntThisCall, skipRows); } else { @@ -2276,7 +2277,7 @@ int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* hand // depending on whether the user has enabled the "enclosed by" feature. 
//------------------------------------------------------------------------------ void BulkLoadBuffer::tokenize(const boost::ptr_vector& columnsInfo, - unsigned int allowedErrCntThisCall) + unsigned int allowedErrCntThisCall, size_t& skipRows) { unsigned offset = 0; // length of field unsigned curCol = 0; // dest db column counter within a row @@ -2334,6 +2335,15 @@ void BulkLoadBuffer::tokenize(const boost::ptr_vector& columnsInfo, while (p < pEndOfData) { c = *p; + if (UNLIKELY(skipRows > 0)) + { + if (c == NEWLINE_CHAR) + { + --skipRows; + } + ++p; + continue; + } // If we have stripped "enclosed" characters, then save raw data if (rawDataRowLength > 0) diff --git a/writeengine/bulk/we_bulkloadbuffer.h b/writeengine/bulk/we_bulkloadbuffer.h index 509c725c7..475c98663 100644 --- a/writeengine/bulk/we_bulkloadbuffer.h +++ b/writeengine/bulk/we_bulkloadbuffer.h @@ -215,7 +215,8 @@ class BulkLoadBuffer /** @brief tokenize the buffer contents and fill up the token array. */ - void tokenize(const boost::ptr_vector& columnsInfo, unsigned int allowedErrCntThisCall); + void tokenize(const boost::ptr_vector& columnsInfo, unsigned int allowedErrCntThisCall, + size_t& skipRows); /** @brief Binary tokenization of the buffer, and fill up the token array. 
*/ @@ -273,13 +274,14 @@ class BulkLoadBuffer bool tryAndLockColumn(const int& columnId, const int& id); int fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length, - size_t* parse_length, RID& totalReadRows, RID& correctTotalRows, + size_t* parse_length, size_t& skipRows, RID& totalReadRows, RID& correctTotalRows, const boost::ptr_vector& columnsInfo, unsigned int allowedErrCntThisCall); /** @brief Read the table data into the buffer */ - int fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, RID& totalRows, RID& correctTotalRows, - const boost::ptr_vector& columnsInfo, unsigned int allowedErrCntThisCall); + int fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, size_t& skipRows, RID& totalRows, + RID& correctTotalRows, const boost::ptr_vector& columnsInfo, + unsigned int allowedErrCntThisCall); /** @brief Get the overflow size */ diff --git a/writeengine/bulk/we_cmdargs.cpp b/writeengine/bulk/we_cmdargs.cpp new file mode 100644 index 000000000..1989f0719 --- /dev/null +++ b/writeengine/bulk/we_cmdargs.cpp @@ -0,0 +1,559 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + Copyright (C) 2016 MariaDB Corporation + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +#include "we_simplesyslog.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +namespace po = boost::program_options; +using namespace std; + +#include +#include +#include +#include + +#include "dataconvert.h" +#include "liboamcpp.h" +using namespace oam; + +#include "we_cmdargs.h" + +#include "mcsconfig.h" + +namespace WriteEngine +{ +//---------------------------------------------------------------------- +//---------------------------------------------------------------------- +WECmdArgs::WECmdArgs(int argc, char** argv) +{ + try + { + fOptions = std::make_unique(); + fVisibleOptions = std::make_unique(); +#define DECLARE_INT_ARG(name, stor, min, max, desc) \ + (name,\ + po::value(&stor)\ + ->notifier([this](auto&& value) { checkIntArg(name, min, max, value); }),\ + desc) + + fVisibleOptions->add_options() + ("help,h", "Print this message.") + DECLARE_INT_ARG("read-buffer,b", fIOReadBufSize, 1, INT_MAX, "Number of read buffers.") + DECLARE_INT_ARG("read-buffer-size,c", fReadBufSize, 1, INT_MAX, + "Application read buffer size (in bytes)") + DECLARE_INT_ARG("debug,d", fDebugLvl, 1, 3, "Print different level(1-3) debug message") + DECLARE_INT_ARG("max-errors,e", fMaxErrors, 0, INT_MAX, + "Maximum number of allowable error per table per PM") + ("file-path,f", po::value(&fPmFilePath), + "Data file directory path. 
Default is current working directory.\n" + "\tIn Mode 1, represents the local input file path.\n" + "\tIn Mode 2, represents the PM based input file path.\n" + "\tIn Mode 3, represents the local input file path.") + DECLARE_INT_ARG("mode,m", fArgMode, 1, 3, + "\t1 - rows will be loaded in a distributed manner acress PMs.\n" + "\t2 - PM based input files loaded into their respective PM.\n" + "\t3 - input files will be loaded on the local PM.") + ("filename,l", po::value(&fPmFile), + "Name of import file to be loaded, relative to 'file-path'") + ("console-log,i", po::bool_switch(&fConsoleLog), + "Print extended info to console in Mode 3.") + ("job-id,j", po::value(), + "Job ID. In simple usage, default is the table OID unless a fully qualified input " + "file name is given.") + ("null-strings,n", po::value(&fNullStrMode)->implicit_value(true), + "NullOption (0-treat the string NULL as data (default);\n" + "1-treat the string NULL as a NULL value)") + ("xml-job-path,p", po::value(&fJobPath), "Path for the XML job description file.") + DECLARE_INT_ARG("readers,r", fNoOfReadThrds, 1, INT_MAX, "Number of readers.") + ("separator,s", po::value(), "Delimiter between column values.") + DECLARE_INT_ARG("io-buffer-size,B", fSetBufSize, 1, INT_MAX, + "I/O library read buffer size (in bytes)") + DECLARE_INT_ARG("writers,w", fNoOfWriteThrds, 1, INT_MAX, "Number of parsers.") + ("enclosed-by,E", po::value(&fEnclosedChar), + "Enclosed by character if field values are enclosed.") + ("escape-char,C", po::value(&fEscChar)->default_value('\\'), + "Escape character used in conjunction with 'enclosed-by'" + "character, or as a part of NULL escape sequence ('\\N');\n" + "default is '\\'") + ("headers,O", + po::value(&fSkipRows)->implicit_value(1) + ->notifier([this](auto&& value) { checkIntArg("headers,O", 0, INT_MAX, value); }), + "Number of header rows to skip.") + ("binary-mode,I", po::value(), + "Import binary data; how to treat NULL values:\n" + "\t1 - import NULL values\n" + "\t2 - 
saturate NULL values\n") + ("calling-module,P", po::value(&fModuleIDandPID), "Calling module ID and PID.") + ("truncation-as-error,S", po::bool_switch(&fbTruncationAsError), + "Treat string truncations as errors.") + ("tz,T", po::value(), + "Timezone used for TIMESTAMP datatype. Possible values:\n" + "\t\"SYSTEM\" (default)\n" + "\tOffset in the form +/-HH:MM") + ("disable-tablelock-timeout,D", po::bool_switch(&fDisableTableLockTimeOut), + "Disable timeout when waiting for table lock.") + ("silent,N", po::bool_switch(&fSilent), "Disable console output.") + ("s3-key,y", po::value(&fS3Key), + "S3 Authentication Key (for S3 imports)") + ("s3-secret,K", po::value(&fS3Secret), + "S3 Authentication Secret (for S3 imports)") + ("s3-bucket,t", po::value(&fS3Bucket), + "S3 Bucket (for S3 imports)") + ("s3-hostname,H", po::value(&fS3Host), + "S3 Hostname (for S3 imports, Amazon's S3 default)") + ("s3-region,g", po::value(&fS3Region), + "S3 Region (for S3 imports)") + ("errors-dir,L", po::value(&fErrorDir)->default_value(MCSLOGDIR), + "Directory for the output .err and .bad files") + ("job-uuid,u", po::value(&fUUID), "import job UUID") + ("username,U", po::value(&fUsername), "Username of the files owner.") + ("dbname", po::value(), "Name of the database to load") + ("table", po::value(), "Name of table to load") + ("load-file", po::value(), + "Optional input file name in current directory, " + "unless a fully qualified name is given. 
If not given, input read from STDIN."); + + po::options_description hidden("Hidden options"); + hidden.add_options() + ("keep-rollback-metadata,k", po::bool_switch(&fKeepRollbackMetaData), + "Keep rollback metadata.") + ("report-file,R", po::value(&fReportFilename), "Report file name.") + ("allow-missing-columns,X", po::value(), "Allow missing columns."); + + fOptions->add(*fVisibleOptions).add(hidden); + +#undef DECLARE_INT_ARG + parseCmdLineArgs(argc, argv); + } + catch (std::exception& exp) + { + startupError(exp.what(), true); + } +} + +WECmdArgs::~WECmdArgs() = default; + +//---------------------------------------------------------------------- + +void WECmdArgs::checkIntArg(const std::string& name, long min, long max, int value) const +{ + if (value < min || value > max) + { + ostringstream oss; + oss << "Argument " << name << " is out of range [" << min << ", " << max << "]"; + startupError(oss.str(), true); + } +} + +//---------------------------------------------------------------------- + +void WECmdArgs::usage() const +{ + cout << endl + << "Simple usage using positional parameters " + "(no XML job file):" + << endl + << " " << fPrgmName << " dbName tblName [loadFile] [-j jobID] " << endl + << " [-h] [-r readers] [-w parsers] [-s c] [-f path] [-b readBufs] " << endl + << " [-c readBufSize] [-e maxErrs] [-B libBufSize] [-n NullOption] " << endl + << " [-E encloseChar] [-C escapeChar] [-I binaryOpt] [-S] " + "[-d debugLevel] [-i] " + << endl + << " [-D] [-N] [-L rejectDir] [-T timeZone]" << endl + << " [-U username]" << endl + << endl; + + cout << endl + << "Traditional usage without positional parameters " + "(XML job file required):" + << endl + << " " << fPrgmName << " -j jobID " << endl + << " [-h] [-r readers] [-w parsers] [-s c] [-f path] [-b readBufs] " << endl + << " [-c readBufSize] [-e maxErrs] [-B libBufSize] [-n NullOption] " << endl + << " [-E encloseChar] [-C escapeChar] [-I binaryOpt] [-S] " + "[-d debugLevel] [-i] " + << endl + << " [-p 
path] [-l loadFile]" << endl + << " [-D] [-N] [-L rejectDir] [-T timeZone]" << endl + << " [-U username]" << endl + << endl; + + cout << "\n\n" << (*fVisibleOptions) << endl; + + cout << " Example1:" << endl + << " " << fPrgmName << " -j 1234" << endl + << " Example2: Some column values are enclosed within double quotes." << endl + << " " << fPrgmName << " -j 3000 -E '\"'" << endl + << " Example3: Import a nation table without a Job XML file" << endl + << " " << fPrgmName << " -j 301 tpch nation nation.tbl" << endl; + + exit(1); +} + +//----------------------------------------------------------------------------- + +void WECmdArgs::parseCmdLineArgs(int argc, char** argv) +{ + std::string importPath; + + if (argc > 0) + fPrgmName = string(MCSBINDIR) + "/" + "cpimport.bin"; // argv[0] is splitter but we need cpimport + + po::positional_options_description pos_opt; + pos_opt.add("dbname", 1) + .add("table", 1) + .add("load-file", 1); + + po::variables_map vm; + po::store(po::command_line_parser(argc, argv).options(*fOptions).positional(pos_opt).run(), vm); + po::notify(vm); + + if (vm.contains("help")) + { + fHelp = true; + usage(); + return; + } + if (vm.contains("separator")) + { + auto value = vm["separator"].as(); + if (value == "\\t") + { + fColDelim = '\t'; + } + else + { + fColDelim = value[0]; + } + } + if (vm.contains("binary-mode")) + { + int value = vm["binary-mode"].as(); + if (value == 1) + { + fImportDataMode = IMPORT_DATA_BIN_ACCEPT_NULL; + } + else if (value == 2) + { + fImportDataMode = IMPORT_DATA_BIN_SAT_NULL; + } + else + { + startupError("Invalid Binary mode; value can be 1 or 2"); + } + } + if (vm.contains("tz")) + { + auto tz = vm["tz"].as(); + long offset; + if (tz != "SYSTEM" && dataconvert::timeZoneToOffset(tz.c_str(), tz.size(), &offset)) + { + startupError("Value for option --tz/-T is invalid"); + } + fTimeZone = tz; + } + if (vm.contains("job-id")) + { + errno = 0; + string optarg = vm["job-id"].as(); + long lValue = strtol(optarg.c_str(), 
nullptr, 10); + if (errno != 0 || lValue < 0 || lValue > INT_MAX) + { + startupError("Option --job-id/-j is invalid or outof range"); + } + fJobId = optarg; + fOrigJobId = fJobId; + + if (0 == fJobId.length()) + { + startupError("Wrong JobID Value"); + } + } + if (vm.contains("allow-missing-columns")) + { + if (vm["allow-missing-columns"].as() == "AllowMissingColumn") + { + fAllowMissingColumn = true; + } + } + + if (fArgMode != -1) + fMode = fArgMode; // BUG 4210 + + if (2 == fArgMode && fPmFilePath.empty()) + throw runtime_error("-f option is mandatory with mode 2."); + + if (vm.contains("dbname")) + { + fSchema = vm["dbname"].as(); + } + if (vm.contains("table")) + { + fTable = vm["table"].as(); + } + if (vm.contains("load-file")) + { + fLocFile = vm["load-file"].as(); + } +} + +void WECmdArgs::fillParams(BulkLoad& curJob, std::string& sJobIdStr, std::string& sXMLJobDir, + std::string& sModuleIDandPID, bool& bLogInfo2ToConsole, std::string& xmlGenSchema, + std::string& xmlGenTable, bool& bValidateColumnList) +{ + std::string importPath; + std::string rptFileName; + bool bImportFileArg = false; + BulkModeType bulkMode = BULK_MODE_LOCAL; + std::string jobUUID; + + curJob.setReadBufferCount(fIOReadBufSize); + curJob.setReadBufferSize(fReadBufSize); + if (fMaxErrors >= 0) + { + curJob.setMaxErrorCount(fMaxErrors); + } + if (!fPmFilePath.empty()) + { + importPath = fPmFilePath; + string setAltErrMsg; + if (curJob.setAlternateImportDir(importPath, setAltErrMsg) != NO_ERROR) + { + startupError(setAltErrMsg, false); + } + } + bLogInfo2ToConsole = fConsoleLog; + sJobIdStr = fJobId; + curJob.setKeepRbMetaFiles(fKeepRollbackMetaData); + bulkMode = static_cast(fMode); + curJob.setNullStringMode(fNullStrMode); + sXMLJobDir = fJobPath; + curJob.setNoOfReadThreads(fNoOfReadThrds); + curJob.setColDelimiter(fColDelim); + curJob.setJobUUID(fUUID); + curJob.setNoOfParseThreads(fNoOfWriteThrds); + curJob.setVbufReadSize(fReadBufSize); + if (fEscChar != -1) + { + 
curJob.setEscapeChar(fEscChar); + } + if (fEnclosedChar != -1) + { + curJob.setEnclosedByChar(fEnclosedChar); + } + curJob.setImportDataMode(fImportDataMode); + curJob.setErrorDir(fErrorDir); + sModuleIDandPID = fModuleIDandPID; + rptFileName = fReportFilename; + curJob.setTruncationAsError(fbTruncationAsError); + if (!fTimeZone.empty()) + { + long offset; + if (dataconvert::timeZoneToOffset(fTimeZone.c_str(), fTimeZone.size(), &offset)) + { + startupError("Invalid timezone specified"); + } + curJob.setTimeZone(offset); + } + bValidateColumnList = !fAllowMissingColumn; + curJob.disableTimeOut(fDisableTableLockTimeOut); + curJob.disableConsoleOutput(fSilent); + curJob.setS3Key(fS3Key); + curJob.setS3Bucket(fS3Bucket); + curJob.setS3Secret(fS3Secret); + curJob.setS3Region(fS3Region); + curJob.setS3Host(fS3Host); + if (!fUsername.empty()) + { + curJob.setUsername(fUsername); + } + curJob.setSkipRows(fSkipRows); + + curJob.setDefaultJobUUID(); + + // Inconsistent to specify -f STDIN with -l importFile + if (bImportFileArg && importPath == "STDIN") + { + startupError(std::string("-f STDIN is invalid with -l importFile."), true); + } + + // If distributed mode, make sure report filename is specified and that we + // can create the file using the specified path. + if (bulkMode == BULK_MODE_REMOTE_SINGLE_SRC || bulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC) + { + if (rptFileName.empty()) + { + startupError(std::string("Bulk modes 1 and 2 require -R rptFileName."), true); + } + else + { + std::ofstream rptFile(rptFileName.c_str()); + + if (rptFile.fail()) + { + std::ostringstream oss; + oss << "Unable to open report file " << rptFileName; + startupError(oss.str(), false); + } + + rptFile.close(); + } + + curJob.setBulkLoadMode(bulkMode, rptFileName); + } + + // Get positional arguments, User can provide: + // 1. no positional parameters + // 2. Two positional parameters (schema and table names) + // 3. 
Three positional parameters (schema, table, and import file name) + if (!fSchema.empty()) + { + xmlGenSchema = fSchema; + + if (!fTable.empty()) + { + // Validate invalid options in conjunction with 2-3 positional + // parameter mode, which means we are using temp Job XML file. + if (bImportFileArg) + { + startupError(std::string("-l importFile is invalid with positional parameters"), true); + } + + if (!sXMLJobDir.empty()) + { + startupError(std::string("-p path is invalid with positional parameters."), true); + } + + if (importPath == "STDIN") + { + startupError(std::string("-f STDIN is invalid with positional parameters."), true); + } + + xmlGenTable = fTable; + + if (!fLocFile.empty()) + { + // 3rd pos parm + curJob.addToCmdLineImportFileList(fLocFile); + + // Default to CWD if loadfile name given w/o -f path + if (importPath.empty()) + { + std::string setAltErrMsg; + + if (curJob.setAlternateImportDir(std::string("."), setAltErrMsg) != NO_ERROR) + startupError(setAltErrMsg, false); + } + } + else + { + // Invalid to specify -f if no load file name given + if (!importPath.empty()) + { + startupError(std::string("-f requires 3rd positional parameter (load file name)."), true); + } + + // Default to STDIN if no import file name given + std::string setAltErrMsg; + + if (curJob.setAlternateImportDir(std::string("STDIN"), setAltErrMsg) != NO_ERROR) + startupError(setAltErrMsg, false); + } + } + else + { + startupError(std::string("No table name specified with schema."), true); + } + } + else + { + // JobID is a required parameter with no positional parm mode, + // because we need the jobid to identify the input job xml file. 
+ if (sJobIdStr.empty()) + { + startupError(std::string("No JobID specified."), true); + } + } + + // Dump some configuration info + if (!fSilent) + { + if (fDebugLvl != 0) + { + cout << "Debug level is set to " << fDebugLvl << endl; + } + if (fNoOfReadThrds != 0) + { + cout << "number of read threads : " << fNoOfReadThrds << endl; + } + cout << "Column delimiter : " << (fColDelim == '\t' ? "\\t" : string{fColDelim}) << endl; + if (fNoOfWriteThrds != 0) + { + cout << "number of parse threads : " << fNoOfWriteThrds << endl; + } + if (fEscChar != 0) + { + cout << "Escape Character : " << fEscChar << endl; + } + if (fEnclosedChar != 0) + { + cout << "Enclosed by Character : " << fEnclosedChar << endl; + } + } +} + +void WECmdArgs::startupError(const std::string& errMsg, bool showHint) const +{ + BRMWrapper::getInstance()->finishCpimportJob(fCpimportJobId); + // Log to console + if (!BulkLoad::disableConsoleOutput()) + cerr << errMsg << endl; + + if (showHint && !fSilent) + { + cerr << "Try '" << fPrgmName << " -h' for more information." << endl; + } + + // Log to syslog + logging::Message::Args errMsgArgs; + errMsgArgs.add(errMsg); + SimpleSysLog::instance()->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0087); + + std::string jobIdStr("0"); + logging::Message::Args endMsgArgs; + endMsgArgs.add(jobIdStr); + endMsgArgs.add("FAILED"); + SimpleSysLog::instance()->logMsg(endMsgArgs, logging::LOG_TYPE_INFO, logging::M0082); + + exit(EXIT_FAILURE); +} + +} /* namespace WriteEngine */ diff --git a/writeengine/bulk/we_cmdargs.h b/writeengine/bulk/we_cmdargs.h new file mode 100644 index 000000000..5446fdc61 --- /dev/null +++ b/writeengine/bulk/we_cmdargs.h @@ -0,0 +1,130 @@ +/* Copyright (C) 2014 InfiniDB, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +/******************************************************************************* + * $Id$ + * + *******************************************************************************/ +#pragma once + +#include +#include +#include "we_bulkload.h" + +#include "we_type.h" + +namespace boost::program_options +{ +class options_description; +} + +namespace WriteEngine +{ +class WECmdArgs +{ +public: + WECmdArgs(int argc, char** argv); + ~WECmdArgs(); + + using VecInts = std::vector; + using VecArgs = std::vector; + + void parseCmdLineArgs(int argc, char** argv); + void usage() const; + bool checkForCornerCases(); + + void startupError(const std::string& errMsg, bool showHint = false) const; + void fillParams(BulkLoad& curJob, std::string& sJobIdStr, + std::string& sXMLJobDir, std::string& sModuleIDandPID, bool& bLogInfo2ToConsole, + std::string& xmlGenSchema, std::string& xmlGenTable, bool& bValidateColumnList); + + void setCpimportJobId(uint32_t cpimportJobId) + { + fCpimportJobId = cpimportJobId; + } + +private: + void checkIntArg(const std::string& name, long min, long max, int value) const; + VecArgs fVecArgs; + VecInts fPmVec; + + VecArgs fVecJobFiles; // JobFiles splitter from master JobFile + int fMultiTableCount{0}; // MultiTable count + VecArgs fColFldsFromJobFile; // List of columns from any job file, that + // represent fields in the import data + + std::string fJobId; // JobID + std::string fOrigJobId; // Original JobID, in case we have to split it + bool fJobLogOnly{false}; // Job number is only for log 
filename only + bool fHelp{false}; // Help mode + int fMode{BULK_MODE_LOCAL}; // splitter Mode + int fArgMode{-1}; // Argument mode, dep. on this fMode is decided. + bool fQuiteMode{true}; // in quiet mode or not + bool fConsoleLog{false}; // Log everything to console - w.r.t cpimport + std::string fPmFile; // FileName at PM + std::string fPmFilePath; // Path of input file in PM + std::string fLocFile; // Local file name + std::string fBrmRptFile; // BRM report file + std::string fJobPath; // Path to Job File + std::string fTmpFileDir; // Temp file directory. + std::string fBulkRoot; // Bulk Root path + std::string fJobFile; // Job File Name + std::string fS3Key; // S3 key + std::string fS3Secret; // S3 Secret + std::string fS3Bucket; // S3 Bucket + std::string fS3Host; // S3 Host + std::string fS3Region; // S3 Region + + int fNoOfReadThrds{1}; // No. of read threads + int fDebugLvl{0}; // Debug level + int fMaxErrors{-1}; // Max allowable errors + int fReadBufSize{-1}; // Read buffer size + int fIOReadBufSize{-1}; // I/O read buffer size + int fSetBufSize{0}; // Buff size w/setvbuf + char fColDelim{0}; // column delimiter + char fEnclosedChar{0}; // enclosed by char + char fEscChar{0}; // esc char + int fSkipRows{0}; // skip header + int fNoOfWriteThrds{3}; // No. of write threads + bool fNullStrMode{false}; // set null string mode - treat null as null + ImportDataMode fImportDataMode{IMPORT_DATA_TEXT}; // Importing text or binary data + std::string fPrgmName; // argv[0] + std::string fSchema; // Schema name - positional parameter + std::string fTable; // Table name - table name parameter + + bool fBlockMode3{false}; // Do not allow Mode 3 + bool fbTruncationAsError{false}; // Treat string truncation as error + std::string fUUID{boost::uuids::to_string(boost::uuids::nil_generator()())}; + bool fConsoleOutput{true}; // If false, no output to console.
+ std::string fTimeZone{"SYSTEM"}; // Timezone to use for TIMESTAMP datatype + std::string fUsername; // Username of the data files owner + std::string fErrorDir{MCSLOGDIR "/cpimport"}; + bool fDisableTableLockTimeOut{false}; + bool fSilent{false}; + std::string fModuleIDandPID; + + std::string fReportFilename; + bool fKeepRollbackMetaData{false}; + bool fAllowMissingColumn{false}; + + uint32_t fCpimportJobId{}; + + std::unique_ptr fOptions; + std::unique_ptr fVisibleOptions; +}; + +} // namespace WriteEngine diff --git a/writeengine/bulk/we_tableinfo.cpp b/writeengine/bulk/we_tableinfo.cpp index 6b23fa26c..1df5b752b 100644 --- a/writeengine/bulk/we_tableinfo.cpp +++ b/writeengine/bulk/we_tableinfo.cpp @@ -145,6 +145,8 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, const string& processN , fNullStringMode(false) , fEnclosedByChar('\0') , fEscapeChar('\\') + , fSkipRows(0) + , fSkipRowsCur(0) , fProcessingBegun(false) , fBulkMode(BULK_MODE_LOCAL) , fBRMReporter(logger, tableName) @@ -269,7 +271,7 @@ int TableInfo::readTableData() int fileCounter = 0; unsigned long long qtSentAt = 0; - if (fHandle == NULL) + if (fHandle == nullptr) { fFileName = fLoadFileList[fileCounter]; int rc = openTableFile(); @@ -421,13 +423,14 @@ int TableInfo::readTableData() if (fReadFromS3) { readRc = fBuffers[readBufNo].fillFromMemory(fBuffers[prevReadBuf], fFileBuffer, fS3ReadLength, - &fS3ParseLength, totalRowsPerInputFile, validTotalRows, - fColumns, allowedErrCntThisCall); + &fS3ParseLength, fSkipRowsCur, totalRowsPerInputFile, + validTotalRows, fColumns, allowedErrCntThisCall); } else { - readRc = fBuffers[readBufNo].fillFromFile(fBuffers[prevReadBuf], fHandle, totalRowsPerInputFile, - validTotalRows, fColumns, allowedErrCntThisCall); + readRc = fBuffers[readBufNo].fillFromFile(fBuffers[prevReadBuf], fHandle, fSkipRowsCur, + totalRowsPerInputFile, validTotalRows, fColumns, + allowedErrCntThisCall); } if (readRc != NO_ERROR) @@ -1208,7 +1211,6 @@ bool 
TableInfo::bufferReadyForParse(const int& bufferId, bool report) const int TableInfo::initializeBuffers(int noOfBuffers, const JobFieldRefList& jobFieldRefList, unsigned int fixedBinaryRecLen) { - fReadBufCount = noOfBuffers; // initialize and populate the buffer vector. @@ -1258,7 +1260,7 @@ void TableInfo::addColumn(ColumnInfo* info) //------------------------------------------------------------------------------ int TableInfo::openTableFile() { - if (fHandle != NULL) + if (fHandle != nullptr) return NO_ERROR; if (fReadFromStdin) @@ -1322,6 +1324,8 @@ int TableInfo::openTableFile() fLog->logMsg(oss.str(), MSGLVL_INFO2); } + fSkipRowsCur = fSkipRows; + return NO_ERROR; } diff --git a/writeengine/bulk/we_tableinfo.h b/writeengine/bulk/we_tableinfo.h index 996661cf9..3d4e836a8 100644 --- a/writeengine/bulk/we_tableinfo.h +++ b/writeengine/bulk/we_tableinfo.h @@ -148,8 +148,9 @@ class TableInfo : public WeUIDGID size_t fS3ParseLength; bool fNullStringMode; // Treat "NULL" as a null value char fEnclosedByChar; // Character to enclose col values - char fEscapeChar; // Escape character used in conjunc- - // tion with fEnclosedByChar + char fEscapeChar; // Escape character used in conjunction with fEnclosedByChar + size_t fSkipRows; // Header rows to skip + size_t fSkipRowsCur; // Header rows left to skip in the current file bool fProcessingBegun; // Has processing begun on this tbl BulkModeType fBulkMode; // Distributed bulk mode (1,2, or 3) std::string fBRMRptFileName; // Name of distributed mode rpt file @@ -334,6 +335,10 @@ class TableInfo : public WeUIDGID */ void setEscapeChar(char esChar); + /** @brief Set how many header rows should be skipped. + */ + void setSkipRows(size_t skipRows); + /** @brief Has processing begun for this table. 
*/ bool hasProcessingBegun(); @@ -579,6 +584,12 @@ inline void TableInfo::setEscapeChar(char esChar) fEscapeChar = esChar; } +inline void TableInfo::setSkipRows(size_t skipRows) +{ + fSkipRows = skipRows; +} + + inline void TableInfo::setFileBufferSize(const int fileBufSize) { fFileBufSize = fileBufSize; diff --git a/writeengine/server/we_dataloader.cpp b/writeengine/server/we_dataloader.cpp index 022963db5..16d648fd6 100644 --- a/writeengine/server/we_dataloader.cpp +++ b/writeengine/server/we_dataloader.cpp @@ -239,12 +239,13 @@ bool WEDataLoader::setupCpimport() // fork the cpimport std::string aCmdLine = fCmdLineStr; std::istringstream ss(aCmdLine); std::string arg; - std::vector v2(20, ""); + std::vector v2; unsigned int i = 0; while (ss >> arg) { - v2[i++] = arg; + v2.push_back(arg); + i++; } for (unsigned int j = 0; j < i; ++j) diff --git a/writeengine/shared/we_type.h b/writeengine/shared/we_type.h index 4b74c0efd..faef43018 100644 --- a/writeengine/shared/we_type.h +++ b/writeengine/shared/we_type.h @@ -525,6 +525,7 @@ struct Job /** @brief Job Structure */ int numberOfReadBuffers; unsigned readBufferSize; unsigned writeBufferSize; + int fSkipRows; Job() : id(0) , fDelimiter('|') @@ -533,6 +534,7 @@ struct Job /** @brief Job Structure */ , numberOfReadBuffers(0) , readBufferSize(0) , writeBufferSize(0) + , fSkipRows(0) { } }; diff --git a/writeengine/splitter/CMakeLists.txt b/writeengine/splitter/CMakeLists.txt index 96656adf9..7b05928f6 100644 --- a/writeengine/splitter/CMakeLists.txt +++ b/writeengine/splitter/CMakeLists.txt @@ -26,4 +26,5 @@ columnstore_link( batchloader threadpool marias3 + boost_program_options ) diff --git a/writeengine/splitter/we_cmdargs.cpp b/writeengine/splitter/we_cmdargs.cpp index 32d126346..34f67ccb7 100644 --- a/writeengine/splitter/we_cmdargs.cpp +++ b/writeengine/splitter/we_cmdargs.cpp @@ -29,6 +29,8 @@ #include #include #include +#include +namespace po = boost::program_options; using namespace std; #include @@ -50,38 
+52,96 @@ namespace WriteEngine //---------------------------------------------------------------------- //---------------------------------------------------------------------- WECmdArgs::WECmdArgs(int argc, char** argv) - : fMultiTableCount(0) - , fJobLogOnly(false) - , fHelp(false) - , fMode(1) - , fArgMode(-1) - , fQuiteMode(true) - , fConsoleLog(false) - , fVerbose(0) - , fBatchQty(10000) - , fNoOfReadThrds(0) - , fDebugLvl(0) - , fMaxErrors(-1) - , fReadBufSize(0) - , fIOReadBufSize(0) - , fSetBufSize(0) - , fColDelim('|') - , fEnclosedChar(0) - , fEscChar(0) - , fNoOfWriteThrds(0) - , fNullStrMode(false) - , fImportDataMode(IMPORT_DATA_TEXT) - , fCpiInvoke(false) - , fBlockMode3(false) - , fbTruncationAsError(false) - , fUUID(boost::uuids::nil_generator()()) - , fConsoleOutput(true) - , fTimeZone("SYSTEM") - , fErrorDir(string(MCSLOGDIR) + "/cpimport/") { try { appTestFunction(); + fOptions = std::make_unique(); +#define DECLARE_INT_ARG(name, stor, min, max, desc) \ + (name,\ + po::value(&stor)\ + ->notifier([](auto&& value) { checkIntArg(name, min, max, value); }),\ + desc) + + fOptions->add_options() + ("help,h", "Print this message.") + DECLARE_INT_ARG("read-buffer,b", fIOReadBufSize, 1, INT_MAX, "Number of read buffers.") + DECLARE_INT_ARG("read-buffer-size,c", fReadBufSize, 1, INT_MAX, + "Application read buffer size (in bytes)") + DECLARE_INT_ARG("debug,d", fDebugLvl, 1, 3, "Print different level(1-3) debug message") + ("verbose,v", po::value()) + ("silent,N", po::bool_switch()) + DECLARE_INT_ARG("max-errors,e", fMaxErrors, 0, INT_MAX, + "Maximum number of allowable error per table per PM") + ("file-path,f", po::value(&fPmFilePath), + "Data file directory path. 
Default is current working directory.\n" + "\tIn Mode 1, represents the local input file path.\n" + "\tIn Mode 2, represents the PM based input file path.\n" + "\tIn Mode 3, represents the local input file path.") + DECLARE_INT_ARG("mode,m", fArgMode, 0, 3, + "\t1 - rows will be loaded in a distributed manner across PMs.\n" + "\t2 - PM based input files loaded into their respective PM.\n" + "\t3 - input files will be loaded on the local PM.") + ("filename,l", po::value(&fPmFile), + "Name of import file to be loaded, relative to 'file-path'") + DECLARE_INT_ARG("batch-quantity,q", fBatchQty, 1, INT_MAX, + "Batch quantity, Number of rows distributed per batch in Mode 1") + ("console-log,i", po::bool_switch(&fConsoleLog), + "Print extended info to console in Mode 3.") + ("job-id,j", po::value(), + "Job ID. In simple usage, default is the table OID unless a fully qualified input " + "file name is given.") + ("null-strings,n", po::value(&fNullStrMode)->implicit_value(true), + "NullOption (0-treat the string NULL as data (default);\n" + "1-treat the string NULL as a NULL value)") + ("xml-job-path,p", po::value(&fJobPath), "Path for the XML job description file.") + DECLARE_INT_ARG("readers,r", fNoOfReadThrds, 1, INT_MAX, "Number of readers.") + ("separator,s", po::value(), "Delimiter between column values.") + DECLARE_INT_ARG("io-buffer-size,B", fSetBufSize, 1, INT_MAX, + "I/O library read buffer size (in bytes)") + DECLARE_INT_ARG("writers,w", fNoOfWriteThrds, 1, INT_MAX, "Number of parsers.") + ("enclosed-by,E", po::value(&fEnclosedChar), + "Enclosed by character if field values are enclosed.") + ("escape-char,C", po::value(&fEscChar)->default_value('\\'), + "Escape character used in conjunction with 'enclosed-by' " + "character, or as a part of NULL escape sequence ('\\N');\n" + "default is '\\'") + ("headers,O", + po::value(&fSkipRows)->implicit_value(1) + ->notifier([](auto&& value) { checkIntArg("headers,O", 0, INT_MAX, value); }), + "Number of header rows to skip.") 
+ ("binary-mode,I", po::value(), + "Import binary data; how to treat NULL values:\n" + "\t1 - import NULL values\n" + "\t2 - saturate NULL values\n") + ("pm,P", po::value>(&fPmVec), + "List of PMs ex: -P 1,2,3. Default is all PMs.") + ("truncation-as-error,S", po::bool_switch(&fbTruncationAsError), + "Treat string truncations as errors.") + ("tz,T", po::value(), + "Timezone used for TIMESTAMP datatype. Possible values:\n" + "\t\"SYSTEM\" (default)\n" + "\tOffset in the form +/-HH:MM") + ("s3-key,y", po::value(&fS3Key), + "S3 Authentication Key (for S3 imports)") + ("s3-secret,K", po::value(&fS3Secret), + "S3 Authentication Secret (for S3 imports)") + ("s3-bucket,t", po::value(&fS3Bucket), + "S3 Bucket (for S3 imports)") + ("s3-hostname,H", po::value(&fS3Host), + "S3 Hostname (for S3 imports, Amazon's S3 default)") + ("s3-region,g", po::value(&fS3Region), + "S3 Region (for S3 imports)") + ("errors-dir,L", po::value(&fErrorDir)->default_value(MCSLOGDIR), + "Directory for the output .err and .bad files") + ("username,U", po::value(&fUsername), "Username of the files owner.") + ("dbname", po::value(), "Name of the database to load") + ("table", po::value(), "Name of table to load") + ("load-file", po::value(), + "Optional input file name in current directory, " + "unless a fully qualified name is given. 
If not given, input read from STDIN."); + +#undef DECLARE_INT_ARG parseCmdLineArgs(argc, argv); } catch (std::exception& exp) @@ -92,6 +152,8 @@ WECmdArgs::WECmdArgs(int argc, char** argv) } } +WECmdArgs::~WECmdArgs() = default; + //---------------------------------------------------------------------- void WECmdArgs::appTestFunction() @@ -107,8 +169,18 @@ void WECmdArgs::appTestFunction() return; } +void WECmdArgs::checkIntArg(const std::string& name, long min, long max, int value) +{ + if (value < min || value > max) + { + ostringstream oss; + oss << "Argument " << name << " is out of range [" << min << ", " << max << "]"; + throw runtime_error(oss.str()); + } +} + //---------------------------------------------------------------------- -std::string WECmdArgs::getCpImportCmdLine() +std::string WECmdArgs::getCpImportCmdLine(bool skipRows) { std::ostringstream aSS; std::string aCmdLine; @@ -185,6 +257,11 @@ std::string WECmdArgs::getCpImportCmdLine() if (fEscChar != 0) aSS << " -C " << fEscChar; + if (skipRows && fSkipRows) + { + aSS << " -O " << fSkipRows; + } + if (fNullStrMode) aSS << " -n " << '1'; @@ -321,6 +398,12 @@ bool WECmdArgs::checkForCornerCases() // BUG 4210 this->checkJobIdCase(); // Need to do this before we go further + if (fSkipRows && fImportDataMode != IMPORT_DATA_TEXT) + { + cout << "Invalid option -O with binary file" << endl; + throw runtime_error("Invalid option -O with binary file"); + } + if (fMode == 0) { if (!fJobId.empty()) @@ -522,52 +605,7 @@ void WECmdArgs::usage() cout << "\t\t\tunless a fully qualified name is given.\n"; cout << "\t\t\tIf not given, input read from STDIN.\n"; - cout << "\n\nOptions:\n" - << "\t-b\tNumber of read buffers\n" - << "\t-c\tApplication read buffer size(in bytes)\n" - << "\t-d\tPrint different level(1-3) debug message\n" - << "\t-e\tMax number of allowable error per table per PM\n" - << "\t-f\tData file directory path.\n" - << "\t\t\tDefault is current working directory.\n" - << "\t\t\tIn Mode 1, -f 
represents the local input file path.\n" - << "\t\t\tIn Mode 2, -f represents the PM based input file path.\n" - << "\t\t\tIn Mode 3, -f represents the local input file path.\n" - << "\t-l\tName of import file to be loaded, relative to -f path,\n" - << "\t-h\tPrint this message.\n" - << "\t-q\tBatch Quantity, Number of rows distributed per batch in Mode 1\n" - << "\t-i\tPrint extended info to console in Mode 3.\n" - << "\t-j\tJob ID. In simple usage, default is the table OID.\n" - << "\t\t\tunless a fully qualified input file name is given.\n" - << "\t-n\tNullOption (0-treat the string NULL as data (default);\n" - << "\t\t\t1-treat the string NULL as a NULL value)\n" - << "\t-p\tPath for XML job description file.\n" - << "\t-r\tNumber of readers.\n" - << "\t-s\t'c' is the delimiter between column values.\n" - << "\t-B\tI/O library read buffer size (in bytes)\n" - << "\t-w\tNumber of parsers.\n" - << "\t-E\tEnclosed by character if field values are enclosed.\n" - << "\t-C\tEscape character used in conjunction with 'enclosed by'\n" - << "\t\t\tcharacter, or as part of NULL escape sequence ('\\N');\n" - << "\t\t\tdefault is '\\'\n" - << "\t-I\tImport binary data; how to treat NULL values:\n" - << "\t\t\t1 - import NULL values\n" - << "\t\t\t2 - saturate NULL values\n" - << "\t-P\tList of PMs ex: -P 1,2,3. 
Default is all PMs.\n" - << "\t-S\tTreat string truncations as errors.\n" - << "\t-m\tmode\n" - << "\t\t\t1 - rows will be loaded in a distributed manner across PMs.\n" - << "\t\t\t2 - PM based input files loaded onto their respective PM.\n" - << "\t\t\t3 - input files will be loaded on the local PM.\n" - << "\t-T\tTimezone used for TIMESTAMP datatype.\n" - << "\t\tPossible values: \"SYSTEM\" (default)\n" - << "\t\t : Offset in the form +/-HH:MM\n" - << "\t-y\tS3 Authentication Key (for S3 imports)\n" - << "\t-K\tS3 Authentication Secret (for S3 imports)\n" - << "\t-t\tS3 Bucket (for S3 imports)\n" - << "\t-H\tS3 Hostname (for S3 imports, Amazon's S3 default)\n" - << "\t-g\tS3 Region (for S3 imports)\n" - << "\t-L\tDirectory for the output .err and .bad files.\n" - << "\t\tDefault is " << string(MCSLOGDIR); + cout << "\n\n" << (*fOptions) << endl; cout << "\nExample1: Traditional usage\n" << "\tcpimport -j 1234"; @@ -591,375 +629,112 @@ void WECmdArgs::usage() void WECmdArgs::parseCmdLineArgs(int argc, char** argv) { - int aCh; std::string importPath; bool aJobType = false; if (argc > 0) fPrgmName = string(MCSBINDIR) + "/" + "cpimport.bin"; // argv[0] is splitter but we need cpimport - while ((aCh = getopt(argc, argv, "d:j:w:s:v:l:r:b:e:B:f:q:ihm:E:C:P:I:n:p:c:ST:Ny:K:t:H:g:U:L:")) != EOF) + po::positional_options_description pos_opt; + pos_opt.add("dbname", 1) + .add("table", 1) + .add("load-file", 1); + + po::variables_map vm; + po::store(po::command_line_parser(argc, argv).options(*fOptions).positional(pos_opt).run(), vm); + po::notify(vm); + + if (vm.contains("silent")) { - switch (aCh) + fConsoleOutput = !vm["silent"].as(); + } + if (vm.contains("help")) + { + fHelp = true; + usage(); + return; + } + if (vm.contains("separator")) + { + auto value = vm["separator"].as(); + if (value == "\\t") { - case 'm': + fColDelim = '\t'; + if (fDebugLvl) { - fArgMode = atoi(optarg); - - // cout << "Mode level set to " << fMode << endl; - if ((fArgMode > -1) && (fArgMode <= 
3)) - { - } - else - throw runtime_error("Wrong Mode level"); - - break; + cout << "Column delimiter : \\t" << endl; } - - case 'B': + } + else + { + fColDelim = value[0]; + if (fDebugLvl) { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - throw runtime_error("Option -B is invalid or out of range"); - - fSetBufSize = lValue; - break; - } - - case 'b': - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - throw runtime_error("Option -b is invalid or out of range"); - - fIOReadBufSize = lValue; - break; - } - - case 'e': - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX)) - throw runtime_error("Option -e is invalid or out of range"); - - fMaxErrors = lValue; - break; - } - - case 'i': - { - fConsoleLog = true; - break; - } - - case 'c': - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - throw runtime_error("Option -c is invalid or out of range"); - - fReadBufSize = lValue; - break; - } - - case 'j': // -j: jobID - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 0) || (lValue > INT_MAX)) - throw runtime_error("Option -j is invalid or out of range"); - - fJobId = optarg; - fOrigJobId = fJobId; // in case if we need to split it. 
- - if (0 == fJobId.length()) - throw runtime_error("Wrong JobID Value"); - - aJobType = true; - break; - } - - case 'v': // verbose - { - string aVerbLen = optarg; - fVerbose = aVerbLen.length(); - fDebugLvl = fVerbose; - break; - } - - case 'd': // -d debug - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - throw runtime_error("Option -d is invalid or out of range"); - - fDebugLvl = lValue; - - if (fDebugLvl > 0 && fDebugLvl <= 3) - { - cout << "\nDebug level set to " << fDebugLvl << endl; - } - else - { - throw runtime_error("Wrong Debug level"); - } - - break; - } - - case 'r': // -r: num read threads - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - throw runtime_error("Option -r is invalid or out of range"); - - fNoOfReadThrds = lValue; - break; - } - - case 'w': // -w: num parse threads - { - errno = 0; - long lValue = strtol(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > INT_MAX)) - throw runtime_error("Option -w is invalid or out of range"); - - fNoOfWriteThrds = lValue; - break; - } - - case 's': // -s: column delimiter - { - if (!strcmp(optarg, "\\t")) - { - fColDelim = '\t'; - - if (fDebugLvl) - cout << "Column delimiter : " - << "\\t" << endl; - } - else - { - fColDelim = optarg[0]; - - if (fDebugLvl) - cout << "Column delimiter : " << fColDelim << endl; - } - - break; - } - - case 'l': // -l: if JobId (-j), it can be input file - { - fPmFile = optarg; - - if (0 == fPmFile.length()) - throw runtime_error("Wrong local filename"); - - break; - } - - case 'f': // -f: import file path - { - fPmFilePath = optarg; - break; - } - - case 'n': // -n: treat "NULL" as null - { - // default is 0, ie it is equal to not giving this option - int nullStringMode = atoi(optarg); - - if ((nullStringMode != 0) && (nullStringMode != 1)) - { - throw(runtime_error("Invalid NULL option; value can be 0 or 1")); - } - - if 
(nullStringMode) - fNullStrMode = true; - else - fNullStrMode = false; // This is default - - break; - } - - case 'P': // -p: list of PM's - { - try - { - std::string aPmList = optarg; - - if (!str2PmList(aPmList, fPmVec)) - throw(runtime_error("PM list is wrong")); - } - catch (runtime_error& ex) - { - throw(ex); - } - - break; - } - - case 'p': - { - fJobPath = optarg; - break; - } - - case 'E': // -E: enclosed by char - { - fEnclosedChar = optarg[0]; - // cout << "Enclosed by Character : " << optarg[0] << endl; - break; - } - - case 'C': // -C: enclosed escape char - { - fEscChar = optarg[0]; - // cout << "Escape Character : " << optarg[0] << endl; - break; - } - - case 'h': // -h: help - { - // usage(); // will exit(1) here - fHelp = true; - break; - } - - case 'I': // -I: binary mode (null handling) - { - // default is text mode, unless -I option is specified - int binaryMode = atoi(optarg); - - if (binaryMode == 1) - { - fImportDataMode = IMPORT_DATA_BIN_ACCEPT_NULL; - } - else if (binaryMode == 2) - { - fImportDataMode = IMPORT_DATA_BIN_SAT_NULL; - } - else - { - throw(runtime_error("Invalid Binary mode; value can be 1 or 2")); - } - - break; - } - - case 'S': // -S: Treat string truncations as errors - { - setTruncationAsError(true); - // cout << "TruncationAsError : true" << endl; - break; - } - - case 'T': - { - std::string timeZone = optarg; - long offset; - - if (timeZone != "SYSTEM" && dataconvert::timeZoneToOffset(timeZone.c_str(), timeZone.size(), &offset)) - { - throw(runtime_error("Value for option -T is invalid")); - } - - fTimeZone = timeZone; - break; - } - - case 'q': // -q: batch quantity - default value is 10000 - { - errno = 0; - long long lValue = strtoll(optarg, 0, 10); - - if ((errno != 0) || (lValue < 1) || (lValue > UINT_MAX)) - throw runtime_error("Option -q is invalid or out of range"); - - fBatchQty = lValue; - - if (fBatchQty < 10000) - fBatchQty = 10000; - else if (fBatchQty > 100000) - fBatchQty = 10000; - - break; - } - - case 
'N': //-N no console output - { - fConsoleOutput = false; - break; - } - - case 'y': //-y S3 Key - { - fS3Key = optarg; - break; - } - - case 'K': //-K S3 Secret - { - fS3Secret = optarg; - break; - } - - case 'H': //-H S3 Host - { - fS3Host = optarg; - break; - } - - case 't': //-t S3 bucket - { - fS3Bucket = optarg; - break; - } - - case 'g': //-g S3 Region - { - fS3Region = optarg; - break; - } - - case 'U': //-U username of the files owner - { - fUsername = optarg; - break; - } - - case 'L': // -L set the output location of .bad/.err files - { - fErrorDir = optarg; - break; - } - - default: - { - std::string aErr = std::string("Unknown command line option ") + std::to_string(aCh); - // cout << "Unknown command line option " << aCh << endl; - throw(runtime_error(aErr)); + cout << "Column delimiter : " << fColDelim << endl; } } } + if (vm.contains("binary-mode")) + { + int value = vm["binary-mode"].as(); + if (value == 1) + { + fImportDataMode = IMPORT_DATA_BIN_ACCEPT_NULL; + } + else if (value == 2) + { + fImportDataMode = IMPORT_DATA_BIN_SAT_NULL; + } + else + { + throw runtime_error("Invalid Binary mode; value can be 1 or 2"); + } + } + if (vm.contains("tz")) + { + auto tz = vm["tz"].as(); + long offset; + if (tz != "SYSTEM" && dataconvert::timeZoneToOffset(tz.c_str(), tz.size(), &offset)) + { + throw runtime_error("Value for option --tz/-T is invalid"); + } + fTimeZone = tz; + } + if (vm.contains("job-id")) + { + errno = 0; + string optarg = vm["job-id"].as(); + long lValue = strtol(optarg.c_str(), nullptr, 10); + if (errno != 0 || lValue < 0 || lValue > INT_MAX) + { + throw runtime_error("Option --job-id/-j is invalid or out of range"); + } + fJobId = optarg; + fOrigJobId = fJobId; - if (fHelp) - usage(); // BUG 4210 + if (fJobId.empty()) + { + throw runtime_error("Wrong JobID Value"); + } + + aJobType = true; + } + if (vm.contains("verbose")) + { + string optarg = vm["verbose"].as(); + fVerbose = fDebugLvl = optarg.length(); + } + if 
(vm.contains("batch-quantity")) + { + if (fBatchQty < 10000) + { + fBatchQty = 10000; + } + else if (fBatchQty > 100000) + { + fBatchQty = 10000; + } + } if (fArgMode != -1) fMode = fArgMode; // BUG 4210 @@ -976,26 +751,23 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) if (0 == fArgMode) throw runtime_error("Incompatible mode and option types"); - if (optind < argc) + if (vm.contains("dbname")) { - fSchema = argv[optind]; // 1st pos parm - optind++; + fSchema = vm["dbname"].as(); - if (optind < argc) - { - fTable = argv[optind]; // 2nd pos parm - optind++; - } - else + + if (!vm.contains("table")) { // if schema is there, table name should be there throw runtime_error("No table name specified with schema."); } - if (optind < argc) // see if input file name is given + fTable = vm["table"].as(); // 2nd pos parm + + if (vm.contains("load-file")) // see if input file name is given { // 3rd pos parm - fLocFile = argv[optind]; + fLocFile = vm["load-file"].as(); if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN")) { @@ -1074,7 +846,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) // 1. no positional parameters - Mode 0 & stdin // 2. Two positional parameters (schema and table names) - Mode 1/2, stdin // 3. 
Three positional parameters (schema, table, and import file name) - else if (optind < argc) // see if db schema name is given + else if (vm.contains("dbname")) // see if db schema name is given { if (fArgMode == 0) { @@ -1088,13 +860,12 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) } else { - fLocFile = argv[optind]; - optind++; + fLocFile = vm["dbname"].as(); } - if (optind < argc) // dest filename provided + if (vm.contains("table")) // dest filename provided { - fPmFile = argv[optind]; + fPmFile = vm["table"].as(); if ((fPmFile.at(0) != '/') && (fS3Key.empty())) { @@ -1144,19 +915,16 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) */ } else - fSchema = argv[optind]; // 1st pos parm + fSchema = vm["dbname"].as(); // 1st pos parm - optind++; - - if (optind < argc) // see if table name is given + if (vm.contains("table")) // see if table name is given { - fTable = argv[optind]; // 2nd pos parm - optind++; + fTable = vm["table"].as(); // 2nd pos parm - if (optind < argc) // see if input file name is given + if (vm.contains("load-file")) // see if input file name is given { // 3rd pos parm - fLocFile = argv[optind]; + fLocFile = vm["load-file"].as(); // BUG 4379 if -f option given we need to use that path, // over riding bug 4231. 
look at the code below @@ -1543,9 +1311,7 @@ void WECmdArgs::setEnclByAndEscCharFromJobFile(std::string& JobName) if (fEnclosedChar == 0) // check anything in Jobxml file { WEXmlgetter aXmlGetter(JobName); - vector aSections; - aSections.push_back("BulkJob"); - aSections.push_back("EnclosedByChar"); + const vector aSections{"BulkJob", "EnclosedByChar"}; try { @@ -1569,9 +1335,7 @@ void WECmdArgs::setEnclByAndEscCharFromJobFile(std::string& JobName) if (fEscChar == 0) // check anything in Jobxml file { WEXmlgetter aXmlGetter(JobName); - vector aSections; - aSections.push_back("BulkJob"); - aSections.push_back("EscapeChar"); + const vector aSections{"BulkJob", "EscapeChar"}; try { diff --git a/writeengine/splitter/we_cmdargs.h b/writeengine/splitter/we_cmdargs.h index 64eb3dedb..64d17b598 100644 --- a/writeengine/splitter/we_cmdargs.h +++ b/writeengine/splitter/we_cmdargs.h @@ -24,28 +24,33 @@ #include #include +#include #include "we_xmlgetter.h" #include "we_type.h" +namespace boost::program_options +{ +class options_description; +} + namespace WriteEngine { class WECmdArgs { public: WECmdArgs(int argc, char** argv); - virtual ~WECmdArgs() = default; + virtual ~WECmdArgs(); typedef std::vector VecInts; typedef std::vector VecArgs; void appTestFunction(); void parseCmdLineArgs(int argc, char** argv); - std::string getCpImportCmdLine(); + std::string getCpImportCmdLine(bool skipRows); void setSchemaAndTableFromJobFile(std::string& JobName); void setEnclByAndEscCharFromJobFile(std::string& JobName); void usage(); - void usageMode3(); bool checkForCornerCases(); void checkForBulkLogDir(const std::string& BulkRoot); @@ -76,11 +81,11 @@ class WECmdArgs { return fLocFile; } - int getReadBufSize() + int getReadBufSize() const { return fReadBufSize; } - int getMode() + int getMode() const { return fMode; } @@ -88,36 +93,40 @@ class WECmdArgs { return fArgMode; } - bool isHelpMode() + bool isHelpMode() const { return fHelp; } - int getDebugLvl() + int getDebugLvl() const { 
return fDebugLvl; } - char getEnclChar() + char getEnclChar() const { return fEnclosedChar; } - char getEscChar() + char getEscChar() const { return fEscChar; } - char getDelimChar() + char getDelimChar() const { return fColDelim; } + int getSkipRows() const + { + return fSkipRows; + } ImportDataMode getImportDataMode() const { return fImportDataMode; } - bool getConsoleLog() + bool getConsoleLog() const { return fConsoleLog; } - bool isCpimportInvokeMode() + bool isCpimportInvokeMode() const { return (fBlockMode3) ? false : fCpiInvoke; } @@ -125,11 +134,15 @@ class WECmdArgs { return fQuiteMode; } - void setJobId(std::string fJobId) + void setJobId(const std::string& fJobId) { this->fJobId = fJobId; } - void setLocFile(std::string fLocFile) + void setOrigJobId() + { + this->fOrigJobId = fJobId; + } + void setLocFile(const std::string& fLocFile) { this->fLocFile = fLocFile; } @@ -141,7 +154,7 @@ class WECmdArgs { this->fArgMode = ArgMode; } - void setPmFile(std::string fPmFile) + void setPmFile(const std::string& fPmFile) { this->fPmFile = fPmFile; } @@ -183,7 +196,7 @@ class WECmdArgs { fUUID = jobUUID; } - bool getConsoleOutput() + bool getConsoleOutput() const { return fConsoleOutput; } @@ -194,7 +207,7 @@ class WECmdArgs bool getPmStatus(int Id); bool str2PmList(std::string& PmList, VecInts& V); - int getPmVecSize() + size_t getPmVecSize() const { return fPmVec.size(); } @@ -265,7 +278,7 @@ class WECmdArgs { return fErrorDir; } - void setErrorDir(std::string fErrorDir) + void setErrorDir(const std::string& fErrorDir) { this->fErrorDir = fErrorDir; } @@ -273,24 +286,26 @@ class WECmdArgs std::string PrepMode2ListOfFiles(std::string& FileName); // Bug 4342 void getColumnList(std::set& columnList) const; + private: + static void checkIntArg(const std::string& name, long min, long max, int value); private: // variables for SplitterApp VecArgs fVecArgs; VecInts fPmVec; VecArgs fVecJobFiles; // JobFiles splitter from master JobFile - int fMultiTableCount; // 
MultiTable count + int fMultiTableCount{0}; // MultiTable count VecArgs fColFldsFromJobFile; // List of columns from any job file, that - // represent fields in the import data + // represent fields in the import data std::string fJobId; // JobID std::string fOrigJobId; // Original JobID, in case we have to split it - bool fJobLogOnly; // Job number is only for log filename only - bool fHelp; // Help mode - int fMode; // splitter Mode - int fArgMode; // Argument mode, dep. on this fMode is decided. - bool fQuiteMode; // in quite mode or not - bool fConsoleLog; // Log everything to console - w.r.t cpimport - int fVerbose; // how many v's + bool fJobLogOnly{false}; // Job number is only for log filename only + bool fHelp{false}; // Help mode + int fMode{1}; // splitter Mode + int fArgMode{-1}; // Argument mode, dep. on this fMode is decided. + bool fQuiteMode{true}; // in quite mode or not + bool fConsoleLog{false}; // Log everything to console - w.r.t cpimport + int fVerbose{0}; // how many v's std::string fPmFile; // FileName at PM std::string fPmFilePath; // Path of input file in PM std::string fLocFile; // Local file name @@ -305,32 +320,33 @@ class WECmdArgs std::string fS3Host; // S3 Host std::string fS3Region; // S3 Region - unsigned int fBatchQty; // No. of batch Qty. - int fNoOfReadThrds; // No. of read buffers - // std::string fConfig; // config filename - int fDebugLvl; // Debug level - int fMaxErrors; // Max allowable errors - int fReadBufSize; // Read buffer size - int fIOReadBufSize; // I/O read buffer size - int fSetBufSize; // Buff size w/setvbuf - char fColDelim; // column delimiter - char fEnclosedChar; // enclosed by char - char fEscChar; // esc char - int fNoOfWriteThrds; // No. 
of write threads - bool fNullStrMode; // set null string mode - treat null as null - ImportDataMode fImportDataMode; // Importing text or binary data - std::string fPrgmName; // argv[0] - std::string fSchema; // Schema name - positional parmater - std::string fTable; // Table name - table name parameter + int fBatchQty{10000}; // No. of batch Qty. + int fNoOfReadThrds{0}; // No. of read buffers + int fDebugLvl{0}; // Debug level + int fMaxErrors{-1}; // Max allowable errors + int fReadBufSize{0}; // Read buffer size + int fIOReadBufSize{0}; // I/O read buffer size + int fSetBufSize{0}; // Buff size w/setvbuf + char fColDelim{'|'}; // column delimiter + char fEnclosedChar{0}; // enclosed by char + char fEscChar{0}; // esc char + int fSkipRows{0}; // skip header + int fNoOfWriteThrds{0}; // No. of write threads + bool fNullStrMode{false}; // set null string mode - treat null as null + ImportDataMode fImportDataMode{IMPORT_DATA_TEXT}; // Importing text or binary data + std::string fPrgmName; // argv[0] + std::string fSchema; // Schema name - positional parmater + std::string fTable; // Table name - table name parameter - bool fCpiInvoke; // invoke cpimport in mode 3 - bool fBlockMode3; // Do not allow Mode 3 - bool fbTruncationAsError; // Treat string truncation as error - boost::uuids::uuid fUUID; - bool fConsoleOutput; // If false, no output to console. - std::string fTimeZone; // Timezone to use for TIMESTAMP datatype - std::string fUsername; // Username of the data files owner - std::string fErrorDir; + bool fCpiInvoke{false}; // invoke cpimport in mode 3 + bool fBlockMode3{false}; // Do not allow Mode 3 + bool fbTruncationAsError{false}; // Treat string truncation as error + boost::uuids::uuid fUUID{boost::uuids::nil_generator()()}; + bool fConsoleOutput{true}; // If false, no output to console. 
+ std::string fTimeZone{"SYSTEM"}; // Timezone to use for TIMESTAMP datatype + std::string fUsername; // Username of the data files owner + std::string fErrorDir{MCSLOGDIR "/cpimport/"}; + std::unique_ptr fOptions; }; //---------------------------------------------------------------------- diff --git a/writeengine/splitter/we_filereadthread.cpp b/writeengine/splitter/we_filereadthread.cpp index ac7e84d65..ea83d229d 100644 --- a/writeengine/splitter/we_filereadthread.cpp +++ b/writeengine/splitter/we_filereadthread.cpp @@ -79,6 +79,7 @@ WEFileReadThread::WEFileReadThread(WESDHandler& aSdh) , fEncl('\0') , fEsc('\\') , fDelim('|') + , fSkipRows(0) { // TODO batch qty to get from config fBatchQty = 10000; @@ -187,6 +188,8 @@ void WEFileReadThread::setup(std::string FileName) if (aEncl != 0) fEnclEsc = true; + fSkipRows = fSdh.getSkipRows(); + // BUG 4342 - Need to support "list of infiles" // chkForListOfFiles(FileName); - List prepared in sdhandler. @@ -216,12 +219,10 @@ void WEFileReadThread::setup(std::string FileName) //------------------------------------------------------------------------------ -bool WEFileReadThread::chkForListOfFiles(std::string& FileName) +bool WEFileReadThread::chkForListOfFiles(const std::string& fileName) { // cout << "Inside chkForListOfFiles("<< FileName << ")" << endl; - std::string aFileName = FileName; - - istringstream iss(aFileName); + istringstream iss(fileName); ostringstream oss; size_t start = 0, end = 0; const char* sep = " ,|"; @@ -229,8 +230,8 @@ bool WEFileReadThread::chkForListOfFiles(std::string& FileName) do { - end = aFileName.find_first_of(sep, start); - std::string aFile = aFileName.substr(start, end - start); + end = fileName.find_first_of(sep, start); + std::string aFile = fileName.substr(start, end - start); if (aFile == "STDIN" || aFile == "stdin") aFile = "/dev/stdin"; @@ -270,9 +271,9 @@ std::string WEFileReadThread::getNextInputDataFile() } 
//------------------------------------------------------------------------------ -void WEFileReadThread::add2InputDataFileList(std::string& FileName) +void WEFileReadThread::add2InputDataFileList(const std::string& fileName) { - fInfileList.push_front(FileName); + fInfileList.push_front(fileName); } //------------------------------------------------------------------------------ @@ -371,17 +372,33 @@ unsigned int WEFileReadThread::readDataFile(messageqcpp::SBS& Sbs) // For now we are going to send KEEPALIVES //*Sbs << (ByteStream::byte)(WE_CLT_SRV_KEEPALIVE); - if ((fInFile.good()) && (!fInFile.eof())) + if (fInFile.good() && !fInFile.eof()) { // cout << "Inside WEFileReadThread::readDataFile" << endl; // char aBuff[1024*1024]; // TODO May have to change it later // char*pStart = aBuff; unsigned int aIdx = 0; int aLen = 0; - *Sbs << (ByteStream::byte)(WE_CLT_SRV_DATA); + *Sbs << static_cast(WE_CLT_SRV_DATA); - while ((!fInFile.eof()) && (aIdx < getBatchQty())) + while (!fInFile.eof() && aIdx < getBatchQty()) { + if (fSkipRows > 0) + { + fSkipRows--; + fInFile.getline(fBuff, fBuffSize - 1); + if (fSdh.getDebugLvl() > 3) + { + aLen = fInFile.gcount(); + if (aLen > 0 && aLen < fBuffSize - 2) + { + fBuff[aLen - 1] = 0; + cout << "Skip header row (" << fSkipRows<< " to go): " << fBuff << endl; + } + } + continue; + } + if (fEnclEsc) { // pStart = aBuff; @@ -551,6 +568,9 @@ void WEFileReadThread::openInFile() fInFile.rdbuf(fIfFile.rdbuf()); //@BUG 4326 } + // Got new file, so reset fSkipRows + fSkipRows = fSdh.getSkipRows(); + //@BUG 4326 -below three lines commented out // if (!fInFile.is_open()) fInFile.open(fInFileName.c_str()); // if (!fInFile.good()) @@ -657,13 +677,13 @@ void WEFileReadThread::initS3Connection(const WECmdArgs& args) s3Host = args.getS3Host(); ms3_library_init(); s3Connection = - ms3_init(s3Key.c_str(), s3Secret.c_str(), s3Region.c_str(), (s3Host.empty() ? 
NULL : s3Host.c_str())); + ms3_init(s3Key.c_str(), s3Secret.c_str(), s3Region.c_str(), (s3Host.empty() ? nullptr : s3Host.c_str())); if (!s3Connection) throw runtime_error("failed to get an S3 connection"); } else - s3Connection = NULL; - buf = NULL; + s3Connection = nullptr; + buf = nullptr; } //------------------------------------------------------------------------------ diff --git a/writeengine/splitter/we_filereadthread.h b/writeengine/splitter/we_filereadthread.h index 83e819500..665b3bf95 100644 --- a/writeengine/splitter/we_filereadthread.h +++ b/writeengine/splitter/we_filereadthread.h @@ -42,13 +42,11 @@ class WEFileReadThread; class WEReadThreadRunner { public: - WEReadThreadRunner(WEFileReadThread& Owner) : fRef(Owner) + explicit WEReadThreadRunner(WEFileReadThread& Owner) : fRef(Owner) { // ctor } - ~WEReadThreadRunner() - { - } + ~WEReadThreadRunner() = default; void operator()(); // Thread function @@ -61,7 +59,7 @@ class WEReadThreadRunner class WEFileReadThread { public: - WEFileReadThread(WESDHandler& aSdh); + explicit WEFileReadThread(WESDHandler& aSdh); virtual ~WEFileReadThread(); void reset(); @@ -82,9 +80,9 @@ class WEFileReadThread { return fContinue; } - void setContinue(bool fContinue) + void setContinue(bool cont) { - this->fContinue = fContinue; + fContinue = cont; } std::string getInFileName() const { @@ -98,30 +96,34 @@ class WEFileReadThread { return fBatchQty; } - void setFpThread(boost::thread* fpThread) + void setFpThread(boost::thread* pThread) { - this->fpThread = fpThread; + fpThread = pThread; } - void setInFileName(std::string fInFileName) + void setInFileName(const std::string& inFileName) { - if ((0 == fInFileName.compare("STDIN")) || (0 == fInFileName.compare("stdin"))) - this->fInFileName = "/dev/stdin"; + if (0 == inFileName.compare("STDIN") || 0 == inFileName.compare("stdin")) + { + fInFileName = "/dev/stdin"; + } else - this->fInFileName = fInFileName; + { + fInFileName = inFileName; + } } //@BUG 4326 const 
std::istream& getInFile() const { return fInFile; } - void setBatchQty(unsigned int BatchQty) + void setBatchQty(unsigned int batchQty) { - fBatchQty = BatchQty; + fBatchQty = batchQty; } - bool chkForListOfFiles(std::string& FileName); + bool chkForListOfFiles(const std::string& fileName); std::string getNextInputDataFile(); - void add2InputDataFileList(std::string& FileName); + void add2InputDataFileList(const std::string& fileName); private: enum @@ -130,9 +132,9 @@ class WEFileReadThread }; // don't allow anyone else to set - void setTgtPmId(unsigned int fTgtPmId) + void setTgtPmId(unsigned int tgtPmId) { - this->fTgtPmId = fTgtPmId; + fTgtPmId = tgtPmId; } WESDHandler& fSdh; @@ -148,11 +150,12 @@ class WEFileReadThread unsigned int fTgtPmId; unsigned int fBatchQty; - bool fEnclEsc; // Encl/Esc char is set - char fEncl; // Encl char - char fEsc; // Esc char - char fDelim; // Column Delimit char - char* fBuff; // main data buffer + bool fEnclEsc; // Encl/Esc char is set + char fEncl; // Encl char + char fEsc; // Esc char + char fDelim; // Column Delimit char + size_t fSkipRows; // Header rows to skip + char* fBuff; // main data buffer int fBuffSize; /* To support mode 1 imports from objects on S3 */ diff --git a/writeengine/splitter/we_sdhandler.cpp b/writeengine/splitter/we_sdhandler.cpp index d4eb6af6e..e5a6d688b 100644 --- a/writeengine/splitter/we_sdhandler.cpp +++ b/writeengine/splitter/we_sdhandler.cpp @@ -767,7 +767,7 @@ void WESDHandler::setup() oss << "Running distributed import (mode "; oss << fRef.fCmdArgs.getMode() << ") on "; - if (fRef.fCmdArgs.getPmVecSize() == fPmCount) + if (fRef.fCmdArgs.getPmVecSize() == static_cast(fPmCount)) oss << "all PMs..."; else { @@ -2548,20 +2548,20 @@ void WESDHandler::exportJobFile(std::string& JobId, std::string& JobFileName) } //------------------------------------------------------------------------------ -bool WESDHandler::getConsoleLog() +bool WESDHandler::getConsoleLog() const { return 
fRef.fCmdArgs.getConsoleLog(); } //------------------------------------------------------------------------------ -char WESDHandler::getEnclChar() +char WESDHandler::getEnclChar() const { return fRef.fCmdArgs.getEnclChar(); } //------------------------------------------------------------------------------ -char WESDHandler::getEscChar() +char WESDHandler::getEscChar() const { return fRef.fCmdArgs.getEscChar(); } @@ -2575,11 +2575,16 @@ int WESDHandler::getReadBufSize() //------------------------------------------------------------------------------ -char WESDHandler::getDelimChar() +char WESDHandler::getDelimChar() const { return fRef.fCmdArgs.getDelimChar(); } +size_t WESDHandler::getSkipRows() const +{ + return fRef.fCmdArgs.getSkipRows(); +} + //------------------------------------------------------------------------------ std::string WESDHandler::getTableName() const diff --git a/writeengine/splitter/we_sdhandler.h b/writeengine/splitter/we_sdhandler.h index ec9e6a78d..55ef07178 100644 --- a/writeengine/splitter/we_sdhandler.h +++ b/writeengine/splitter/we_sdhandler.h @@ -143,10 +143,11 @@ class WESDHandler void sendHeartbeats(); std::string getTableName() const; std::string getSchemaName() const; - char getEnclChar(); - char getEscChar(); - char getDelimChar(); - bool getConsoleLog(); + char getEnclChar() const; + char getEscChar() const; + char getDelimChar() const; + size_t getSkipRows() const; + bool getConsoleLog() const; int getReadBufSize(); ImportDataMode getImportDataMode() const; void sysLog(const logging::Message::Args& msgArgs, logging::LOG_TYPE logType, diff --git a/writeengine/splitter/we_splclient.h b/writeengine/splitter/we_splclient.h index e38ee0d25..ae3b84a22 100644 --- a/writeengine/splitter/we_splclient.h +++ b/writeengine/splitter/we_splclient.h @@ -46,9 +46,7 @@ class WEColOORInfo // Column Out-Of-Range Info WEColOORInfo() : fColNum(0), fColType(execplan::CalpontSystemCatalog::INT), fNoOfOORs(0) { } - ~WEColOORInfo() - { - } + 
~WEColOORInfo() = default; public: int fColNum; @@ -63,14 +61,12 @@ class WESdHandlerException : public std::exception { public: std::string fWhat; - WESdHandlerException(std::string& What) throw() + explicit WESdHandlerException(const std::string& What) noexcept { fWhat = What; } - virtual ~WESdHandlerException() throw() - { - } - virtual const char* what() const throw() + ~WESdHandlerException() noexcept override = default; + const char* what() const noexcept override { return fWhat.c_str(); } @@ -82,12 +78,10 @@ class WESdHandlerException : public std::exception class WESplClientRunner { public: - WESplClientRunner(WESplClient& Sc) : fOwner(Sc) + explicit WESplClientRunner(WESplClient& Sc) : fOwner(Sc) { /* ctor */ } - virtual ~WESplClientRunner() - { /* dtor */ - } + virtual ~WESplClientRunner() = default; void operator()(); public: @@ -389,9 +383,7 @@ class WESplClient WERowsUploadInfo() : fRowsRead(0), fRowsInserted(0) { } - ~WERowsUploadInfo() - { - } + ~WERowsUploadInfo() = default; public: int64_t fRowsRead; diff --git a/writeengine/splitter/we_splitterapp.cpp b/writeengine/splitter/we_splitterapp.cpp index a9f5a6d2f..4f670899e 100644 --- a/writeengine/splitter/we_splitterapp.cpp +++ b/writeengine/splitter/we_splitterapp.cpp @@ -64,7 +64,6 @@ WESplitterApp::WESplitterApp(WECmdArgs& CmdArgs) : fCmdArgs(CmdArgs), fDh(*this) fpSysLog = SimpleSysLog::instance(); fpSysLog->setLoggingID(logging::LoggingID(SUBSYSTEM_ID_WE_SPLIT)); setupSignalHandlers(); - std::string err; fDh.setDebugLvl(fCmdArgs.getDebugLvl()); fDh.check4CpiInvokeMode(); @@ -100,6 +99,7 @@ WESplitterApp::WESplitterApp(WECmdArgs& CmdArgs) : fCmdArgs(CmdArgs), fDh(*this) } catch (std::exception& ex) { + std::string err; // err = string("Error in constructing WESplitterApp") + ex.what(); err = ex.what(); // cleaning up for BUG 4298 logging::Message::Args errMsgArgs; @@ -139,10 +139,10 @@ WESplitterApp::~WESplitterApp() // fDh.shutdown(); usleep(1000); // 1 millisec just checking - std::string aStr 
= "Calling WESplitterApp Destructor\n"; - if (fDh.getDebugLvl()) - cout << aStr << endl; + { + cout << "Calling WESplitterApp Destructor" << endl; + } } //------------------------------------------------------------------------------ @@ -151,18 +151,18 @@ WESplitterApp::~WESplitterApp() void WESplitterApp::setupSignalHandlers() { - struct sigaction sa; + struct sigaction sa{}; memset(&sa, 0, sizeof(sa)); sa.sa_handler = WESplitterApp::onSigInterrupt; - sigaction(SIGINT, &sa, 0); + sigaction(SIGINT, &sa, nullptr); sa.sa_handler = WESplitterApp::onSigTerminate; - sigaction(SIGTERM, &sa, 0); + sigaction(SIGTERM, &sa, nullptr); sa.sa_handler = SIG_IGN; - sigaction(SIGPIPE, &sa, 0); + sigaction(SIGPIPE, &sa, nullptr); sa.sa_handler = WESplitterApp::onSigHup; - sigaction(SIGHUP, &sa, 0); + sigaction(SIGHUP, &sa, nullptr); sa.sa_handler = WESplitterApp::onSigInterrupt; - sigaction(SIGUSR1, &sa, 0); + sigaction(SIGUSR1, &sa, nullptr); /* signal(SIGPIPE, SIG_IGN); signal(SIGINT, WESplitterApp::onSigInterrupt); @@ -258,7 +258,7 @@ void WESplitterApp::processMessages() } aBs.restart(); - std::string aCpImpCmd = fCmdArgs.getCpImportCmdLine(); + std::string aCpImpCmd = fCmdArgs.getCpImportCmdLine(false); fDh.fLog.logMsg(aCpImpCmd, MSGLVL_INFO2); if (fDh.getDebugLvl()) @@ -315,7 +315,7 @@ void WESplitterApp::processMessages() } aBs.restart(); - std::string aCpImpCmd = fCmdArgs.getCpImportCmdLine(); + std::string aCpImpCmd = fCmdArgs.getCpImportCmdLine(false); fDh.fLog.logMsg(aCpImpCmd, MSGLVL_INFO2); if (fDh.getDebugLvl()) @@ -467,7 +467,7 @@ void WESplitterApp::invokeCpimport() fCmdArgs.setJobUUID(u); fCmdArgs.setMode(3); - std::string aCmdLineStr = fCmdArgs.getCpImportCmdLine(); + std::string aCmdLineStr = fCmdArgs.getCpImportCmdLine(true); if (fDh.getDebugLvl()) cout << "CPI CmdLineArgs : " << aCmdLineStr << endl; @@ -477,7 +477,6 @@ void WESplitterApp::invokeCpimport() std::istringstream ss(aCmdLineStr); std::string arg; std::vector v2; - v2.reserve(50); while (ss >> arg) { 
@@ -490,7 +489,7 @@ void WESplitterApp::invokeCpimport() Cmds.push_back(const_cast(v2[j].c_str())); } - Cmds.push_back(0); // null terminate + Cmds.push_back(nullptr); // null terminate int aRet = execvp(Cmds[0], &Cmds[0]); // NOTE - works with full Path @@ -515,7 +514,7 @@ void WESplitterApp::updateWithJobFile(int aIdx) int main(int argc, char** argv) { std::string err; - std::cin.sync_with_stdio(false); + std::istream::sync_with_stdio(false); try { @@ -528,7 +527,7 @@ int main(int argc, char** argv) for (int idx = 0; idx < aTblCnt; idx++) { aWESplitterApp.fDh.reset(); - aWESplitterApp.fContinue = true; + WriteEngine::WESplitterApp::fContinue = true; aWESplitterApp.updateWithJobFile(idx); try @@ -541,10 +540,10 @@ int main(int argc, char** argv) err = ex.what(); // cleaning up for BUG 4298 logging::Message::Args errMsgArgs; errMsgArgs.add(err); - aWESplitterApp.fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); + WriteEngine::WESplitterApp::fpSysLog->logMsg(errMsgArgs, logging::LOG_TYPE_ERROR, logging::M0000); SPLTR_EXIT_STATUS = 1; aWESplitterApp.fDh.fLog.logMsg(err, WriteEngine::MSGLVL_ERROR); - aWESplitterApp.fContinue = false; + WriteEngine::WESplitterApp::fContinue = false; // throw runtime_error(err); BUG 4298 } diff --git a/writeengine/splitter/we_xmlgetter.cpp b/writeengine/splitter/we_xmlgetter.cpp index 222e4fd72..03992cb35 100644 --- a/writeengine/splitter/we_xmlgetter.cpp +++ b/writeengine/splitter/we_xmlgetter.cpp @@ -46,20 +46,23 @@ namespace WriteEngine //------------------------------------------------------------------------------ // WEXmlgetter constructor //------------------------------------------------------------------------------ -WEXmlgetter::WEXmlgetter(std::string& ConfigName) : fConfigName(ConfigName), fDoc(NULL), fpRoot(NULL) +WEXmlgetter::WEXmlgetter(const std::string& ConfigName) + : fConfigName(ConfigName) + , fDoc(nullptr) + , fpRoot(nullptr) { // xmlNodePtr curPtr; fDoc = xmlParseFile(ConfigName.c_str()); - if 
(fDoc == NULL) + if (fDoc == nullptr) throw runtime_error("WEXmlgetter::getConfig(): no XML document!"); fpRoot = xmlDocGetRootElement(fDoc); - if (fpRoot == NULL) + if (fpRoot == nullptr) { xmlFreeDoc(fDoc); - fDoc = NULL; + fDoc = nullptr; throw runtime_error("WEXmlgetter::getConfig(): no XML Root Tag!"); } } @@ -70,24 +73,24 @@ WEXmlgetter::WEXmlgetter(std::string& ConfigName) : fConfigName(ConfigName), fDo WEXmlgetter::~WEXmlgetter() { xmlFreeDoc(fDoc); - fDoc = NULL; + fDoc = nullptr; } //------------------------------------------------------------------------------ // Get/return the property or attribute value (strVal) for the specified xml tag // (pNode) and property/attribute (pTag) //------------------------------------------------------------------------------ -bool WEXmlgetter::getNodeAttribute(const xmlNode* pNode, const char* pTag, std::string& strVal) const +bool WEXmlgetter::getNodeAttribute(const xmlNode* pNode, const char* pTag, std::string& strVal) { - xmlChar* pTmp = NULL; + xmlChar* pTmp = nullptr; bool bFound = false; - pTmp = xmlGetProp(const_cast(pNode), (xmlChar*)pTag); + pTmp = xmlGetProp(pNode, reinterpret_cast(pTag)); if (pTmp) { bFound = true; - strVal = (char*)pTmp; + strVal = reinterpret_cast(pTmp); xmlFree(pTmp); } else @@ -101,19 +104,19 @@ bool WEXmlgetter::getNodeAttribute(const xmlNode* pNode, const char* pTag, std:: //------------------------------------------------------------------------------ // Get/return the node content (strVal) for the specified xml tag (pNode) //------------------------------------------------------------------------------ -bool WEXmlgetter::getNodeContent(const xmlNode* pNode, std::string& strVal) const +bool WEXmlgetter::getNodeContent(const xmlNode* pNode, std::string& strVal) { - xmlChar* pTmp = NULL; + xmlChar* pTmp = nullptr; bool bFound = false; - if (pNode->children != NULL) + if (pNode->children != nullptr) { pTmp = xmlNodeGetContent(pNode->children); if (pTmp) { bFound = true; - strVal = 
(char*)pTmp; + strVal = reinterpret_cast(pTmp); xmlFree(pTmp); } else @@ -152,29 +155,29 @@ void WEXmlgetter::getConfig(const string& section, const string& name, vectorxmlChildrenNode; + const xmlNode* pPtr = fpRoot->xmlChildrenNode; - while (pPtr != NULL) + while (pPtr != nullptr) { // cout << "pPtr->name: " << // (const xmlChar*)pPtr->name << std::endl; - if ((!xmlStrcmp(pPtr->name, (const xmlChar*)section.c_str()))) + if ((!xmlStrcmp(pPtr->name, reinterpret_cast(section.c_str())))) { xmlNodePtr pPtr2 = pPtr->xmlChildrenNode; - while (pPtr2 != NULL) + while (pPtr2 != nullptr) { // cout << " pPtr2->name: " << // (const xmlChar*)pPtr2->name << std::endl; - if ((!xmlStrcmp(pPtr2->name, (const xmlChar*)name.c_str()))) + if ((!xmlStrcmp(pPtr2->name, reinterpret_cast(name.c_str())))) { xmlNodePtr pPtr3 = pPtr2->xmlChildrenNode; - values.push_back((const char*)pPtr3->content); + values.emplace_back(reinterpret_cast(pPtr3->content)); // cout << " pPtr3->name: " << // (const xmlChar*)pPtr3->name << @@ -204,8 +207,8 @@ std::string WEXmlgetter::getValue(const vector& sections) const { std::string aRet; const xmlNode* pPtr = fpRoot; - int aSize = sections.size(); - int aIdx = 0; + auto aSize = sections.size(); + size_t aIdx = 0; // cout << aSize << endl; while (aIdx < aSize) @@ -213,7 +216,7 @@ std::string WEXmlgetter::getValue(const vector& sections) const // cout << aIdx <<" "<< sections[aIdx] << endl; pPtr = getNode(pPtr, sections[aIdx]); - if ((pPtr == NULL) || (aIdx == aSize - 1)) + if ((pPtr == nullptr) || (aIdx == aSize - 1)) break; else { @@ -223,7 +226,7 @@ std::string WEXmlgetter::getValue(const vector& sections) const } } - if (pPtr != NULL) + if (pPtr != nullptr) { // aRet = (const char*)pPtr->content; std::string aBuff; @@ -240,17 +243,17 @@ std::string WEXmlgetter::getValue(const vector& sections) const // a node with the specified name (section). The xmlNode (if found) is // returned. 
//------------------------------------------------------------------------------ -const xmlNode* WEXmlgetter::getNode(const xmlNode* pParent, const string& section) const +const xmlNode* WEXmlgetter::getNode(const xmlNode* pParent, const string& section) { - if (pParent == NULL) - return NULL; + if (pParent == nullptr) + return nullptr; const xmlNode* pPtr = pParent; - while (pPtr != NULL) + while (pPtr != nullptr) { // cout << "getNode Name " << (const char*)pPtr->name << endl; - if (!xmlStrcmp(pPtr->name, (const xmlChar*)section.c_str())) + if (!xmlStrcmp(pPtr->name, reinterpret_cast(section.c_str()))) return pPtr; else pPtr = pPtr->next; @@ -268,12 +271,12 @@ std::string WEXmlgetter::getAttribute(const vector& sections, const stri { std::string aRet; const xmlNode* pPtr = fpRoot; - int aSize = sections.size(); + auto aSize = sections.size(); if (aSize == 0) throw invalid_argument("WEXmlgetter::getAttribute(): section must be valid"); - int aIdx = 0; + size_t aIdx = 0; // cout << aSize << endl; while (aIdx < aSize) @@ -281,7 +284,7 @@ std::string WEXmlgetter::getAttribute(const vector& sections, const stri // cout << aIdx <<" "<< sections[aIdx] << endl; pPtr = getNode(pPtr, sections[aIdx]); - if ((pPtr == NULL) || (aIdx == aSize - 1)) + if ((pPtr == nullptr) || (aIdx == aSize - 1)) break; else { @@ -291,7 +294,7 @@ std::string WEXmlgetter::getAttribute(const vector& sections, const stri } } - if (pPtr != NULL) + if (pPtr != nullptr) { std::string aBuff; @@ -315,10 +318,10 @@ std::string WEXmlgetter::getAttribute(const vector& sections, const stri // is returned. 
//------------------------------------------------------------------------------ void WEXmlgetter::getAttributeListForAllChildren(const vector& sections, const string& attributeTag, - vector& attributeValues) + vector& attributeValues) const { const xmlNode* pPtr = fpRoot; - int aSize = sections.size(); + auto aSize = sections.size(); if (aSize == 0) { @@ -328,13 +331,13 @@ void WEXmlgetter::getAttributeListForAllChildren(const vector& sections, } // Step down the branch that has the nodes of interest - int aIdx = 0; + size_t aIdx = 0; while (aIdx < aSize) { pPtr = getNode(pPtr, sections[aIdx]); - if ((pPtr == NULL) || (aIdx == aSize - 1)) + if ((pPtr == nullptr) || (aIdx == aSize - 1)) { break; } @@ -347,9 +350,9 @@ void WEXmlgetter::getAttributeListForAllChildren(const vector& sections, // Look for all the "matching" nodes at the end of the branch, and // get the requested attribute value for each matching node. - if (pPtr != NULL) + if (pPtr != nullptr) { - while (pPtr != NULL) + while (pPtr != nullptr) { std::string attrib; diff --git a/writeengine/splitter/we_xmlgetter.h b/writeengine/splitter/we_xmlgetter.h index ba1480328..1d566b79c 100644 --- a/writeengine/splitter/we_xmlgetter.h +++ b/writeengine/splitter/we_xmlgetter.h @@ -36,23 +36,23 @@ namespace WriteEngine class WEXmlgetter { public: - WEXmlgetter(std::string& ConfigName); - virtual ~WEXmlgetter(); + explicit WEXmlgetter(const std::string& ConfigName); + ~WEXmlgetter(); public: //..Public methods - std::string getValue(const std::vector& section) const; + std::string getValue(const std::vector& sections) const; std::string getAttribute(const std::vector& sections, const std::string& Tag) const; void getConfig(const std::string& section, const std::string& name, std::vector& values) const; void getAttributeListForAllChildren(const std::vector& sections, const std::string& attributeTag, - std::vector& attributeValues); + std::vector& attributeValues) const; private: //..Private methods - const xmlNode* 
getNode(const xmlNode* pParent, const std::string& section) const; - bool getNodeAttribute(const xmlNode* pNode, const char* pTag, std::string& strVal) const; - bool getNodeContent(const xmlNode* pNode, std::string& strVal) const; + static const xmlNode* getNode(const xmlNode* pParent, const std::string& section); + static bool getNodeAttribute(const xmlNode* pNode, const char* pTag, std::string& strVal); + static bool getNodeContent(const xmlNode* pNode, std::string& strVal); //..Private data members std::string fConfigName; // xml filename diff --git a/writeengine/xml/we_xmlgendata.cpp b/writeengine/xml/we_xmlgendata.cpp index e619875f1..e977f44dd 100644 --- a/writeengine/xml/we_xmlgendata.cpp +++ b/writeengine/xml/we_xmlgendata.cpp @@ -35,18 +35,19 @@ namespace WriteEngine { /* static */ const std::string XMLGenData::DELIMITER("-d"); /* static */ const std::string XMLGenData::DESCRIPTION("-s"); - /* static */ const std::string XMLGenData::ENCLOSED_BY_CHAR("-E"); - /* static */ const std::string XMLGenData::ESCAPE_CHAR("-C"); - /* static */ const std::string XMLGenData::JOBID("-j"); +/* static */ const std::string XMLGenData::ENCLOSED_BY_CHAR("-E"); +/* static */ const std::string XMLGenData::ESCAPE_CHAR("-C"); +/* static */ const std::string XMLGenData::JOBID("-j"); /* static */ const std::string XMLGenData::MAXERROR("-e"); /* static */ const std::string XMLGenData::NAME("-n"); /* static */ const std::string XMLGenData::PATH("-p"); - /* static */ const std::string XMLGenData::RPT_DEBUG("-b"); +/* static */ const std::string XMLGenData::RPT_DEBUG("-b"); /* static */ const std::string XMLGenData::USER("-u"); /* static */ const std::string XMLGenData::NO_OF_READ_BUFFER("-r"); /* static */ const std::string XMLGenData::READ_BUFFER_CAPACITY("-c"); /* static */ const std::string XMLGenData::WRITE_BUFFER_SIZE("-w"); /* static */ const std::string XMLGenData::EXT("-x"); +/* static */ const std::string XMLGenData::SKIP_ROWS("-O"); 
//------------------------------------------------------------------------------ // XMLGenData constructor @@ -54,39 +55,38 @@ namespace WriteEngine //------------------------------------------------------------------------------ XMLGenData::XMLGenData() { - fParms.insert(ParmList::value_type(DELIMITER, std::string("|"))); - fParms.insert(ParmList::value_type(DESCRIPTION, std::string())); - fParms.insert(ParmList::value_type(ENCLOSED_BY_CHAR, std::string(""))); - fParms.insert(ParmList::value_type(ESCAPE_CHAR, std::string("\\"))); - fParms.insert(ParmList::value_type(JOBID, std::string("299"))); - fParms.insert(ParmList::value_type(MAXERROR, std::string("10"))); - fParms.insert(ParmList::value_type(NAME, std::string())); + fParms.emplace(DELIMITER, "|"); + fParms.emplace(DESCRIPTION, ""); + fParms.emplace(ENCLOSED_BY_CHAR, ""); + fParms.emplace(ESCAPE_CHAR, "\\"); + fParms.emplace(JOBID, "299"); + fParms.emplace(MAXERROR, "10"); + fParms.emplace(NAME, ""); boost::filesystem::path p{std::string(Config::getBulkRoot())}; p /= JOBDIR; - fParms.insert(ParmList::value_type(PATH, p.string())); + fParms.emplace(PATH, p.string()); - fParms.insert(ParmList::value_type(RPT_DEBUG, std::string("0"))); - fParms.insert(ParmList::value_type(USER, std::string())); - fParms.insert(ParmList::value_type(NO_OF_READ_BUFFER, std::string("5"))); - fParms.insert(ParmList::value_type(READ_BUFFER_CAPACITY, std::string("1048576"))); - fParms.insert(ParmList::value_type(WRITE_BUFFER_SIZE, std::string("10485760"))); - fParms.insert(ParmList::value_type(EXT, std::string("tbl"))); + fParms.emplace(RPT_DEBUG, "0"); + fParms.emplace(USER, ""); + fParms.emplace(NO_OF_READ_BUFFER, "5"); + fParms.emplace(READ_BUFFER_CAPACITY, "1048576"); + fParms.emplace(WRITE_BUFFER_SIZE, "10485760"); + fParms.emplace(EXT, "tbl"); + fParms.emplace(SKIP_ROWS, "0"); } //------------------------------------------------------------------------------ // XMLGenData destructor 
//------------------------------------------------------------------------------ /* virtual */ -XMLGenData::~XMLGenData() -{ -} +XMLGenData::~XMLGenData() = default; //------------------------------------------------------------------------------ // Return value for the specified parm. //------------------------------------------------------------------------------ std::string XMLGenData::getParm(const std::string& key) const { - ParmList::const_iterator p = fParms.find(key); + auto p = fParms.find(key); if (fParms.end() != p) return p->second; diff --git a/writeengine/xml/we_xmlgendata.h b/writeengine/xml/we_xmlgendata.h index 633f1ed8a..be9b4a2a9 100644 --- a/writeengine/xml/we_xmlgendata.h +++ b/writeengine/xml/we_xmlgendata.h @@ -60,10 +60,13 @@ class XMLGenData EXPORT const static std::string READ_BUFFER_CAPACITY; EXPORT const static std::string WRITE_BUFFER_SIZE; EXPORT const static std::string EXT; + EXPORT const static std::string SKIP_ROWS; /** @brief XMLGenData constructor */ EXPORT XMLGenData(); + XMLGenData(const XMLGenData&) = delete; + XMLGenData& operator=(const XMLGenData&) = delete; /** @brief XMLGenData destructor */ @@ -92,10 +95,6 @@ class XMLGenData ParmList fParms; std::string fSchema; LoadNames fLoadNames; - - private: - XMLGenData(const XMLGenData&); // disable default copy ctor - XMLGenData& operator=(const XMLGenData&); // disable default assignment }; } // namespace WriteEngine diff --git a/writeengine/xml/we_xmlgenproc.cpp b/writeengine/xml/we_xmlgenproc.cpp index 784f21891..c21251261 100644 --- a/writeengine/xml/we_xmlgenproc.cpp +++ b/writeengine/xml/we_xmlgenproc.cpp @@ -147,6 +147,11 @@ void XMLGenProc::startXMLFile() xmlTextWriterWriteElement(fWriter, BAD_CAST xmlTagTable[TAG_ESCAPE_CHAR], BAD_CAST fInputMgr->getParm(XMLGenData::ESCAPE_CHAR).c_str()); + if (auto skipRows = fInputMgr->getParm(XMLGenData::SKIP_ROWS); !skipRows.empty()) + { + xmlTextWriterWriteElement(fWriter, BAD_CAST xmlTagTable[TAG_SKIP_ROWS], BAD_CAST 
skipRows.c_str()); + } + // Added new tags for configurable parameters xmlTextWriterStartElement(fWriter, BAD_CAST xmlTagTable[TAG_READ_BUFFERS]); xmlTextWriterWriteFormatAttribute(fWriter, BAD_CAST xmlTagTable[TAG_NO_OF_READ_BUFFERS], "%d", diff --git a/writeengine/xml/we_xmljob.cpp b/writeengine/xml/we_xmljob.cpp index f37efeaeb..34b50e026 100644 --- a/writeengine/xml/we_xmljob.cpp +++ b/writeengine/xml/we_xmljob.cpp @@ -130,6 +130,7 @@ void XMLJob::printJobInfo(Log& logger) const oss1 << "Read Buffers: " << job.numberOfReadBuffers << endl; oss1 << "Read Buffer Size: " << job.readBufferSize << endl; oss1 << "setvbuf Size: " << job.writeBufferSize << endl; + oss1 << "Header rows : " << job.fSkipRows << endl; oss1 << "Create Date : " << job.createDate << endl; oss1 << "Create Time : " << job.createTime << endl; oss1 << "Schema Name : " << job.schema << endl; @@ -223,7 +224,8 @@ void XMLJob::printJobInfoBrief(Log& logger) const oss1 << "n/a"; oss1 << "); ReadBufs(" << job.numberOfReadBuffers << "); ReadBufSize(" << job.readBufferSize - << "); setvbufSize(" << job.writeBufferSize << ')'; + << "); setvbufSize(" << job.writeBufferSize << "); " + << "SkipRows(" << job.fSkipRows << ")"; logger.logMsg(oss1.str(), MSGLVL_INFO2); for (unsigned int i = 0; i < job.jobTableList.size(); i++) @@ -316,6 +318,8 @@ bool XMLJob::processNode(xmlNode* pNode) setJobData(pNode, TAG_ENCLOSED_BY_CHAR, true, TYPE_CHAR); else if (isTag(pNode, TAG_ESCAPE_CHAR)) setJobData(pNode, TAG_ESCAPE_CHAR, true, TYPE_CHAR); + else if (isTag(pNode, TAG_SKIP_ROWS)) + setJobData(pNode, TAG_SKIP_ROWS, true, TYPE_INT); else { ostringstream oss; @@ -432,6 +436,12 @@ void XMLJob::setJobData(xmlNode* pNode, const xmlTag tag, bool bExpectContent, X break; } + case TAG_SKIP_ROWS: + { + fJob.fSkipRows = intVal; + break; + } + default: break; } } diff --git a/writeengine/xml/we_xmltag.h b/writeengine/xml/we_xmltag.h index 96a14f810..e3bc51cdf 100644 --- a/writeengine/xml/we_xmltag.h +++ 
b/writeengine/xml/we_xmltag.h @@ -73,6 +73,7 @@ enum xmlTag TAG_TBL_OID, TAG_WIDTH, TAG_SCHEMA_NAME, + TAG_SKIP_ROWS, NUM_OF_XML_TAGS }; @@ -93,6 +94,7 @@ const char xmlTagTable[NUM_OF_XML_TAGS + 1][MAX_XML_TAG_NAME_SIZE] = { "origName", //@bug 3599: deprecated; kept for backwards compatibility "precision", "scale", "tblName", //@bug 3599: replaces origName - "tblOid", "width", "Name"}; + "tblOid", "width", "Name", + "skipRows"}; } // namespace WriteEngine