diff --git a/.gitignore b/.gitignore index fff4df904..0becb5fd0 100644 --- a/.gitignore +++ b/.gitignore @@ -110,3 +110,6 @@ columnstoreversion.h .build /.vs /CMakeSettings.json +tags +*.orig +*.diff diff --git a/.gitmodules b/.gitmodules index 4ff4be4ea..18b13dc8f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "storage-manager/libmarias3"] - path = storage-manager/libmarias3 + path = utils/libmarias3/libmarias3 url = https://github.com/mariadb-corporation/libmarias3.git diff --git a/storage-manager/CMakeLists.txt b/storage-manager/CMakeLists.txt index dbd7bb9fd..e3fcd9c6a 100755 --- a/storage-manager/CMakeLists.txt +++ b/storage-manager/CMakeLists.txt @@ -51,23 +51,6 @@ if (SM_LOG_TO_STDERR) add_definitions(-DDEBUG) endif() -set(S3API_DIR ${CMAKE_CURRENT_SOURCE_DIR}/libmarias3) - -include(ExternalProject) -ExternalProject_Add(ms3 - DOWNLOAD_COMMAND cd ${CMAKE_SOURCE_DIR} && git submodule update --init - CONFIGURE_COMMAND autoreconf -fi ${S3API_DIR} && ${S3API_DIR}/configure --enable-shared --disable-static --prefix=${CMAKE_BINARY_DIR} ${S3_CONFIGURE_OPT} - BUILD_COMMAND make - BUILD_IN_SOURCE 0 - EXCLUDE_FROM_ALL TRUE -) - -add_library(marias3 SHARED IMPORTED) -set_target_properties(marias3 PROPERTIES IMPORTED_LOCATION ${CMAKE_BINARY_DIR}/lib/libmarias3.so.3.1.2) -add_dependencies(marias3 ms3) - -set(S3API_DEPS marias3 curl xml2) - # get linkage right link_directories(${CMAKE_BINARY_DIR}/lib) set(CMAKE_INSTALL_RPATH $ORIGIN $ORIGIN/../lib) @@ -148,13 +131,6 @@ install(TARGETS StorageManager smcat smput smls smrm COMPONENT platform ) -install(PROGRAMS - ${CMAKE_BINARY_DIR}/lib/libmarias3.so.3.1.2 - ${CMAKE_BINARY_DIR}/lib/libmarias3.so.3 - ${CMAKE_BINARY_DIR}/lib/libmarias3.so - DESTINATION ${INSTALL_ENGINE}/lib - COMPONENT platform -) install(FILES storagemanager.cnf DESTINATION ${ENGINE_SYSCONFDIR}/columnstore diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt index 405241232..bac0ef474 100644 --- a/utils/CMakeLists.txt +++ b/utils/CMakeLists.txt @@ -27,4 +27,4 @@ add_subdirectory(clusterTester) add_subdirectory(libmysql_client) add_subdirectory(regr) add_subdirectory(cloudio) - +add_subdirectory(libmarias3) diff --git a/utils/libmarias3/CMakeLists.txt b/utils/libmarias3/CMakeLists.txt new file mode 100644 index 000000000..9dd46ce65 --- /dev/null +++ b/utils/libmarias3/CMakeLists.txt @@ -0,0 +1,24 @@ +set(S3API_DIR ${CMAKE_CURRENT_SOURCE_DIR}/libmarias3 CACHE INTERNAL "S3API_DIR") + +include(ExternalProject) +ExternalProject_Add(ms3 + DOWNLOAD_COMMAND cd ${CMAKE_SOURCE_DIR} && git submodule update --init + CONFIGURE_COMMAND autoreconf -fi ${S3API_DIR} && ${S3API_DIR}/configure --enable-shared --disable-static --prefix=${CMAKE_BINARY_DIR} ${S3_CONFIGURE_OPT} + BUILD_COMMAND make + BUILD_IN_SOURCE 0 + EXCLUDE_FROM_ALL TRUE +) + +add_library(marias3 SHARED IMPORTED) +set_target_properties(marias3 PROPERTIES IMPORTED_LOCATION ${CMAKE_BINARY_DIR}/lib/libmarias3.so.3.1.2) +add_dependencies(marias3 ms3) + +set(S3API_DEPS marias3 curl xml2 CACHE INTERNAL "S3API_DEPS") + +install(PROGRAMS + ${CMAKE_BINARY_DIR}/lib/libmarias3.so.3.1.2 + ${CMAKE_BINARY_DIR}/lib/libmarias3.so.3 + ${CMAKE_BINARY_DIR}/lib/libmarias3.so + DESTINATION ${INSTALL_ENGINE}/lib + COMPONENT platform +) diff --git a/storage-manager/libmarias3 b/utils/libmarias3/libmarias3 similarity index 100% rename from storage-manager/libmarias3 rename to utils/libmarias3/libmarias3 diff --git a/writeengine/bulk/CMakeLists.txt b/writeengine/bulk/CMakeLists.txt index 772dcb8b8..ac934eb84 100644 --- a/writeengine/bulk/CMakeLists.txt +++ b/writeengine/bulk/CMakeLists.txt @@ -1,6 +1,7 @@ -include_directories( ${ENGINE_COMMON_INCLUDES} ) +include_directories( ${ENGINE_COMMON_INCLUDES} ${S3API_DIR} ) +link_directories(${CMAKE_BINARY_DIR}/lib) ########### next target ############### @@ -37,7 +38,7 @@ set(cpimport.bin_SRCS cpimport.cpp) add_executable(cpimport.bin ${cpimport.bin_SRCS}) -target_link_libraries(cpimport.bin ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} we_bulk we_xml) +target_link_libraries(cpimport.bin ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} ${S3API_DEPS} we_bulk we_xml) install(TARGETS cpimport.bin DESTINATION ${ENGINE_BINDIR} COMPONENT platform) diff --git a/writeengine/bulk/cpimport.cpp b/writeengine/bulk/cpimport.cpp index 129ab0beb..4f596ba7a 100644 --- a/writeengine/bulk/cpimport.cpp +++ b/writeengine/bulk/cpimport.cpp @@ -166,7 +166,12 @@ void printUsage() " -L send *.err and *.bad (reject) files here" << endl << " -T Timezone used for TIMESTAMP datatype" << endl << " Possible values: \"SYSTEM\" (default)" << endl << - " : Offset in the form +/-HH:MM" << endl << endl; + " : Offset in the form +/-HH:MM" << endl << endl << + " -y S3 Authentication Key (for S3 imports)" << endl << + " -K S3 Authentication Secret (for S3 imports)" << endl << + " -t S3 Bucket (for S3 imports)" << endl << + " -H S3 Hostname (for S3 imports, Amazon's S3 default)" << endl << + " -g S3 Regions (for S3 imports)" << endl; cout << " Example1:" << endl << " cpimport.bin -j 1234" << endl << @@ -317,7 +322,7 @@ void parseCmdLineArgs( std::string jobUUID; while ( (option = getopt( - argc, argv, "b:c:d:e:f:hij:kl:m:n:p:r:s:u:w:B:C:DE:I:P:R:ST:X:NL:")) != EOF ) + argc, argv, "b:c:d:e:f:hij:kl:m:n:p:r:s:u:w:B:C:DE:I:P:R:ST:X:NL:y:K:t:H:g:")) != EOF ) { switch (option) { @@ -707,6 +712,37 @@ void parseCmdLineArgs( break; } + case 'y': + { + curJob.setS3Key(optarg); + break; + } + + case 'K': + { + curJob.setS3Secret(optarg); + break; + } + + case 't': + { + curJob.setS3Bucket(optarg); + break; + } + + case 'H': + { + curJob.setS3Host(optarg); + break; + } + + case 'g': + { + curJob.setS3Region(optarg); + break; + } + + default : { ostringstream oss; @@ -842,9 +878,14 @@ void parseCmdLineArgs( //------------------------------------------------------------------------------ void printInputSource( const std::string& alternateImportDir, - const std::string& jobDescFile) + const std::string& jobDescFile, + const std::string& S3Bucket) { - if (alternateImportDir.size() > 0) + if (!S3Bucket.empty()) + { + cout << "Input file will be read from S3 Bucket : " << S3Bucket << ", file/object : " << jobDescFile << endl; + } + else if (alternateImportDir.size() > 0) { if (alternateImportDir == IMPORT_PATH_CWD) { @@ -930,6 +971,7 @@ void constructTempXmlFile( const std::string& xmlGenSchema, const std::string& xmlGenTable, const std::string& alternateImportDir, + const std::string& S3Bucket, boost::filesystem::path& sFileName) { // Construct the job description file name @@ -957,7 +999,7 @@ void constructTempXmlFile( startupError( oss.str(), false ); } - printInputSource( alternateImportDir, sFileName.string() ); + printInputSource( alternateImportDir, sFileName.string(), S3Bucket ); TempXMLGenData genData( sJobIdStr, xmlGenSchema, xmlGenTable ); XMLGenProc genProc( &genData, @@ -1275,6 +1317,7 @@ int main(int argc, char** argv) xmlGenSchema, xmlGenTable, curJob.getAlternateImportDir(), + curJob.getS3Bucket(), sFileName); } else // create user's persistent job file name @@ -1300,7 +1343,7 @@ int main(int argc, char** argv) startupError( oss.str(), false ); } - printInputSource( curJob.getAlternateImportDir(), sFileName.string()); + printInputSource( curJob.getAlternateImportDir(), sFileName.string(), curJob.getS3Bucket()); } if (bDebug) diff --git a/writeengine/bulk/we_bulkload.cpp b/writeengine/bulk/we_bulkload.cpp index d8630ec70..9642e19fc 100644 --- a/writeengine/bulk/we_bulkload.cpp +++ b/writeengine/bulk/we_bulkload.cpp @@ -797,9 +797,11 @@ int BulkLoad::preProcess( Job& job, int tableNo, } // Initialize BulkLoadBuffers after we have added all the columns - tableInfo->initializeBuffers(fNoOfBuffers, - job.jobTableList[tableNo].fFldRefs, - fixedBinaryRecLen); + rc = tableInfo->initializeBuffers(fNoOfBuffers, + job.jobTableList[tableNo].fFldRefs, + fixedBinaryRecLen); + if (rc) + return rc; fTableInfo.push_back(tableInfo); @@ -1246,31 +1248,38 @@ int BulkLoad::manageImportDataFileList(Job& job, { std::string importDir; - if ( fAlternateImportDir == IMPORT_PATH_CWD ) // current working dir + if (!fS3Key.empty()) { - char cwdBuf[4096]; - importDir = ::getcwd(cwdBuf, sizeof(cwdBuf)); - importDir += '/'; + loadFilesList.push_back(loadFileName); } - else if ( fAlternateImportDir.size() > 0 ) // -f path + else { - importDir = fAlternateImportDir; - } - else // /data/import - { - importDir = fRootDir; - importDir += DIR_BULK_IMPORT; - } + if ( fAlternateImportDir == IMPORT_PATH_CWD ) // current working dir + { + char cwdBuf[4096]; + importDir = ::getcwd(cwdBuf, sizeof(cwdBuf)); + importDir += '/'; + } + else if ( fAlternateImportDir.size() > 0 ) // -f path + { + importDir = fAlternateImportDir; + } + else // /data/import + { + importDir = fRootDir; + importDir += DIR_BULK_IMPORT; + } - // Break down loadFileName into vector of file names in case load- - // FileName contains a list of files or 1 or more wildcards. - int rc = buildImportDataFileList(importDir, - loadFileName, - loadFilesList); + // Break down loadFileName into vector of file names in case load- + // FileName contains a list of files or 1 or more wildcards. + int rc = buildImportDataFileList(importDir, + loadFileName, + loadFilesList); - if (rc != NO_ERROR) - { - return rc; + if (rc != NO_ERROR) + { + return rc; + } } // No filenames is considered a fatal error, except for remote mode2. @@ -1284,7 +1293,7 @@ int BulkLoad::manageImportDataFileList(Job& job, if (fBulkMode == BULK_MODE_REMOTE_MULTIPLE_SRC) { - tableInfo->setLoadFilesInput(bUseStdin, loadFilesList); + tableInfo->setLoadFilesInput(bUseStdin, (!fS3Key.empty()), loadFilesList, fS3Host, fS3Key, fS3Secret, fS3Bucket, fS3Region); tableInfo->markTableComplete( ); fLog.logMsg( oss.str(), MSGLVL_INFO1 ); return NO_ERROR; @@ -1300,33 +1309,36 @@ int BulkLoad::manageImportDataFileList(Job& job, // We also used to check to make sure the input file is not empty, and // if it were, we threw an error at this point, but we removed that // check. With shared-nothing, an empty file is now acceptable. - for (unsigned ndx = 0; ndx < loadFilesList.size(); ndx++ ) + if (fS3Key.empty()) { - // in addition to being more portable due to the use of boost, this change - // actually fixes an inherent bug with cpimport reading from a named pipe. - // Only the first open call gets any data passed through the pipe so the - // here that used to do an open to test for existence meant cpimport would - // never get data from the pipe. - boost::filesystem::path pathFile(loadFilesList[ndx]); + for (unsigned ndx = 0; ndx < loadFilesList.size(); ndx++ ) + { + // in addition to being more portable due to the use of boost, this change + // actually fixes an inherent bug with cpimport reading from a named pipe. + // Only the first open call gets any data passed through the pipe so the + // here that used to do an open to test for existence meant cpimport would + // never get data from the pipe. + boost::filesystem::path pathFile(loadFilesList[ndx]); - if ( !boost::filesystem::exists( pathFile ) ) - { - ostringstream oss; - oss << "input data file " << loadFilesList[ndx] << " does not exist"; - fLog.logMsg( oss.str(), ERR_FILE_NOT_EXIST, MSGLVL_ERROR ); - tableInfo->fBRMReporter.addToErrMsgEntry(oss.str()); - return ERR_FILE_NOT_EXIST; - } - else - { - ostringstream oss; - oss << "input data file " << loadFilesList[ndx]; - fLog.logMsg( oss.str(), MSGLVL_INFO1 ); + if ( !boost::filesystem::exists( pathFile ) ) + { + ostringstream oss; + oss << "input data file " << loadFilesList[ndx] << " does not exist"; + fLog.logMsg( oss.str(), ERR_FILE_NOT_EXIST, MSGLVL_ERROR ); + tableInfo->fBRMReporter.addToErrMsgEntry(oss.str()); + return ERR_FILE_NOT_EXIST; + } + else + { + ostringstream oss; + oss << "input data file " << loadFilesList[ndx]; + fLog.logMsg( oss.str(), MSGLVL_INFO1 ); + } } } } - tableInfo->setLoadFilesInput(bUseStdin, loadFilesList); + tableInfo->setLoadFilesInput(bUseStdin, (!fS3Key.empty()), loadFilesList, fS3Host, fS3Key, fS3Secret, fS3Bucket, fS3Region); return NO_ERROR; } diff --git a/writeengine/bulk/we_bulkload.h b/writeengine/bulk/we_bulkload.h index dee60d535..7ea707177 100644 --- a/writeengine/bulk/we_bulkload.h +++ b/writeengine/bulk/we_bulkload.h @@ -119,6 +119,11 @@ public: const std::string& getJobDir ( ) const; const std::string& getSchema ( ) const; const std::string& getTempJobDir ( ) const; + const std::string& getS3Key ( ) const; + const std::string& getS3Secret ( ) const; + const std::string& getS3Bucket ( ) const; + const std::string& getS3Host ( ) const; + const std::string& getS3Region ( ) const; bool getTruncationAsError ( ) const; BulkModeType getBulkLoadMode ( ) const; bool getContinue ( ) const; @@ -151,6 +156,11 @@ public: void setJobUUID ( const std::string& jobUUID ); void setErrorDir ( const std::string& errorDir ); void setTimeZone ( const std::string& timeZone ); + void setS3Key ( const std::string& key ); + void setS3Secret ( const std::string& secret ); + void setS3Bucket ( const std::string& bucket ); + void setS3Host ( const std::string& host ); + void setS3Region ( const std::string& region ); // Timer functions void startTimer ( ); void stopTimer ( ); @@ -230,6 +240,11 @@ private: boost::uuids::uuid fUUID; // job UUID static bool fNoConsoleOutput; // disable output to console std::string fTimeZone; // Timezone to use for TIMESTAMP data type + std::string fS3Key; // S3 Key + std::string fS3Secret; // S3 Secret + std::string fS3Host; // S3 Host + std::string fS3Bucket; // S3 Bucket + std::string fS3Region; // S3 Region //-------------------------------------------------------------------------- // Private Functions @@ -342,6 +357,31 @@ inline const std::string& BulkLoad::getTempJobDir( ) const return DIR_BULK_TEMP_JOB; } +inline const std::string& BulkLoad::getS3Key( ) const +{ + return fS3Key; +} + +inline const std::string& BulkLoad::getS3Secret( ) const +{ + return fS3Secret; +} + +inline const std::string& BulkLoad::getS3Bucket( ) const +{ + return fS3Bucket; +} + +inline const std::string& BulkLoad::getS3Host( ) const +{ + return fS3Host; +} + +inline const std::string& BulkLoad::getS3Region( ) const +{ + return fS3Region; +} + inline bool BulkLoad::getTruncationAsError ( ) const { return fbTruncationAsError; @@ -472,6 +512,31 @@ inline void BulkLoad::setTimeZone( const std::string& timeZone ) fTimeZone = timeZone; } +inline void BulkLoad::setS3Key( const std::string& key ) +{ + fS3Key = key; +} + +inline void BulkLoad::setS3Secret( const std::string& secret ) +{ + fS3Secret = secret; +} + +inline void BulkLoad::setS3Bucket( const std::string& bucket ) +{ + fS3Bucket = bucket; +} + +inline void BulkLoad::setS3Host( const std::string& host ) +{ + fS3Host = host; +} + +inline void BulkLoad::setS3Region( const std::string& region ) +{ + fS3Region = region; +} + inline void BulkLoad::startTimer( ) { gettimeofday( &fStartTime, 0 ); diff --git a/writeengine/bulk/we_bulkloadbuffer.cpp b/writeengine/bulk/we_bulkloadbuffer.cpp index fe0eaa70f..1c65c34d3 100644 --- a/writeengine/bulk/we_bulkloadbuffer.cpp +++ b/writeengine/bulk/we_bulkloadbuffer.cpp @@ -2039,6 +2039,106 @@ int BulkLoadBuffer::parseDictSection(ColumnInfo& columnInfo, return rc; } + +int BulkLoadBuffer::fillFromMemory( + const BulkLoadBuffer& overFlowBufIn, + const char* input, size_t length, size_t *parse_length, RID& totalReadRows, + RID& correctTotalRows, const boost::ptr_vector& columnsInfo, + unsigned int allowedErrCntThisCall ) +{ + boost::mutex::scoped_lock lock(fSyncUpdatesBLB); + reset(); + copyOverflow( overFlowBufIn ); + size_t readSize = 0; + + // Copy the overflow data from the last buffer, that did not get written + if (fOverflowSize != 0) + { + memcpy( fData, fOverflowBuf, fOverflowSize ); + + if (fOverflowBuf != NULL) + { + delete [] fOverflowBuf; + fOverflowBuf = NULL; + } + } + + readSize = fBufferSize - fOverflowSize; + if (readSize > (length - *parse_length)) + { + readSize = length - *parse_length; + } + memcpy(fData + fOverflowSize, input + *parse_length, readSize); + *parse_length += readSize; + + bool bEndOfData = false; + + if (length == *parse_length) + { + bEndOfData = true; + } + + if ( bEndOfData && // @bug 3516: Add '\n' if missing from last record + (fImportDataMode == IMPORT_DATA_TEXT) ) // Only applies to ascii mode + { + if ( (fOverflowSize > 0) | (readSize > 0) ) + { + if ( fData[ fOverflowSize + readSize - 1 ] != '\n' ) + { + // Should be safe to add byte to fData w/o risk of overflowing, + // since we hit EOF. That should mean fread() did not read all + // the bytes we requested, meaning we have room to add a byte. + fData[ fOverflowSize + readSize ] = '\n'; + readSize++; + } + } + } + + // Lazy allocation of fToken memory as needed + if (fTokens == 0) + { + resizeTokenArray(); + } + + if ((readSize > 0) || (fOverflowSize > 0)) + { + if (fOverflowBuf != NULL) + { + delete [] fOverflowBuf; + fOverflowBuf = NULL; + } + + fReadSize = readSize + fOverflowSize; + fStartRow = correctTotalRows; + fStartRowForLogging = totalReadRows; + + if (fImportDataMode == IMPORT_DATA_TEXT) + { + tokenize( columnsInfo, allowedErrCntThisCall ); + } + else + { + int rc = tokenizeBinary( columnsInfo, allowedErrCntThisCall, + bEndOfData ); + + if (rc != NO_ERROR) + return rc; + } + + // If we read a full buffer without hitting any new lines, then + // terminate import because row size is greater than read buffer size. + if ((fTotalReadRowsForLog == 0) && (fReadSize == fBufferSize)) + { + return ERR_BULK_ROW_FILL_BUFFER; + } + + totalReadRows += fTotalReadRowsForLog; + correctTotalRows += fTotalReadRows; + } + + return NO_ERROR; +} + //------------------------------------------------------------------------------ // Read the next set of rows from the input import file (for the specified // table), into "this" BulkLoadBuffer. diff --git a/writeengine/bulk/we_bulkloadbuffer.h b/writeengine/bulk/we_bulkloadbuffer.h index e5d93c3a1..6f2a431fa 100644 --- a/writeengine/bulk/we_bulkloadbuffer.h +++ b/writeengine/bulk/we_bulkloadbuffer.h @@ -265,6 +265,12 @@ public: */ bool tryAndLockColumn(const int& columnId, const int& id); + int fillFromMemory( + const BulkLoadBuffer& overFlowBufIn, + const char* input, size_t length, size_t *parse_length, RID& totalReadRows, + RID& correctTotalRows, const boost::ptr_vector& columnsInfo, + unsigned int allowedErrCntThisCall ); + /** @brief Read the table data into the buffer */ int fillFromFile(const BulkLoadBuffer& overFlowBufIn, diff --git a/writeengine/bulk/we_tableinfo.cpp b/writeengine/bulk/we_tableinfo.cpp index 0d93c12b7..1a9debf3e 100644 --- a/writeengine/bulk/we_tableinfo.cpp +++ b/writeengine/bulk/we_tableinfo.cpp @@ -36,6 +36,7 @@ #include // @bug 2099+ #include +#include #ifdef _MSC_VER #include #else @@ -65,7 +66,7 @@ const std::string ERR_FILE_SUFFIX = ".err"; // Job error file suffix const std::string BOLD_START = "\033[0;1m"; const std::string BOLD_STOP = "\033[0;39m"; } - + namespace WriteEngine { //------------------------------------------------------------------------------ @@ -92,7 +93,7 @@ void TableInfo::sleepMS(long ms) #endif } - + //------------------------------------------------------------------------------ // TableInfo constructor //------------------------------------------------------------------------------ @@ -117,6 +118,7 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID, fTimeZone("SYSTEM"), fTableLocked(false), fReadFromStdin(false), + fReadFromS3(false), fNullStringMode(false), fEnclosedByChar('\0'), fEscapeChar('\\'), @@ -154,7 +156,7 @@ TableInfo::~TableInfo() fBRMReporter.sendErrMsgToFile(fBRMRptFileName); freeProcessingBuffers(); } - + //------------------------------------------------------------------------------ // Frees up processing buffer memory. We don't reset fReadBufCount to 0, // because BulkLoad::lockColumnForParse() is calling getNumberOfBuffers() @@ -172,7 +174,7 @@ void TableInfo::freeProcessingBuffers() fColumns.clear(); fNumberOfColumns = 0; } - + //------------------------------------------------------------------------------ // Close any database column or dictionary store files left open for this table. // Under "normal" circumstances, there should be no files left open when we @@ -215,7 +217,7 @@ void TableInfo::closeOpenDbFiles() } } } - + //------------------------------------------------------------------------------ // Locks this table for reading to the specified thread (locker) "if" the table // has not yet been assigned to a read thread. @@ -235,7 +237,7 @@ bool TableInfo::lockForRead(const int& locker) return false; } - + //------------------------------------------------------------------------------ // Loop thru reading the import file(s) assigned to this TableInfo object. //------------------------------------------------------------------------------ @@ -395,9 +397,20 @@ int TableInfo::readTableData( ) // per input file. // validTotalRows is ongoing total of valid rows read for all files // pertaining to this DB table. - int readRc = fBuffers[readBufNo].fillFromFile( + int readRc; + if (fReadFromS3) + { + readRc = fBuffers[readBufNo].fillFromMemory( + fBuffers[prevReadBuf], fFileBuffer, fS3ReadLength, &fS3ParseLength, + totalRowsPerInputFile, validTotalRows, fColumns, + allowedErrCntThisCall); + } + else + { + readRc = fBuffers[readBufNo].fillFromFile( fBuffers[prevReadBuf], fHandle, totalRowsPerInputFile, validTotalRows, fColumns, allowedErrCntThisCall); + } if (readRc != NO_ERROR) { @@ -501,7 +514,7 @@ int TableInfo::readTableData( ) fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount; // bufferCount++; - if ( feof(fHandle) ) + if ( (fHandle && feof(fHandle)) || (fS3ReadLength == fS3ParseLength) ) { timeval readFinished; gettimeofday(&readFinished, NULL); @@ -517,6 +530,15 @@ int TableInfo::readTableData( ) //" seconds; bufferCount-"+Convertor::int2Str(bufferCount), MSGLVL_INFO2 ); } + else if(fReadFromS3) + { + fLog->logMsg( "Finished loading " + fTableName + " from S3" + + ", Time taken = " + Convertor::int2Str((int) + (readFinished.tv_sec - readStart.tv_sec)) + + " seconds", + //" seconds; bufferCount-"+Convertor::int2Str(bufferCount), + MSGLVL_INFO2 ); + } else { fLog->logMsg( "Finished reading file " + fFileName + @@ -574,7 +596,7 @@ int TableInfo::readTableData( ) return NO_ERROR; } - + //------------------------------------------------------------------------------ // writeErrorList() // errorRows - vector of row numbers and corresponding error messages @@ -612,7 +634,7 @@ void TableInfo::writeErrorList(const std::vector< std::pairsetTimeZone(fTimeZone); fBuffers.push_back(buffer); } + if (!fS3Key.empty()) + { + ms3_library_init(); + ms3 = ms3_init(fS3Key.c_str(), fS3Secret.c_str(), fS3Region.c_str(), fS3Host.c_str()); + if (!ms3) + { + ostringstream oss; + oss << "Error initiating S3 library"; + fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR ); + return ERR_FILE_OPEN; + } + } + return 0; } //------------------------------------------------------------------------------ @@ -1267,7 +1302,7 @@ void TableInfo::addColumn(ColumnInfo* info) fExtentStrAlloc.addColumn( info->column.mapOid, info->column.width ); } - + //------------------------------------------------------------------------------ // Open the file corresponding to fFileName so that we can import it's contents. // A buffer is also allocated and passed to setvbuf(). @@ -1300,6 +1335,27 @@ int TableInfo::openTableFile() fTableName << "..." << BOLD_STOP; fLog->logMsg( oss.str(), MSGLVL_INFO1 ); } + else if (fReadFromS3) + { + int res; + res = ms3_get(ms3, fS3Bucket.c_str(), fFileName.c_str(), (uint8_t**)&fFileBuffer, &fS3ReadLength); + fS3ParseLength = 0; + if (res) + { + ostringstream oss; + oss << "Error retrieving file " << fFileName << " from S3: "; + if (ms3_server_error(ms3)) + { + oss << ms3_server_error(ms3); + } + else + { + oss << ms3_error(res); + } + fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR ); + return ERR_FILE_OPEN; + } + } else { if (fImportDataMode == IMPORT_DATA_TEXT) @@ -1331,7 +1387,7 @@ int TableInfo::openTableFile() return NO_ERROR; } - + //------------------------------------------------------------------------------ // Close the current open file we have been importing. //------------------------------------------------------------------------------ @@ -1352,8 +1408,12 @@ void TableInfo::closeTableFile() fHandle = 0; } + else if (ms3) + { + ms3_free((uint8_t*)fFileBuffer); + } } - + //------------------------------------------------------------------------------ // "Grabs" the current read buffer for TableInfo so that the read thread that // is calling this function, can read the next buffer's set of data. @@ -1389,7 +1449,7 @@ bool TableInfo::isBufferAvailable(bool report) return false; } - + //------------------------------------------------------------------------------ // Report whether rows were rejected, and if so, then list them out into the // reject file. @@ -1422,7 +1482,14 @@ void TableInfo::writeBadRows( const std::vector* errorDatRows, } else { - rejectFileName << getFileName(); + if (fReadFromS3) + { + rejectFileName << basename(getFileName().c_str()); + } + else + { + rejectFileName << getFileName(); + } } rejectFileName << ".Job_" << fJobId << @@ -1494,7 +1561,7 @@ void TableInfo::writeBadRows( const std::vector* errorDatRows, } } } - + //------------------------------------------------------------------------------ // Report whether rows were rejected, and if so, then list out the row numbers // and error reasons into the error file. @@ -1528,7 +1595,14 @@ void TableInfo::writeErrReason( const std::vector< std::pair #include +#include + #include "we_type.h" #include "we_colop.h" #include "we_fileop.h" @@ -63,7 +65,7 @@ private: int fTableId; // Table id int fBufferSize; // Size of buffer used by BulkLoadBuffer - int fFileBufSize; // Size of fFileBuffer passed to setvbuf + size_t fFileBufSize; // Size of fFileBuffer passed to setvbuf // to read import files. Comes from // writeBufferSize tag in job xml file char fColDelim; // Used to delimit col values in a row @@ -133,6 +135,15 @@ private: volatile bool fTableLocked; // Do we have db table lock bool fReadFromStdin; // Read import file from STDIN + bool fReadFromS3; // Read import file from S3 + std::string fS3Host; // S3 hostname + std::string fS3Key; // S3 key + std::string fS3Secret; // S3 secret key + std::string fS3Bucket; // S3 bucket + std::string fS3Region; + ms3_st* ms3; // S3 object + size_t fS3ReadLength; + size_t fS3ParseLength; bool fNullStringMode; // Treat "NULL" as a null value char fEnclosedByChar; // Character to enclose col values char fEscapeChar; // Escape character used in conjunc- @@ -366,9 +377,9 @@ public: * @param fixedBinaryRecLen In binary mode, this is the fixed record length * used to read the buffer; in text mode, this value is not used. */ - void initializeBuffers(int noOfBuffers, - const JobFieldRefList& jobFieldRefList, - unsigned int fixedBinaryRecLen); + int initializeBuffers(int noOfBuffers, + const JobFieldRefList& jobFieldRefList, + unsigned int fixedBinaryRecLen); /** @brief Read the table data into the read buffer */ @@ -412,8 +423,10 @@ public: /** @brief set list of import files and STDIN usage flag */ - void setLoadFilesInput(bool bReadFromStdin, - const std::vector& files); + void setLoadFilesInput(bool bReadFromStdin, bool bReadFromS3, + const std::vector& files, const std::string& s3host, + const std::string &s3key, const std::string &s3secret, + const std::string &s3bucket, const std::string &s3region); /** @brief set job file name under process. */ @@ -609,11 +622,19 @@ inline void TableInfo::setJobId(int jobId) fJobId = jobId; } -inline void TableInfo::setLoadFilesInput(bool bReadFromStdin, - const std::vector& files) +inline void TableInfo::setLoadFilesInput(bool bReadFromStdin, bool bReadFromS3, + const std::vector& files, const std::string& s3host, + const std::string &s3key, const std::string &s3secret, + const std::string &s3bucket, const std::string &s3region) { fReadFromStdin = bReadFromStdin; + fReadFromS3 = bReadFromS3; fLoadFileList = files; + fS3Host = s3host; + fS3Key = s3key; + fS3Secret = s3secret; + fS3Bucket = s3bucket; + fS3Region = s3region; } inline void TableInfo::setMaxErrorRows(const unsigned int maxErrorRows) diff --git a/writeengine/splitter/we_cmdargs.cpp b/writeengine/splitter/we_cmdargs.cpp index 8c520924d..110f126a6 100644 --- a/writeengine/splitter/we_cmdargs.cpp +++ b/writeengine/splitter/we_cmdargs.cpp @@ -217,6 +217,21 @@ std::string WECmdArgs::getCpImportCmdLine() if (fbTruncationAsError) aSS << " -S "; + if (!fS3Key.empty()) + { + if (fS3Secret.empty() || fS3Bucket.empty() || fS3Region.empty()) + throw (runtime_error("Not all requried S3 options provided")); + aSS << " -y " << fS3Key; + aSS << " -K " << fS3Secret; + aSS << " -t " << fS3Bucket; + aSS << " -g " << fS3Region; + + if (!fS3Host.empty()) + { + aSS << " -H " << fS3Host; + } + } + if ((fJobId.length() > 0) && (fMode == 1) && (!fJobLogOnly)) { // if JobPath provided, make it w.r.t WES @@ -541,7 +556,12 @@ void WECmdArgs::usage() << "\t\t\t3 - input files will be loaded on the local PM.\n" << "\t-T\tTimezone used for TIMESTAMP datatype.\n" << "\t\tPossible values: \"SYSTEM\" (default)\n" - << "\t\t : Offset in the form +/-HH:MM\n"; + << "\t\t : Offset in the form +/-HH:MM\n" + << "\t-y\tS3 Authentication Key (for S3 imports)\n" + << "\t-K\tS3 Authentication Secret (for S3 imports)\n" + << "\t-t\tS3 Bucket (for S3 imports)\n" + << "\t-H\tS3 Hostname (for S3 imports, Amazon's S3 default)\n" + << "\t-g\tS3 Region (for S3 imports)\n"; cout << "\nExample1: Traditional usage\n" << "\tcpimport -j 1234"; @@ -580,7 +600,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) // fPrgmName = "/home/bpaul/genii/export/bin/cpimport"; while ((aCh = getopt(argc, argv, - "d:j:w:s:v:l:r:b:e:B:f:q:ihm:E:C:P:I:n:p:c:ST:N")) + "d:j:w:s:v:l:r:b:e:B:f:q:ihm:E:C:P:I:n:p:c:ST:Ny:K:t:H:g:")) != EOF) { switch (aCh) @@ -885,6 +905,36 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) break; } + case 'y': //-y S3 Key + { + fS3Key = optarg; + break; + } + + case 'K': //-K S3 Secret + { + fS3Secret = optarg; + break; + } + + case 'H': //-H S3 Host + { + fS3Host = optarg; + break; + } + + case 't': //-t S3 bucket + { + fS3Bucket = optarg; + break; + } + + case 'g': //-g S3 Region + { + fS3Region = optarg; + break; + } + default: { std::string aErr = "Unknown command line option " + aCh; @@ -1028,7 +1078,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) { fPmFile = argv[optind]; - if (fPmFile.at(0) != '/') + if ((fPmFile.at(0) != '/') && (fS3Key.empty())) { std::string aTmp = fPmFile; fPmFile = bulkRootPath + "/data/import/" + aTmp; @@ -1044,7 +1094,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) else fPmFile = fLocFile; - if (fPmFile.at(0) != '/') //should be true all the time + if ((fPmFile.at(0) != '/') && (fS3Key.empty())) //should be true all the time { std::string aTmp = fPmFile; fPmFile = bulkRootPath + "/data/import/" + aTmp; @@ -1135,7 +1185,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv) if (2 == fArgMode) { //BUG 4342 - if (fPmFile.at(0) != '/') + if ((fPmFile.at(0) != '/') && (fS3Key.empty())) { std::string aTmp = fPmFile; fPmFile = PrepMode2ListOfFiles(aTmp); diff --git a/writeengine/splitter/we_cmdargs.h b/writeengine/splitter/we_cmdargs.h index a7eb8ed4f..d02600070 100644 --- a/writeengine/splitter/we_cmdargs.h +++ b/writeengine/splitter/we_cmdargs.h @@ -274,6 +274,11 @@ private: // variables for SplitterApp std::string fTmpFileDir;// Temp file directory. std::string fBulkRoot; // Bulk Root path std::string fJobFile; // Job File Name + std::string fS3Key; // S3 key + std::string fS3Secret; // S3 Secret + std::string fS3Bucket; // S3 Bucket + std::string fS3Host; // S3 Host + std::string fS3Region; // S3 Region unsigned int fBatchQty; // No. of batch Qty. int fNoOfReadThrds; // No. of read buffers