From 3faa1600c34f5af0ba5227f04b7e521afdd14c87 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Mon, 10 Feb 2020 11:57:33 -0500 Subject: [PATCH] Merge pull request #1031 from pleblanc1976/we-splitter-read-from-s3 MCOL-3520: Fix importing files from S3 for mode 1 imports. --- writeengine/splitter/CMakeLists.txt | 4 +- writeengine/splitter/we_cmdargs.cpp | 13 +- writeengine/splitter/we_cmdargs.h | 52 +++++--- writeengine/splitter/we_filereadthread.cpp | 142 ++++++++++++++++----- writeengine/splitter/we_filereadthread.h | 18 +++ writeengine/splitter/we_sdhandler.cpp | 34 +---- writeengine/splitter/we_sdhandler.h | 2 +- 7 files changed, 186 insertions(+), 79 deletions(-) diff --git a/writeengine/splitter/CMakeLists.txt b/writeengine/splitter/CMakeLists.txt index c02645534..1ab3f4b20 100644 --- a/writeengine/splitter/CMakeLists.txt +++ b/writeengine/splitter/CMakeLists.txt @@ -1,5 +1,5 @@ -include_directories( ${ENGINE_COMMON_INCLUDES} ) +include_directories( ${ENGINE_COMMON_INCLUDES} ${S3API_DIR} ) ########### next target ############### @@ -17,7 +17,7 @@ set(cpimport_SRCS add_executable(cpimport ${cpimport_SRCS}) -target_link_libraries(cpimport ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} batchloader threadpool) +target_link_libraries(cpimport ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} batchloader threadpool marias3) install(TARGETS cpimport DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-platform) diff --git a/writeengine/splitter/we_cmdargs.cpp b/writeengine/splitter/we_cmdargs.cpp index f10dc690c..eafa6e559 100644 --- a/writeengine/splitter/we_cmdargs.cpp +++ b/writeengine/splitter/we_cmdargs.cpp @@ -217,10 +217,10 @@ std::string WECmdArgs::getCpImportCmdLine() if (fbTruncationAsError) aSS << " -S "; - if (!fS3Key.empty()) + if (!fS3Key.empty() && !(fMode == 0 || fMode == 1)) { if (fS3Secret.empty() || fS3Bucket.empty() || fS3Region.empty()) - throw (runtime_error("Not all requried S3 options provided")); + throw (runtime_error("Not all required S3 options provided")); aSS << " -y " << fS3Key; aSS << " -K " << fS3Secret; aSS << " -t " << fS3Bucket; @@ -432,6 +432,8 @@ bool WECmdArgs::checkForCornerCases() throw(runtime_error("Mode 2 require remote file opts -f and -l or "\ "a fully qualified path for the remote file." "\nTry 'cpimport -h' for more information.")); + if (!fS3Key.empty()) + throw(runtime_error("Mode 2 & an input file from S3 does not make sense.")); } if (fMode == 3) @@ -1233,7 +1235,12 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) throw (runtime_error("No schema or local filename specified.")); } } - + + /* check for all-or-nothing cmdline args to enable S3 import */ + int s3Tmp = (fS3Key.empty() ? 0 : 1) + (fS3Bucket.empty() ? 0 : 1) + + (fS3Secret.empty() ? 0 : 1) + (fS3Region.empty() ? 0 : 1); + if (s3Tmp != 0 && s3Tmp != 4) + throw runtime_error("The access key, secret, bucket, and region are all required to import from S3"); } std::string WECmdArgs::getJobFileName() diff --git a/writeengine/splitter/we_cmdargs.h b/writeengine/splitter/we_cmdargs.h index d02600070..8c2b89c97 100644 --- a/writeengine/splitter/we_cmdargs.h +++ b/writeengine/splitter/we_cmdargs.h @@ -37,7 +37,10 @@ class WECmdArgs public: WECmdArgs(int argc, char** argv); virtual ~WECmdArgs() {} -public: + + typedef std::vector VecInts; + typedef std::vector VecArgs; + void appTestFunction(); void parseCmdLineArgs(int argc, char** argv); std::string getCpImportCmdLine(); @@ -54,7 +57,6 @@ public: char* pBuff, int FileIdx); void updateWithJobFile(int Idx); -public: std::string getJobFileName(); std::string getBrmRptFileName(); std::string getTmpFileDir(); @@ -195,19 +197,6 @@ public: return fTimeZone; } - -private: // variables for SplitterApp - typedef std::vector VecArgs; - VecArgs fVecArgs; - typedef std::vector VecInts; - VecInts fPmVec; - - VecArgs fVecJobFiles; //JobFiles splitter from master JobFile - int fMultiTableCount; //MultiTable count - VecArgs fColFldsFromJobFile;//List of columns from any job file, that - // represent fields in the import data - -public: bool getPmStatus(int Id); bool str2PmList(std::string& PmList, VecInts& V); int getPmVecSize() @@ -252,11 +241,44 @@ public: { fMultiTableCount = Count; } + + bool isS3Import() const + { + return !fS3Key.empty(); + } + std::string getS3Key() const + { + return fS3Key; + } + std::string getS3Bucket() const + { + return fS3Bucket; + } + std::string getS3Host() const + { + return fS3Host; + } + std::string getS3Secret() const + { + return fS3Secret; + } + std::string getS3Region() const + { + return fS3Region; + } std::string PrepMode2ListOfFiles(std::string& FileName); // Bug 4342 void getColumnList( std::set& columnList ) const; private: // variables for SplitterApp + VecArgs fVecArgs; + VecInts fPmVec; + + VecArgs fVecJobFiles; //JobFiles splitter from master JobFile + int fMultiTableCount; //MultiTable count + VecArgs fColFldsFromJobFile;//List of columns from any job file, that + // represent fields in the import data + std::string fJobId; // JobID std::string fOrigJobId; // Original JobID, in case we have to split it bool fJobLogOnly; // Job number is only for log filename only diff --git a/writeengine/splitter/we_filereadthread.cpp b/writeengine/splitter/we_filereadthread.cpp index 2d7fb4731..f6aa8f3b1 100644 --- a/writeengine/splitter/we_filereadthread.cpp +++ b/writeengine/splitter/we_filereadthread.cpp @@ -30,6 +30,7 @@ #include "we_messages.h" #include "we_sdhandler.h" +#include "we_splitterapp.h" #include #include @@ -99,6 +100,8 @@ WEFileReadThread::WEFileReadThread(WESDHandler& aSdh): fSdh(aSdh), fBuff = new char [fBuffSize]; + const WECmdArgs &args = fSdh.fRef.fCmdArgs; + initS3Connection(args); } //WEFileReadThread::WEFileReadThread(const WEFileReadThread& rhs):fSdh(rhs.fSdh) @@ -122,6 +125,18 @@ WEFileReadThread::~WEFileReadThread() fpThread = 0; delete []fBuff; //cout << "WEFileReadThread destructor called" << endl; + + if (doS3Import) + { + ms3_deinit(s3Connection); + ms3_library_deinit(); + if (buf) + { + s3Stream.reset(); + arrSource.reset(); + free(buf); + } + } } //------------------------------------------------------------------------------ @@ -141,6 +156,14 @@ void WEFileReadThread::reset() fpThread = 0; //cout << "WEFileReadThread destructor called" << endl; this->setContinue(true); + + if (buf) + { + arrSource.reset(); + s3Stream.reset(); + free(buf); + buf = NULL; + } } //------------------------------------------------------------------------------ @@ -200,37 +223,33 @@ bool WEFileReadThread::chkForListOfFiles(std::string& FileName) std::string aFileName = FileName; istringstream iss(aFileName); + ostringstream oss; size_t start = 0, end = 0; const char* sep = " ,|"; - - end = aFileName.find_first_of(sep); + ms3_status_st ms3status; do { - if (end != string::npos) - { - std::string aFile = aFileName.substr(start, end - start); - - if (fSdh.getDebugLvl() > 2) - cout << "File: " << aFileName.substr(start, end - start) << endl; - - start = end + 1; - fInfileList.push_back(aFile); - } - else - { - std::string aFile = aFileName.substr(start, end - start); - - if (fSdh.getDebugLvl() > 1) - cout << "Next Input File " << aFileName.substr(start, end - start) << endl; - - fInfileList.push_back(aFile); - break; - } - end = aFileName.find_first_of(sep, start); + std::string aFile = aFileName.substr(start, end - start); + if (aFile == "STDIN" || aFile == "stdin") + aFile = "/dev/stdin"; + + if (fSdh.getDebugLvl() > 1) + cout << "Next Input File " << aFile << endl; + + if ((!doS3Import && access(aFile.c_str(), O_RDONLY) != 0) || + (doS3Import && ms3_status(s3Connection, s3Bucket.c_str(), + aFile.c_str(), &ms3status) != 0)) + { + oss << "Could not access " << aFile; + throw runtime_error(oss.str()); + } + + fInfileList.push_back(aFile); + start = end + 1; } - while (start != end); + while (end != string::npos); //cout << "Going out chkForListOfFiles("<< FileName << ")" << endl; @@ -267,6 +286,13 @@ void WEFileReadThread::shutdown() //if(fInFile.is_open()) fInFile.close(); //@BUG 4326 if (fIfFile.is_open()) fIfFile.close(); + if (buf) + { + s3Stream.reset(); + arrSource.reset(); + free(buf); + buf = NULL; + } } //------------------------------------------------------------------------------ @@ -451,9 +477,48 @@ void WEFileReadThread::openInFile() { try { - if (fSdh.getDebugLvl()) cout << "Input FileName: " << fInFileName << endl; + /* If an S3 transfer + use ms3 lib to d/l data into mem + use boost::iostreams to wrap the mem in a stream interface + point infile's stream buffer to it. + */ + + if (fSdh.getDebugLvl()) cout << "Input Filename: " << fInFileName << endl; - if (fInFileName == "/dev/stdin") + if (doS3Import) + { + size_t bufLen = 0; + if (buf) + { + s3Stream.reset(); + arrSource.reset(); + free(buf); + buf = NULL; + } + if (fSdh.getDebugLvl()) + cout << "Downloading " << fInFileName << endl; + int err = ms3_get(s3Connection, s3Bucket.c_str(), fInFileName.c_str(), + &buf, &bufLen); + if (fSdh.getDebugLvl()) + cout << "Download complete." << endl; + if (err) + { + ostringstream os; + if (ms3_server_error(s3Connection)) + os << "Download of '" << fInFileName << "' failed. Error from the server: " + << ms3_server_error(s3Connection); + else + os << "Download of '" << fInFileName << "' failed. Got '" << ms3_error(err) + << "'."; + throw runtime_error(os.str()); + } + + arrSource.reset(new boost::iostreams::array_source((char *) buf, bufLen)); + s3Stream.reset(new boost::iostreams::stream(*arrSource)); + fInFile.rdbuf(s3Stream->rdbuf()); + } + + else if (fInFileName == "/dev/stdin") { char aDefCon[16], aGreenCol[16]; snprintf(aDefCon, sizeof(aDefCon), "\033[0m"); @@ -463,12 +528,11 @@ void WEFileReadThread::openInFile() cout << aGreenCol << "trying to read from STDIN... " << aDefCon << endl; + fInFile.rdbuf(cin.rdbuf()); } - cout.flush(); - //@BUG 4326 - if (fInFileName != "/dev/stdin") + else if (fInFileName != "/dev/stdin") { if (!fIfFile.is_open()) { @@ -592,6 +656,26 @@ int WEFileReadThread::getNextRow(istream& ifs, char* pBuf, int MaxLen) return pEnd - pBuf; } +void WEFileReadThread::initS3Connection(const WECmdArgs &args) +{ + doS3Import = args.isS3Import(); + if (doS3Import) + { + s3Key = args.getS3Key(); + s3Secret = args.getS3Secret(); + s3Bucket = args.getS3Bucket(); + s3Region = args.getS3Region(); + s3Host = args.getS3Host(); + ms3_library_init(); + s3Connection = ms3_init(s3Key.c_str(), s3Secret.c_str(), s3Region.c_str(), (s3Host.empty() ? NULL : s3Host.c_str())); + if (!s3Connection) + throw runtime_error("failed to get an S3 connection"); + } + else + s3Connection = NULL; + buf = NULL; +} + //------------------------------------------------------------------------------ diff --git a/writeengine/splitter/we_filereadthread.h b/writeengine/splitter/we_filereadthread.h index 2fb92332c..f81f5d492 100644 --- a/writeengine/splitter/we_filereadthread.h +++ b/writeengine/splitter/we_filereadthread.h @@ -30,6 +30,11 @@ #ifndef WE_FILEREADTHREAD_H_ #define WE_FILEREADTHREAD_H_ +#include "we_cmdargs.h" +#include "libmarias3/marias3.h" +#include +#include + namespace WriteEngine { @@ -153,6 +158,19 @@ private: char fDelim; // Column Delimit char char* fBuff; // main data buffer int fBuffSize; + + /* To support mode 1 imports from objects on S3 */ + void initS3Connection(const WECmdArgs &); + bool doS3Import; + std::string s3Key; + std::string s3Secret; + std::string s3Bucket; + std::string s3Region; + std::string s3Host; + ms3_st *s3Connection; + uint8_t *buf; + std::unique_ptr arrSource; + std::unique_ptr > s3Stream; }; } /* namespace WriteEngine */ diff --git a/writeengine/splitter/we_sdhandler.cpp b/writeengine/splitter/we_sdhandler.cpp index b23f61120..a819dbda2 100644 --- a/writeengine/splitter/we_sdhandler.cpp +++ b/writeengine/splitter/we_sdhandler.cpp @@ -501,14 +501,9 @@ void WESDHandler::setup() //fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), false); fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), getConsoleLog()); - // In mode 0 and Mode 1, we need to check for local file availability - if ((0 == fRef.fCmdArgs.getMode()) || (1 == fRef.fCmdArgs.getMode())) - { - if (!check4InputFile(fRef.getLocFile())) - { - throw (runtime_error("Could not open Input file " + fRef.getLocFile())); - } - } + // In mode 0 and Mode 1, we need to construct the input file list and check availability + if (0 == fRef.fCmdArgs.getMode() || 1 == fRef.fCmdArgs.getMode()) + setInputFileList(fRef.getLocFile()); fImportRslt.startTimer(); @@ -2726,28 +2721,9 @@ std::string WESDHandler::getTime2Str() const //------------------------------------------------------------------------------ -bool WESDHandler::check4InputFile(std::string InFileName) +void WESDHandler::setInputFileList(std::string InFileName) { - bool aRet = false; - - if ((0 == InFileName.compare("STDIN")) || (0 == InFileName.compare("stdin"))) - { - fFileReadThread.add2InputDataFileList(InFileName); - return true; - } - else - { - //BUG 4342 - Need to support "list of infiles" - fFileReadThread.chkForListOfFiles(InFileName); - std::string aFileName = fFileReadThread.getNextInputDataFile(); - std::ifstream aFile(aFileName.c_str()); - aRet = (aFile.good()) ? true : false; - - // add back to list, which we pop_front for checking the file. - if (aRet) fFileReadThread.add2InputDataFileList(aFileName); - } - - return aRet; + fFileReadThread.chkForListOfFiles(InFileName); } //------------------------------------------------------------------------------ diff --git a/writeengine/splitter/we_sdhandler.h b/writeengine/splitter/we_sdhandler.h index 551df4d9f..e8d624e1b 100644 --- a/writeengine/splitter/we_sdhandler.h +++ b/writeengine/splitter/we_sdhandler.h @@ -111,7 +111,7 @@ public: bool releaseTableLocks(); void check4CpiInvokeMode(); bool check4PmArguments(); - bool check4InputFile(std::string InFileName); + void setInputFileList(std::string InFileName); bool check4CriticalErrMsgs(std::string& Entry); void onStartCpiResponse(int PmId);