/* Copyright (C) 2014 InfiniDB, Inc. Copyright (C) 2016 MariaDB Corporation This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include #include #include #include #include #include #include #include #include #include #include #include #include namespace po = boost::program_options; using namespace std; #include #include #include #include #include "dataconvert.h" #include "liboamcpp.h" using namespace oam; #include "we_cmdargs.h" #include "installdir.h" #include "mcsconfig.h" namespace WriteEngine { //---------------------------------------------------------------------- //---------------------------------------------------------------------- WECmdArgs::WECmdArgs(int argc, char** argv) { try { appTestFunction(); fOptions = std::make_unique(); #define DECLARE_INT_ARG(name, stor, min, max, desc) \ (name,\ po::value(&stor)\ ->notifier([](auto&& value) { checkIntArg(name, min, max, value); }),\ desc) fOptions->add_options() ("help,h", "Print this message.") DECLARE_INT_ARG("read-buffer,b", fIOReadBufSize, 1, INT_MAX, "Number of read buffers.") DECLARE_INT_ARG("read-buffer-size,c", fReadBufSize, 1, INT_MAX, "Application read buffer size (in bytes)") DECLARE_INT_ARG("debug,d", fDebugLvl, 1, 3, "Print different level(1-3) debug message") ("verbose,v", po::value()) ("silent,N", po::bool_switch()) ("max-errors,e", po::value(), "Maximum number (or 'all') of allowable error per table per PM") ("file-path,f", po::value(&fPmFilePath), "Data file directory path. Default is current working directory.\n" "\tIn Mode 1, represents the local input file path.\n" "\tIn Mode 2, represents the PM based input file path.\n" "\tIn Mode 3, represents the local input file path.") DECLARE_INT_ARG("mode,m", fArgMode, 0, 3, "\t1 - rows will be loaded in a distributed manner acress PMs.\n" "\t2 - PM based input files loaded into their respective PM.\n" "\t3 - input files will be loaded on the local PM.") ("filename,l", po::value(&fPmFile), "Name of import file to be loaded, relative to 'file-path'") DECLARE_INT_ARG("batch-quantity,q", fBatchQty, 1, INT_MAX, "Batch quantity, Number of rows distributed per batch in Mode 1") ("console-log,i", po::bool_switch(&fConsoleLog), "Print extended info to console in Mode 3.") ("job-id,j", po::value(), "Job ID. In simple usage, default is the table OID unless a fully qualified input " "file name is given.") ("null-strings,n", po::value(&fNullStrMode)->implicit_value(true), "NullOption (0-treat the string NULL as data (default);\n" "1-treat the string NULL as a NULL value)") ("xml-job-path,p", po::value(&fJobPath), "Path for the XML job description file.") DECLARE_INT_ARG("readers,r", fNoOfReadThrds, 1, INT_MAX, "Number of readers.") ("separator,s", po::value(), "Delimiter between column values.") DECLARE_INT_ARG("io-buffer-size,B", fSetBufSize, 1, INT_MAX, "I/O library read buffer size (in bytes)") DECLARE_INT_ARG("writers,w", fNoOfWriteThrds, 1, INT_MAX, "Number of parsers.") ("enclosed-by,E", po::value(&fEnclosedChar), "Enclosed by character if field values are enclosed.") ("escape-char,C", po::value(&fEscChar)->default_value('\\'), "Escape character used in conjunction with 'enclosed-by'" "character, or as a part of NULL escape sequence ('\\N');\n" "default is '\\'") ("headers,O", po::value(&fSkipRows)->implicit_value(1) ->notifier([](auto&& value) { checkIntArg("headers,O", 0, INT_MAX, value); }), "Number of header rows to skip.") ("binary-mode,I", po::value(), "Import binary data; how to treat NULL values:\n" "\t1 - import NULL values\n" "\t2 - saturate NULL values\n") ("pm,P", po::value>(&fPmVec), "List of PMs ex: -P 1,2,3. Default is all PMs.") ("truncation-as-error,S", po::bool_switch(&fbTruncationAsError), "Treat string truncations as errors.") ("tz,T", po::value(), "Timezone used for TIMESTAMP datatype. Possible values:\n" "\t\"SYSTEM\" (default)\n" "\tOffset in the form +/-HH:MM") ("s3-key,y", po::value(&fS3Key), "S3 Authentication Key (for S3 imports)") ("s3-secret,K", po::value(&fS3Secret), "S3 Authentication Secret (for S3 imports)") ("s3-bucket,t", po::value(&fS3Bucket), "S3 Bucket (for S3 imports)") ("s3-hostname,H", po::value(&fS3Host), "S3 Hostname (for S3 imports, Amazon's S3 default)") ("s3-region,g", po::value(&fS3Region), "S3 Region (for S3 imports)") ("errors-dir,L", po::value(&fErrorDir)->default_value(fErrorDir), "Directory for the output .err and .bad files") ("username,U", po::value(&fUsername), "Username of the files owner.") ("dbname", po::value(), "Name of the database to load") ("table", po::value(), "Name of table to load") ("load-file", po::value(), "Optional input file name in current directory, " "unless a fully qualified name is given. If not given, input read from STDIN."); #undef DECLARE_INT_ARG parseCmdLineArgs(argc, argv); } catch (std::exception& exp) { std::string exceptMsg(exp.what()); exceptMsg += "\nTry 'cpimport -h' for more information."; throw(runtime_error(exceptMsg)); } } WECmdArgs::~WECmdArgs() = default; //---------------------------------------------------------------------- void WECmdArgs::appTestFunction() { // testing begins // std::string aJobFile("/home/bpaul/Calpont/data/bulk/job/Job_1234.xml"); // std::string aJobFile("/usr/local/mariadb/columnstore/data/bulk/job/Job_1234.xml"); // setSchemaAndTableFromJobFile(aJobFile); // setEnclByAndEscCharFromJobFile(aJobFile); // exit(1); // testing ends return; } void WECmdArgs::checkIntArg(const std::string& name, long min, long max, int value) { if (value < min || value > max) { ostringstream oss; oss << "Argument " << name << " is out of range [" << min << ", " << max << "]"; throw runtime_error(oss.str()); } } //---------------------------------------------------------------------- std::string WECmdArgs::getCpImportCmdLine(bool skipRows) { std::ostringstream aSS; std::string aCmdLine; aSS << fPrgmName; // prgm name as arg[0] if (fHelp) { aSS << " -h "; aCmdLine = aSS.str(); return aCmdLine; } // checkJobIdCase(); // check if JobID if ((fPmFile.length() > 0) && (0 == getMode())) aSS << " -l " << fPmFile; // BUG 4379 if -m is not given while prep args, default is m=1 but // on single node -m will set it to 3, after prep args. // if((fPmFilePath.length()>0)&&(1!=getMode())) // aSS << " -f " << fPmFilePath; if ((fPmFilePath.length() > 0) && (1 != getMode())) { if (fPmFilePath == "STDIN") // if stdin, must pass it aSS << " -f " << fPmFilePath; else if (2 == getArgMode()) // Mode 2 we need to pass the -f option aSS << " -f " << fPmFilePath; else if (3 == getArgMode()) //-m given, -f built in already. aSS << " -f " << fPmFilePath; else if (0 == fLocFile.length()) // No filename given, from job file aSS << " -f " << fPmFilePath; } if (fErrorDir.length() > 0) aSS << " -L " << fErrorDir; if (fUsername.length() > 0) aSS << " -U " << fUsername; if (fJobId.length() > 0) aSS << " -j " << fJobId; if (fNoOfReadThrds > 0) aSS << " -r " << fNoOfReadThrds; if (fNoOfWriteThrds > 0) aSS << " -w " << fNoOfWriteThrds; if (fMaxErrors == MAX_ERRORS_ALL) aSS << " -e all "; else if (fMaxErrors != MAX_ERRORS_DEFAULT) aSS << " -e " << fMaxErrors; // BUG 5088 if (fDebugLvl > 0) aSS << " -d " << fDebugLvl; if (fSetBufSize > 0) aSS << " -B " << fSetBufSize; if (fColDelim != '|') { if (fColDelim == '\t') aSS << " -s " << "\\t"; //- working with user '\t' // NONE of the following will work. // aSS << " -s " << "\t"; //aSS << " -s " << "\"\\t\""; //aSS << " -s " << "'\\t'"; else aSS << " -s " << fColDelim; } if (fEnclosedChar != 0) aSS << " -E " << fEnclosedChar; if (fEscChar != 0) aSS << " -C " << fEscChar; if (skipRows && fSkipRows) { aSS << " -O " << fSkipRows; } if (fNullStrMode) aSS << " -n " << '1'; if (fImportDataMode != IMPORT_DATA_TEXT) aSS << " -I " << fImportDataMode; // if(fConfig.length()>0) // aSS << " -c " << fConfig; if (fReadBufSize > 0) { cout << "setting fReadBufSize = " << fReadBufSize; aSS << " -c " << fReadBufSize; } if (fIOReadBufSize > 0) aSS << " -b " << fIOReadBufSize; if ((fJobPath.length() > 0) && (fMode == 3)) aSS << " -p " << fJobPath; if (fConsoleLog) aSS << " -i "; if ((fMode == 1) || (fMode == 2)) { aSS << " -R " << getBrmRptFileName(); aSS << " -m " << fMode; } aSS << " -P " << getModuleID(); aSS << " -T " << fTimeZone; if (fbTruncationAsError) aSS << " -S "; if (!fS3Key.empty() && !(fMode == 0 || fMode == 1)) { if (fS3Secret.empty() || fS3Bucket.empty() || fS3Region.empty()) throw(runtime_error("Not all required S3 options provided")); aSS << " -y " << fS3Key; aSS << " -K " << fS3Secret; aSS << " -t " << fS3Bucket; aSS << " -g " << fS3Region; if (!fS3Host.empty()) { aSS << " -H " << fS3Host; } } if ((fJobId.length() > 0) && (fMode == 1) && (!fJobLogOnly)) { // if JobPath provided, make it w.r.t WES aSS << " -p " << fTmpFileDir; aSS << " -fSTDIN"; } else if ((fJobId.length() > 0) && (fMode == 2) && (!fJobLogOnly)) { // if JobPath provided, make it w.r.t WES aSS << " -p " << fTmpFileDir; if (fPmFile.length() > 0) aSS << " -l " << fPmFile; if (fPmFilePath.length() > 0) aSS << " -f " << fPmFilePath; } else // do not provide schema & table with JobId { if (!fUUID.is_nil()) aSS << " -u" << boost::uuids::to_string(fUUID); if (fSchema.length() > 0) aSS << " " << fSchema; // else if((fMode != 0)||(fMode==3)) //TODO make it mode3 + jobID else if (fJobId.length() > 0) { } // may or may not contain Schema. // else if((fMode == 1)||(fMode==2)) //TODO make it mode3 + jobID else if (fMode != 0) throw(runtime_error("Schema not available")); if (fTable.length() > 0) aSS << " " << fTable; else if (fJobId.length() > 0) { } // may or may not contain Table. else if (fMode != 0) throw(runtime_error("Tablename not available")); // else if((fMode != 0)||(fMode==3)) //TODO make it mode3 + jobID // else if((fMode == 1)||(fMode == 2)) //TODO make it mode3 + jobID // throw (runtime_error("Tablename not available")); if ((fPmFile.length() > 0) && (2 == getMode())) { // if(fPmFilePath.length()>0) // aSS << " " << fPmFilePath; aSS << " " << fPmFile; } else if (2 == getMode()) throw(runtime_error("loadFile [-l ] not available")); } if ((fLocFile.length() > 0) && (fLocFile != "STDIN") && (3 == getMode())) { // Bug 4342 multi-files mode 3 support // convert all the spaces into 'commas' if (fLocFile.find_first_of(' ') == string::npos) aSS << " " << fLocFile; else { std::string aLocFiles = replaceCharInStr(fLocFile, ' ', ','); aSS << " " << aLocFiles; } } try { aCmdLine = aSS.str(); } catch (exception&) { throw runtime_error("getcpImportCmdLine failed"); } return aCmdLine; } //---------------------------------------------------------------------- // BUG 4205 (part FIX) - need to implement more into it bool WECmdArgs::checkForCornerCases() { // BUG 4210 this->checkJobIdCase(); // Need to do this before we go further if (fSkipRows && fImportDataMode != IMPORT_DATA_TEXT) { cout << "Invalid option -O with binary file" << endl; throw runtime_error("Invalid option -O with binary file"); } if (fMode == 0) { if (!fJobId.empty()) { // cout << "Invalid option mode 0 with a Job File" << endl; throw( runtime_error("Mode 0 with a Job file option is not valid!!" "\nTry 'cpimport -h' for more information.")); } else if (!fJobPath.empty()) { cout << "Invalid option mode 0 with a Job Path" << endl; throw( runtime_error("Mismatched options" "\nTry 'cpimport -h' for more information.")); } else if (!fSchema.empty()) { cout << "Invalid option in mode 0 with a schema name" << endl; throw(runtime_error("Mismatched options.")); } else if (!fTable.empty()) { cout << "Invalid option in mode 0 with a table name" << endl; throw(runtime_error("Mismatched options.")); } else if ((!fPmFilePath.empty()) && (fPmFilePath != "STDIN")) { cout << "Invalid option -f in Mode 0 with value other than STDIN" << endl; throw(runtime_error("Mismatched options.")); } if (fSetBufSize) { cout << "Invalid option -B with Mode 0" << endl; throw(runtime_error("Mismatched options.")); } else if (fIOReadBufSize) { cout << "Invalid option -b with Mode 0" << endl; throw(runtime_error("Mismatched options.")); } else if (fMaxErrors >= 0 || fMaxErrors == MAX_ERRORS_ALL) { cout << "Invalid option -e with Mode 0" << endl; throw(runtime_error("Mismatched options.")); } else if (fConsoleLog) { cout << "Invalid option -i with Mode 0" << endl; throw(runtime_error("Mismatched options.")); } else if (fReadBufSize) { cout << "Invalid option -c with Mode 0" << endl; throw(runtime_error("Mismatched options.")); } else if (fNoOfReadThrds) { cout << "Invalid option -r with Mode 0" << endl; throw(runtime_error("Mismatched options.")); } else if (fNoOfWriteThrds) { cout << "Invalid option -w with Mode 0" << endl; throw(runtime_error("Mismatched options.")); } if (fImportDataMode != IMPORT_DATA_TEXT) { cout << "Invalid option -I with Mode 0" << endl; throw(runtime_error("Mismatched options.")); } } if (fMode == 1) { if (!fJobId.empty()) { if ((!fPmFilePath.empty()) && (fPmFilePath == "STDIN")) { // do not do anything.. this is good. } } // Mode 1, if Input Path is existing and input file is not existing // it is an error, bce it assumes all the files in directory. // In mode 2, we are passing info to cpimport.bin, which will take care // of it, as in Mode 3. else if ((!fPmFilePath.empty()) && (fPmFile.empty())) { // assumed since position param is missing if ((fLocFile == "STDIN") && (fPmFilePath != "STDIN")) { cout << "Invalid options in Mode 1 : option -l " << endl; cout << " or input file position parameter needed" << endl; // cout << "\tOption (-j) should follow with option -l option or " // "an input file position parameter" << endl; throw( runtime_error("In Mode 1 Error occurred!! " "\nTry 'cpimport -h' for more information.")); } } } if (fMode == 2) { if (fPmFile.empty()) throw( runtime_error("Mode 2 require PM based filename [-l]" "\nTry 'cpimport -h' for more information.")); if ((fPmFilePath.empty()) && (fPmFile.at(0) != '/')) throw( runtime_error("Mode 2 require remote file opts -f and -l or " "a fully qualified path for the remote file." "\nTry 'cpimport -h' for more information.")); if (!fS3Key.empty()) throw(runtime_error("Mode 2 & an input file from S3 does not make sense.")); } if (fMode == 3) { if (fPmVec.size()) { cout << "Invalid option -P with Mode 3" << endl; throw(runtime_error("Mismatched options.")); } } return true; } //---------------------------------------------------------------------- bool WECmdArgs::str2PmList(std::string& PmList, VecInts& V) { const int BUFLEN = 512; char aBuff[BUFLEN]; int aLen = PmList.length(); if (aLen > 0) { strncpy(aBuff, PmList.c_str(), BUFLEN - 1); aBuff[BUFLEN - 1] = 0; } else return false; char* pTok = strtok(aBuff, ","); while (pTok != NULL) { int aPmId = 0; try { aPmId = atoi(pTok); V.push_back(aPmId); } catch (exception& ex) { std::stringstream aErr; aErr << "Wrong PM id format : " << ex.what(); // cout << "Wrong PM id format : "<< ex.what() << endl; throw(runtime_error(aErr.str())); } pTok = strtok(NULL, ","); } return true; } //---------------------------------------------------------------------- void WECmdArgs::usage() { cout << "Simple usage using positional parameters (no XML job file):\n"; cout << "\tcpimport dbName tblName [loadFile] [-h] [-m mode]\n"; cout << "\t\t [-f path] [-d debugLevel] [-c readbufSize] [-b readBufs] \n"; cout << "\t\t [-r readers] [-j JobID] [-e maxErrs] [-B libBufSize] [-w parsers]\n"; cout << "\t\t [-s c] [-E enclosedChar] [-C escapeChar] [-n NullOption]\n"; cout << "\t\t [-q batchQty] [-p jobPath] [-P list of PMs] [-S] [-i] [-v verbose]\n"; cout << "\t\t [-I binaryOpt] [-T timeZone]\n"; cout << "Traditional usage without positional parameters (XML job file required):\n"; cout << "\tcpimport -j jobID\n"; cout << "\t\t [-h] [-m mode] [-r readers] [-w parsers] [-s c] [-f path]\n"; cout << "\t\t [-b readBufs] [-p path] [-c readBufSize] [-e maxErrs] [-B libBufSize]\n"; cout << "\t\t [-n NullOption] [-E encloseChar] [-C escapeChar] [-i] [-v verbose]\n"; cout << "\t\t [-d debugLevel] [-q batchQty] [-l loadFile] [-P list of PMs] [-S]\n"; cout << "\t\t [-I binaryOpt] [-T timeZone]\n"; cout << "\n\nPositional parameters:\n"; cout << "\tdbName Name of the database to load\n"; cout << "\ttblName Name of table to load\n"; cout << "\tloadFile Optional input file name in current directory,\n"; cout << "\t\t\tunless a fully qualified name is given.\n"; cout << "\t\t\tIf not given, input read from STDIN.\n"; cout << "\n\n" << (*fOptions) << endl; cout << "\nExample1: Traditional usage\n" << "\tcpimport -j 1234"; cout << "\nExample2: Some column values are enclosed within double quotes.\n" << "\tcpimport -j 3000 -E '\"'"; cout << "\nExample3: Import a nation table without a Job XML file\n" << "\tcpimport -j 301 tpch nation nation.tbl"; cout << "\nExample4: Import a nation table to all PMs in Mode 1\n" << "\tcpimport -m 1 tpch nation nation.tbl"; cout << "\nExample4: Import a nation table to only PM1 and PM2 in Mode 1\n" << "\tcpimport -m 1 -P 1,2 tpch nation nation.tbl"; cout << "\nExample5: Import nation.tbl from PMs to nation table in Mode 2\n" << "\tcpimport -m 2 tpch nation -f /var/lib/columnstore/data/bulk/data/import/ -l nation.tbl"; cout << "\nExample6: Import nation.tbl in mode 3\n" << "\tcpimport -m 3 tpch nation nation.tbl\n\n"; exit(1); } //----------------------------------------------------------------------------- void WECmdArgs::parseCmdLineArgs(int argc, char** argv) { std::string importPath; bool aJobType = false; if (argc > 0) fPrgmName = string(MCSBINDIR) + "/" + "cpimport.bin"; // argv[0] is splitter but we need cpimport po::positional_options_description pos_opt; pos_opt.add("dbname", 1) .add("table", 1) .add("load-file", 1); po::variables_map vm; po::store(po::command_line_parser(argc, argv).options(*fOptions).positional(pos_opt).run(), vm); po::notify(vm); if (vm.count("silent")) { fConsoleOutput = !vm["silent"].as(); } if (vm.count("help")) { fHelp = true; usage(); return; } if (vm.count("separator")) { auto value = vm["separator"].as(); if (value == "\\t") { fColDelim = '\t'; if (fDebugLvl) { cout << "Column delimiter : \\t" << endl; } } else { fColDelim = value[0]; if (fDebugLvl) { cout << "Column delimiter : " << fColDelim << endl; } } } if (vm.count("binary-mode")) { int value = vm["binary-mode"].as(); if (value == 1) { fImportDataMode = IMPORT_DATA_BIN_ACCEPT_NULL; } else if (value == 2) { fImportDataMode = IMPORT_DATA_BIN_SAT_NULL; } else { throw runtime_error("Invalid Binary mode; value can be 1 or 2"); } } if (vm.count("tz")) { auto tz = vm["tz"].as(); long offset; if (tz != "SYSTEM" && dataconvert::timeZoneToOffset(tz.c_str(), tz.size(), &offset)) { throw runtime_error("Value for option --tz/-T is invalid"); } fTimeZone = tz; } if (vm.count("job-id")) { errno = 0; string optarg = vm["job-id"].as(); long lValue = strtol(optarg.c_str(), nullptr, 10); if (errno != 0 || lValue < 0 || lValue > INT_MAX) { throw runtime_error("Option --job-id/-j is invalid or out of range"); } fJobId = optarg; fOrigJobId = fJobId; if (fJobId.empty()) { throw runtime_error("Wrong JobID Value"); } aJobType = true; } if (vm.count("verbose")) { string optarg = vm["verbose"].as(); fVerbose = fDebugLvl = optarg.length(); } if (vm.count("batch-quantity")) { if (fBatchQty < 10000) { fBatchQty = 10000; } else if (fBatchQty > 100000) { fBatchQty = 10000; } } if (vm.count("max-errors")) { auto optarg = vm["max-errors"].as(); if (optarg == "all") { fMaxErrors = MAX_ERRORS_ALL; } else { errno = 0; long lValue = strtol(optarg.c_str(), nullptr, 10); if (errno != 0 || lValue < 0 || lValue > INT_MAX) { throw runtime_error("Option --max-errors/-e is invalid or out of range"); } fMaxErrors = lValue; } } if (fArgMode != -1) fMode = fArgMode; // BUG 4210 std::string bulkRootPath = getBulkRootDir(); checkForBulkLogDir(bulkRootPath); if (2 == fArgMode && fPmFilePath.empty()) throw runtime_error("-f option is mandatory with mode 2."); if (aJobType) { if (0 == fArgMode) throw runtime_error("Incompatible mode and option types"); if (vm.count("dbname")) { fSchema = vm["dbname"].as(); if (!vm.count("table")) { // if schema is there, table name should be there throw runtime_error("No table name specified with schema."); } fTable = vm["table"].as(); // 2nd pos parm if (vm.count("load-file")) // see if input file name is given { // 3rd pos parm fLocFile = vm["load-file"].as(); if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN")) { std::string aTmp = fLocFile; // BUG 4379 -f given? use that if ((!fPmFilePath.empty()) && (fMode == 1)) fLocFile = fPmFilePath + "/" + aTmp; else if (fPmFilePath.empty()) fLocFile = bulkRootPath + "/data/import/" + aTmp; } } else { if (!fPmFile.empty()) fLocFile = fPmFile; // BUG 4186 // else // else take it from the jobxml file // fLocFile = "STDIN"; // Historically cpimport works with jobfile as // -l && -f or // -fSTDIN as the stdin, it will override colxml loadfile entry // if -fSTDIN is not provided get i/p file from jobfile else if ((!fPmFilePath.empty()) && (fPmFilePath == "STDIN")) fLocFile = "STDIN"; // else take it from the jobxml file } if ((fSchema.length() > 0) && (fTable.length() > 0) && (fLocFile.length() > 0)) fJobLogOnly = true; } else { if (!fPmFile.empty()) { fLocFile = fPmFile; if (!fPmFilePath.empty()) { if (fPmFilePath == "STDIN") { throw runtime_error("Conflicting options -l and -fSTDIN"); } else { std::string aTmp = fLocFile; if ((!fPmFilePath.empty()) && (fMode == 1)) // BUG 4379 -f given? use that fLocFile = fPmFilePath + "/" + aTmp; else if (!fPmFilePath.empty()) fLocFile = bulkRootPath + "/data/import/" + aTmp; } } if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN") && (fPmFilePath.empty())) { std::string aTmp = fLocFile; fLocFile = bulkRootPath + "/data/import/" + aTmp; } } // BUG 4186 // else // fLocFile = "STDIN"; // Historically cpimport works with jobfile as // -l && -f or // -fSTDIN as the stdin, it will override colxml loadfile entry // if -fSTDIN is not provided get i/p file from jobfile else if ((!fPmFilePath.empty()) && (fPmFilePath == "STDIN")) fLocFile = "STDIN"; // else take it from the jobxml file } } // Get positional arguments, User can provide: // 1. no positional parameters - Mode 0 & stdin // 2. Two positional parameters (schema and table names) - Mode 1/2, stdin // 3. Three positional parameters (schema, table, and import file name) else if (vm.count("dbname")) // see if db schema name is given { if (fArgMode == 0) { // added the code as per BUG 4245 if (!fPmFilePath.empty()) { fLocFile = fPmFilePath; if (fLocFile != "STDIN") throw(runtime_error("ERROR: In Mode 0, -f option can only have value STDIN")); } else { fLocFile = vm["dbname"].as(); } if (vm.count("table")) // dest filename provided { fPmFile = vm["table"].as(); if ((fPmFile.at(0) != '/') && (fS3Key.empty())) { std::string aTmp = fPmFile; fPmFile = bulkRootPath + "/data/import/" + aTmp; } } else // no dest filename { if (fLocFile == "STDIN") throw(runtime_error("ERROR: Destination file name required!!")); if (fLocFile.at(0) == '/') // local FQ-filename,parse out filename fPmFile = getFileNameFromPath(fLocFile); else fPmFile = fLocFile; if ((fPmFile.at(0) != '/') && (fS3Key.empty())) // should be true all the time { std::string aTmp = fPmFile; fPmFile = bulkRootPath + "/data/import/" + aTmp; } } /* commented out for BUG 4245 if(fPmFilePath.empty()) fLocFile = argv[optind]; else fLocFile = fPmFilePath +"/"+ argv[optind]; if (fPmFile.empty()) //BUG 4200 { //if(fLocFile.at(0)== '/') // fPmFile = fLocFile; //else if(fLocFile.at(0)!='/') fPmFile = bulkRootPath + "/data/import/"+ fLocFile; } else { if(fPmFile.at(0)!='/') { std::string aTmp = fPmFile; fPmFile = bulkRootPath + "/data/import/"+aTmp; } } */ } else fSchema = vm["dbname"].as(); // 1st pos parm if (vm.count("table")) // see if table name is given { fTable = vm["table"].as(); // 2nd pos parm if (vm.count("load-file")) // see if input file name is given { // 3rd pos parm fLocFile = vm["load-file"].as(); // BUG 4379 if -f option given we need to use that path, // over riding bug 4231. look at the code below // BUG 4231 - This bug over writes 4199 and commenting out changes // BUG 4199 // Path not provided, not fully qualified, Look in import dir // if((fLocFile.at(0)!= '/')&&(fLocFile != "STDIN")) //{ // std::string aTmp = fLocFile; // fLocFile = bulkRootPath + "/data/import/"+ aTmp; //} // BUG 4379 if -f option given we need to use that path if ((fLocFile.at(0) != '/') && (fLocFile != "STDIN")) { std::string aTmp = fLocFile; // if -f given? use that otherwise just go ahead with CWD if ((!fPmFilePath.empty()) && (fMode == 1)) fLocFile = fPmFilePath + "/" + aTmp; // TODO - if -f option is given and a list of files are // are provided, we need to be able to import all that. } } else { if (fPmFile.length() > 0) { // BUG 4210 // if (fPmFilePath.length() > 0) //{ // fLocFile = fPmFilePath +"/"+ fPmFile; //} // else if (fPmFilePath.empty()) { // NOTE - un-commenting with an if statement for Mode 2 // BUG 4231 makes it comment out the below changes, // This will not change even though directly, to be // on safer side, we should take out this too. // check path fully qualified? then set as data import if (2 == fArgMode) { // BUG 4342 if ((fPmFile.at(0) != '/') && (fS3Key.empty())) { std::string aTmp = fPmFile; fPmFile = PrepMode2ListOfFiles(aTmp); } else { if (fPmFile.find_first_of(' ') != string::npos) { std::string aPmFiles = replaceCharInStr(fPmFile, ' ', ','); fPmFile = aPmFiles; } } } fLocFile = fPmFile; } } else { fLocFile = "STDIN"; } // cout << "LocFile set as stdin" << endl; } } else { // If Mode is not 0 and table name is a required argument if (fArgMode != 0) throw(runtime_error("No table name specified with schema.")); } } else { // for testing we are allowing data from stdin even with Mode 0 // that is without LocFileName if (0 == fArgMode) { fLocFile = "STDIN"; // cout << "LocFile set as stdin" << endl; } else { // If Mode 0, LocFileName is reqd and otherwies Schema is required throw(runtime_error("No schema or local filename specified.")); } } /* check for all-or-nothing cmdline args to enable S3 import */ int s3Tmp = (fS3Key.empty() ? 0 : 1) + (fS3Bucket.empty() ? 0 : 1) + (fS3Secret.empty() ? 0 : 1) + (fS3Region.empty() ? 0 : 1); if (s3Tmp != 0 && s3Tmp != 4) throw runtime_error("The access key, secret, bucket, and region are all required to import from S3"); } std::string WECmdArgs::getJobFileName() { std::ostringstream aSS; string aJobIdFileName; if (fJobId.length() > 0) { if (fJobPath.length() > 0) aSS << fJobPath; else { fJobPath = config::Config::makeConfig()->getConfig("WriteEngine", "BulkRoot") + "/Job"; aSS << fJobPath; } aSS << "/Job_" << fJobId << ".xml"; aJobIdFileName = aSS.str(); } return aJobIdFileName; } bool WECmdArgs::getPmStatus(int Id) { // if no PMID's provided on cmdline, return true; if (0 == fPmVec.size()) return true; VecInts::iterator aIt = fPmVec.begin(); while (aIt != fPmVec.end()) { if (*aIt == static_cast(Id)) return true; ++aIt; } return false; } //------------------------------------------------------------------------------ // It is a recursive call. std::string WECmdArgs::getBrmRptFileName() { if (!fBrmRptFile.empty()) return fBrmRptFile; string brmRptFileName = getTmpFileDir(); if (!brmRptFileName.empty()) { fTmpFileDir = brmRptFileName; char aBuff[64]; time_t aTime; struct tm pTm; time(&aTime); localtime_r(&aTime, &pTm); // BUG 4424 // M D H M S snprintf(aBuff, sizeof(aBuff), "/BrmRpt%02d%02d%02d%02d%02d%d.rpt", pTm.tm_mon, pTm.tm_mday, pTm.tm_hour, pTm.tm_min, pTm.tm_sec, getpid()); brmRptFileName += aBuff; } else { // cout << "ERROR: Could not find TempFileDir in Columnstore.xml" << endl; throw(runtime_error("Could not find TempFileDir in Columnstore.xml")); } setBrmRptFileName(brmRptFileName); return brmRptFileName; } //------------------------------------------------------------------------------ void WECmdArgs::addJobFilesToVector(std::string& JobName) { // if((!fSchema.empty())&&(!fTable.empty())&&(!fLocFile.empty())) return; WEXmlgetter aXmlGetter(JobName); vector aSections; aSections.push_back("BulkJob"); aSections.push_back("Schema"); aSections.push_back("Table"); // BUG 4163 typedef std::vector TableVec; TableVec aTableVec; aXmlGetter.getConfig(aSections[1], aSections[2], aTableVec); setMultiTableCount(aTableVec.size()); if (getMultiTableCount() > 1) { splitConfigFilePerTable(JobName, aTableVec.size()); } else { fVecJobFiles.push_back(JobName); } } //------------------------------------------------------------------------------ // Set the schema, table, and loadfile name from the xml job file. // If running in binary mode, we also get the list of columns for the table, // so that we can determine the exact fixed record length of the incoming data. //------------------------------------------------------------------------------ void WECmdArgs::setSchemaAndTableFromJobFile(std::string& JobName) { if (((fVecJobFiles.size() == 1) && (!fSchema.empty()) && (!fTable.empty()) && (!fLocFile.empty())) && (fImportDataMode == IMPORT_DATA_TEXT)) return; WEXmlgetter aXmlGetter(JobName); vector aSections; aSections.push_back("BulkJob"); aSections.push_back("Schema"); aSections.push_back("Table"); // Reset the fSchema, fTable, and FLocFile if ((fVecJobFiles.size() > 1) || (fSchema.empty()) || (fTable.empty()) || (fLocFile.empty())) { std::string aSchemaTable; std::string aInputFile; aSchemaTable = aXmlGetter.getAttribute(aSections, "tblName"); if (getDebugLvl() > 1) cout << "schema.table = " << aSchemaTable << endl; aInputFile = aXmlGetter.getAttribute(aSections, "loadName"); if (getDebugLvl() > 1) cout << "xml::InputFile = " << aInputFile << endl; if (aSchemaTable.length() > 0) { char aSchema[64]; char aTable[64]; int aRet = aSchemaTable.find('.'); if (aRet > 0) { int aLen = aSchemaTable.copy(aSchema, aRet); if (getDebugLvl() > 1) cout << "Schema: " << aSchema << endl; aSchema[aLen] = 0; if (fSchema.empty()) fSchema = aSchema; aLen = aSchemaTable.copy(aTable, aSchemaTable.length(), aRet + 1); aTable[aLen] = 0; if (getDebugLvl() > 1) cout << "Table: " << aTable << endl; fTable = aTable; } else throw runtime_error("JobFile ERROR: Can't get Schema and Table Name"); } else { throw runtime_error("JobFile ERROR: Can't get Schema and Table Name"); } if ((fLocFile.empty()) && (!aInputFile.empty())) { string bulkRootPath = config::Config::makeConfig()->getConfig("WriteEngine", "BulkRoot"); if (aInputFile.at(0) == '/') fLocFile = aInputFile; else if ((!fPmFilePath.empty()) && (fMode == 1)) fLocFile = fPmFilePath + "/" + aInputFile; else if ((!bulkRootPath.empty()) && (fPmFilePath.empty())) fLocFile = bulkRootPath + "/data/import/" + aInputFile; else fLocFile = aInputFile; if (fArgMode == 2) fPmFile = fLocFile; } if (getDebugLvl() > 1) cout << "schema = " << fSchema << endl; if (getDebugLvl() > 1) cout << "TableName = " << fTable << endl; if (getDebugLvl() > 1) cout << "Input File = " << fLocFile << endl; } // Reset the list of columns we will be importing from the input data fColFldsFromJobFile.clear(); if (fImportDataMode != IMPORT_DATA_TEXT) { aSections.push_back("Column"); aXmlGetter.getAttributeListForAllChildren(aSections, "colName", fColFldsFromJobFile); } } //------------------------------------------------------------------------------ void WECmdArgs::checkJobIdCase() { if ((fJobId.empty()) || (fJobLogOnly) || (fMode == 3) || (fMode == 0)) return; if (fJobPath.empty()) { string bulkRootPath = config::Config::makeConfig()->getConfig("WriteEngine", "BulkRoot"); // cout << "checkJobIdCase::BulkRoot: " << bulkRootPath << endl; if (!bulkRootPath.empty()) fJobPath = bulkRootPath + "/job"; else throw runtime_error("Config Error: BulkRoot not found in Columnstore.xml"); } char aBuff[256]; if (!fJobPath.empty()) snprintf(aBuff, sizeof(aBuff), "%s/Job_%s.xml", fJobPath.c_str(), fJobId.c_str()); else // for time being snprintf(aBuff, sizeof(aBuff), "/var/log/mariadb/columnstore/data/bulk/job/Job_%s.xml", fJobId.c_str()); std::string aJobFileName(aBuff); // cout << "checkJobIdCase::aJobFileName: " << aJobFileName << endl; // BUG 4171 addJobFilesToVector(aJobFileName); aJobFileName = fVecJobFiles[0]; setSchemaAndTableFromJobFile(aJobFileName); setEnclByAndEscCharFromJobFile(aJobFileName); } //------------------------------------------------------------------------------ std::string WECmdArgs::getTmpFileDir() { if (!fTmpFileDir.empty()) return fTmpFileDir; fTmpFileDir = startup::StartUp::tmpDir(); if (fTmpFileDir.empty()) throw(runtime_error("Config ERROR: TmpFileDir not found!!")); else return fTmpFileDir; } //------------------------------------------------------------------------------ std::string WECmdArgs::getBulkRootDir() { if (!fBulkRoot.empty()) return fBulkRoot; fBulkRoot = config::Config::makeConfig()->getConfig("WriteEngine", "BulkRoot"); if (fBulkRoot.empty()) throw(runtime_error("Config ERROR: not found!!")); else return fBulkRoot; } //------------------------------------------------------------------------------ unsigned int WECmdArgs::getBatchQuantity() { return (fBatchQty >= 10000) ? fBatchQty : 10000; // default Batch Qty is 10000 } //------------------------------------------------------------------------------ void WECmdArgs::setEnclByAndEscCharFromJobFile(std::string& JobName) { if (fEnclosedChar == 0) // check anything in Jobxml file { WEXmlgetter aXmlGetter(JobName); const vector aSections{"BulkJob", "EnclosedByChar"}; try { // std::string aTable = aXmlGetter.getConfig(aSection, aElement); std::string aEnclosedBy = aXmlGetter.getValue(aSections); if (getDebugLvl() > 1) cout << "aEncloseBy = " << aEnclosedBy << endl; if (!aEnclosedBy.empty()) { fEnclosedChar = aEnclosedBy.at(0); } } catch (std::runtime_error&) { // do not do anything } } if (fEscChar == 0) // check anything in Jobxml file { WEXmlgetter aXmlGetter(JobName); const vector aSections{"BulkJob", "EscapeChar"}; try { // std::string aTable = aXmlGetter.getConfig(aSection, aElement); std::string aEscChar = aXmlGetter.getValue(aSections); if (getDebugLvl() > 1) cout << "aEscapeChar = " << aEscChar << endl; if (!aEscChar.empty()) { fEscChar = aEscChar.at(0); } } catch (std::runtime_error&) { // do not do anything } } } //------------------------------------------------------------------------------ std::string WECmdArgs::getFileNameFromPath(const std::string& Path) const { char aBuff[64]; int iDx = Path.find_last_of('/'); iDx++; // compensate for the forward slash int aCx = Path.size() - iDx; Path.copy(aBuff, aCx, iDx); aBuff[aCx] = 0; return aBuff; } //------------------------------------------------------------------------------ std::string WECmdArgs::getModuleID() { oam::Oam oam; oam::oamModuleInfo_t sModInfo; std::string sModuleID; char szModuleIDandPID[64]; int nModuleNumber; try { sModInfo = oam.getModuleInfo(); sModuleID = boost::get<1>(sModInfo); nModuleNumber = boost::get<2>(sModInfo); snprintf(szModuleIDandPID, sizeof(szModuleIDandPID), "%s%d-%d", sModuleID.c_str(), nModuleNumber, getpid()); sModuleID = szModuleIDandPID; } catch (exception&) { sModuleID = "unknown"; } return sModuleID; } //------------------------------------------------------------------------------ void WECmdArgs::splitConfigFilePerTable(std::string& ConfigName, int tblCount) { std::string aOpenTag = " aVecFiles; // std::vector aVecConfigs; for (int aIdx = 1; aIdx <= tblCount; aIdx++) { char aConfName[128]; snprintf(aConfName, sizeof(aConfName), "%s_%d.xml", ConfigName.c_str(), aIdx); // aVecConfigs.push_back(aConfName); fVecJobFiles.push_back(aConfName); std::ofstream* pCopy = new std::ofstream; // pCopy->open(aConfName, std::ios_base::app); pCopy->open(aConfName); aVecFiles.push_back(pCopy); } std::ifstream aMaster; aMaster.open(ConfigName.c_str()); if (aMaster.is_open()) { char aBuff[256]; int aTblNo = 0; size_t aStrPos = std::string::npos; bool aOpenFound = false; bool aCloseFound = false; while (!aMaster.eof()) { aMaster.getline(aBuff, sizeof(aBuff) - 1); unsigned int aLen = aMaster.gcount(); if ((aLen < (sizeof(aBuff) - 2)) && (aLen > 0)) { aBuff[aLen - 1] = '\n'; aBuff[aLen] = 0; string aData = aBuff; // cout << "Data Read " << aBuff; if (!aOpenFound) { aStrPos = aData.find(aOpenTag); if (aStrPos != std::string::npos) { aOpenFound = true; aTblNo++; write2ConfigFiles(aVecFiles, aBuff, aTblNo); } else { if ((!aOpenFound) && (aCloseFound)) { aStrPos = aData.find(aCloseSchemaTag); if (aStrPos != std::string::npos) { aOpenFound = false; aCloseFound = false; aTblNo = 0; } } write2ConfigFiles(aVecFiles, aBuff, aTblNo); } } else { aStrPos = aData.find(aCloseTag); if (aStrPos != std::string::npos) { aOpenFound = false; aCloseFound = true; write2ConfigFiles(aVecFiles, aBuff, aTblNo); } else { write2ConfigFiles(aVecFiles, aBuff, aTblNo); } } } } // while Master.eof } else { throw runtime_error("Could not open Job Config file"); } for (unsigned int Idx = 0; Idx < aVecFiles.size(); Idx++) { aVecFiles[Idx]->close(); delete aVecFiles[Idx]; } aVecFiles.clear(); } //------------------------------------------------------------------------------ void WECmdArgs::write2ConfigFiles(std::vector& Files, char* pBuff, int FileIdx) { if (FileIdx == 0) { std::vector::iterator aIt = Files.begin(); while (aIt != Files.end()) { std::ofstream* pCopy = (*aIt); pCopy->write(pBuff, strlen(pBuff)); ++aIt; } } else { Files[FileIdx - 1]->write(pBuff, strlen(pBuff)); } } //------------------------------------------------------------------------------ void WECmdArgs::updateWithJobFile(int Idx) { setLocFile(""); // resetting the from the previous import std::string aJobFileName = fVecJobFiles[Idx]; setSchemaAndTableFromJobFile(aJobFileName); setEnclByAndEscCharFromJobFile(aJobFileName); setJobFileName(aJobFileName); std::ostringstream aSS; aSS << fOrigJobId << ".xml_" << (Idx + 1); fJobId = aSS.str(); } //------------------------------------------------------------------------------ std::string WECmdArgs::replaceCharInStr(const std::string& Str, char C, char R) { std::stringstream aSs; size_t start = 0, end = 0; end = Str.find_first_of(C); do { if (end != string::npos) { aSs << Str.substr(start, end - start) << R; start = end + 1; } else { aSs << Str.substr(start, end - start); break; } end = Str.find_first_of(C, start); } while (start != end); return aSs.str(); } //------------------------------------------------------------------------------ // Introduced to handle Bug 4342 with Mode 2 std::string WECmdArgs::PrepMode2ListOfFiles(std::string& FileName) { VecArgs aInfileList; std::string bulkRootPath = getBulkRootDir(); // cout << "Inside PrepMode2ListOfFiles("<< FileName << ")" << endl; std::string aFileName = FileName; istringstream iss(aFileName); size_t start = 0, end = 0; const char* sep = " ,|"; end = aFileName.find_first_of(sep); do { if (end != string::npos) { std::string aFile = aFileName.substr(start, end - start); if (getDebugLvl() > 1) cout << "File: " << aFileName.substr(start, end - start) << endl; start = end + 1; aInfileList.push_back(aFile); } else { std::string aFile = aFileName.substr(start, end - start); if (getDebugLvl() > 1) cout << "Next Input File " << aFileName.substr(start, end - start) << endl; aInfileList.push_back(aFile); break; } end = aFileName.find_first_of(sep, start); } while (start != end); std::ostringstream aSS; int aVecSize = aInfileList.size(); int aVecIdx = 0; // Take file list one by one and append it to one string while (aVecIdx < aVecSize) { std::string aNextFile = aInfileList[aVecIdx]; aVecIdx++; // aInfileList.pop_front(); if (aNextFile.at(0) != '/') { aSS << bulkRootPath << "/data/import/" + aNextFile; } else { aSS << aNextFile; } if (aVecIdx < aVecSize) aSS << ","; } // cout << "File list are = " << aSS.str() << endl; return aSS.str(); } //------------------------------------------------------------------------------ // Get set of column names in the "current" table being processed from the // Job xml file. //------------------------------------------------------------------------------ void WECmdArgs::getColumnList(std::set& columnList) const { columnList.clear(); for (unsigned k = 0; k < fColFldsFromJobFile.size(); k++) { columnList.insert(fColFldsFromJobFile[k]); } } //----------------------------------------------------------------------------- // check for the bulkload log directory. If it is not existing, create it // w.r.t Bug 6137 //----------------------------------------------------------------------------- void WECmdArgs::checkForBulkLogDir(const std::string& BulkRoot) { if (!boost::filesystem::exists(BulkRoot.c_str())) { cout << "Creating directory : " << BulkRoot << endl; boost::filesystem::create_directories(BulkRoot.c_str()); } if (boost::filesystem::exists(BulkRoot.c_str())) { // create the job directory also if not existing std::ostringstream aSS; aSS << BulkRoot; aSS << "/job"; std::string jobDir = aSS.str(); if (!boost::filesystem::exists(jobDir.c_str())) { cout << "Creating directory : " << jobDir << endl; bool aSuccess = boost::filesystem::create_directories(jobDir.c_str()); if (!aSuccess) { cout << "\nFailed to create job directory, check permissions\n" << endl; throw runtime_error("Failed to create job directory, check permissions"); } } } else { cout << "\nFailed to create bulk directory, check permissions\n" << endl; throw runtime_error("Failed to create bulk directory, check permissions"); } } } /* namespace WriteEngine */