1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-11-27 08:21:15 +03:00

MCOL-3520: Fix cpimport S3 multi-PM imports

Implemented mode-1 imports from S3 and fixed a few other minor issues I noticed.
This commit is contained in:
Patrick LeBlanc
2020-02-06 18:09:52 -05:00
parent 65de8b8e63
commit 53afb3c71e
7 changed files with 117 additions and 94 deletions

View File

@@ -1,5 +1,5 @@
include_directories( ${ENGINE_COMMON_INCLUDES} ) include_directories( ${ENGINE_COMMON_INCLUDES} ${S3API_DIR} )
########### next target ############### ########### next target ###############
@@ -17,7 +17,7 @@ set(cpimport_SRCS
add_executable(cpimport ${cpimport_SRCS}) add_executable(cpimport ${cpimport_SRCS})
target_link_libraries(cpimport ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} batchloader threadpool) target_link_libraries(cpimport ${ENGINE_LDFLAGS} ${NETSNMP_LIBRARIES} ${MARIADB_CLIENT_LIBS} ${ENGINE_WRITE_LIBS} batchloader threadpool marias3)
install(TARGETS cpimport DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-platform) install(TARGETS cpimport DESTINATION ${ENGINE_BINDIR} COMPONENT columnstore-platform)

View File

@@ -217,10 +217,10 @@ std::string WECmdArgs::getCpImportCmdLine()
if (fbTruncationAsError) if (fbTruncationAsError)
aSS << " -S "; aSS << " -S ";
if (!fS3Key.empty()) if (!fS3Key.empty() && !(fMode == 0 || fMode == 1))
{ {
if (fS3Secret.empty() || fS3Bucket.empty() || fS3Region.empty()) if (fS3Secret.empty() || fS3Bucket.empty() || fS3Region.empty())
throw (runtime_error("Not all requried S3 options provided")); throw (runtime_error("Not all required S3 options provided"));
aSS << " -y " << fS3Key; aSS << " -y " << fS3Key;
aSS << " -K " << fS3Secret; aSS << " -K " << fS3Secret;
aSS << " -t " << fS3Bucket; aSS << " -t " << fS3Bucket;
@@ -433,7 +433,7 @@ bool WECmdArgs::checkForCornerCases()
"a fully qualified path for the remote file." "a fully qualified path for the remote file."
"\nTry 'cpimport -h' for more information.")); "\nTry 'cpimport -h' for more information."));
if (!fS3Key.empty()) if (!fS3Key.empty())
throw(runtime_error("Mode 2 & an input file on S3 does not make sense.")); throw(runtime_error("Mode 2 & an input file from S3 does not make sense."));
} }
if (fMode == 3) if (fMode == 3)
@@ -975,7 +975,7 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
} }
if (optind < argc) // see if input file name is given if (optind < argc) // see if input file name is given
{ {
// 3rd pos parm // 3rd pos parm
fLocFile = argv[optind]; fLocFile = argv[optind];
@@ -1235,6 +1235,12 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
throw (runtime_error("No schema or local filename specified.")); throw (runtime_error("No schema or local filename specified."));
} }
} }
/* check for all-or-nothing cmdline args to enable S3 import */
int s3Tmp = (fS3Key.empty() ? 0 : 1) + (fS3Bucket.empty() ? 0 : 1) +
(fS3Secret.empty() ? 0 : 1) + (fS3Region.empty() ? 0 : 1);
if (s3Tmp != 0 && s3Tmp != 4)
throw runtime_error("The access key, secret, bucket, and region are all required to import from S3");
} }
std::string WECmdArgs::getJobFileName() std::string WECmdArgs::getJobFileName()

View File

@@ -38,6 +38,9 @@ public:
WECmdArgs(int argc, char** argv); WECmdArgs(int argc, char** argv);
virtual ~WECmdArgs() {} virtual ~WECmdArgs() {}
typedef std::vector<unsigned int> VecInts;
typedef std::vector<std::string> VecArgs;
void appTestFunction(); void appTestFunction();
void parseCmdLineArgs(int argc, char** argv); void parseCmdLineArgs(int argc, char** argv);
std::string getCpImportCmdLine(); std::string getCpImportCmdLine();
@@ -268,9 +271,7 @@ public:
void getColumnList( std::set<std::string>& columnList ) const; void getColumnList( std::set<std::string>& columnList ) const;
private: // variables for SplitterApp private: // variables for SplitterApp
typedef std::vector<std::string> VecArgs;
VecArgs fVecArgs; VecArgs fVecArgs;
typedef std::vector<unsigned int> VecInts;
VecInts fPmVec; VecInts fPmVec;
VecArgs fVecJobFiles; //JobFiles splitter from master JobFile VecArgs fVecJobFiles; //JobFiles splitter from master JobFile

View File

@@ -30,6 +30,7 @@
#include "we_messages.h" #include "we_messages.h"
#include "we_sdhandler.h" #include "we_sdhandler.h"
#include "we_splitterapp.h"
#include <boost/thread/condition.hpp> #include <boost/thread/condition.hpp>
#include <boost/scoped_array.hpp> #include <boost/scoped_array.hpp>
@@ -98,22 +99,9 @@ WEFileReadThread::WEFileReadThread(WESDHandler& aSdh): fSdh(aSdh),
} }
fBuff = new char [fBuffSize]; fBuff = new char [fBuffSize];
/* Get S3 import params from fSdh.fRef, which is a ref to the we_splitterapp
*/
const WECmdArgs &args = fSdh.fRef.fCmdArgs; const WECmdArgs &args = fSdh.fRef.fCmdArgs;
initS3Connection(args); initS3Connection(args);
doS3Import = args.isS3Import();
if (doS3Import)
{
s3Key = args.getS3Key();
s3Secret = args.getS3Secret();
s3Bucket = args.getS3Bucket();
s3Region = args.getS3Region();
s3Host = args.getS3Host();
ms3_library_init();
connection = ms3_init(s3Key.c_str(), s3Secret.c_str(), s3Region.c_str(), (s3Host.empty() ? NULL : s3Host.c_str()));)
}
} }
//WEFileReadThread::WEFileReadThread(const WEFileReadThread& rhs):fSdh(rhs.fSdh) //WEFileReadThread::WEFileReadThread(const WEFileReadThread& rhs):fSdh(rhs.fSdh)
@@ -140,8 +128,14 @@ WEFileReadThread::~WEFileReadThread()
if (doS3Import) if (doS3Import)
{ {
ms3_deinit(connection); ms3_deinit(s3Connection);
ms3_library_deinit(); ms3_library_deinit();
if (buf)
{
s3Stream.reset();
arrSource.reset();
free(buf);
}
} }
} }
@@ -162,6 +156,14 @@ void WEFileReadThread::reset()
fpThread = 0; fpThread = 0;
//cout << "WEFileReadThread destructor called" << endl; //cout << "WEFileReadThread destructor called" << endl;
this->setContinue(true); this->setContinue(true);
if (buf)
{
arrSource.reset();
s3Stream.reset();
free(buf);
buf = NULL;
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -221,37 +223,33 @@ bool WEFileReadThread::chkForListOfFiles(std::string& FileName)
std::string aFileName = FileName; std::string aFileName = FileName;
istringstream iss(aFileName); istringstream iss(aFileName);
ostringstream oss;
size_t start = 0, end = 0; size_t start = 0, end = 0;
const char* sep = " ,|"; const char* sep = " ,|";
ms3_status_st ms3status;
end = aFileName.find_first_of(sep);
do do
{ {
if (end != string::npos)
{
std::string aFile = aFileName.substr(start, end - start);
if (fSdh.getDebugLvl() > 2)
cout << "File: " << aFileName.substr(start, end - start) << endl;
start = end + 1;
fInfileList.push_back(aFile);
}
else
{
std::string aFile = aFileName.substr(start, end - start);
if (fSdh.getDebugLvl() > 1)
cout << "Next Input File " << aFileName.substr(start, end - start) << endl;
fInfileList.push_back(aFile);
break;
}
end = aFileName.find_first_of(sep, start); end = aFileName.find_first_of(sep, start);
std::string aFile = aFileName.substr(start, end - start);
if (aFile == "STDIN" || aFile == "stdin")
aFile = "/dev/stdin";
if (fSdh.getDebugLvl() > 1)
cout << "Next Input File " << aFile << endl;
if ((!doS3Import && access(aFile.c_str(), O_RDONLY) != 0) ||
(doS3Import && ms3_status(s3Connection, s3Bucket.c_str(),
aFile.c_str(), &ms3status) != 0))
{
oss << "Could not access " << aFile;
throw runtime_error(oss.str());
}
fInfileList.push_back(aFile);
start = end + 1;
} }
while (start != end); while (end != string::npos);
//cout << "Going out chkForListOfFiles("<< FileName << ")" << endl; //cout << "Going out chkForListOfFiles("<< FileName << ")" << endl;
@@ -288,6 +286,13 @@ void WEFileReadThread::shutdown()
//if(fInFile.is_open()) fInFile.close(); //@BUG 4326 //if(fInFile.is_open()) fInFile.close(); //@BUG 4326
if (fIfFile.is_open()) fIfFile.close(); if (fIfFile.is_open()) fIfFile.close();
if (buf)
{
s3Stream.reset();
arrSource.reset();
free(buf);
buf = NULL;
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -474,21 +479,46 @@ void WEFileReadThread::openInFile()
{ {
/* If an S3 transfer /* If an S3 transfer
use ms3 lib to d/l data into mem use ms3 lib to d/l data into mem
use boost::iostreams to wrap the mem in an fstream interface use boost::iostreams to wrap the mem in a stream interface
assign fiffile and/or finfile to it? point infile's stream buffer to it.
example (change file_destriptor_source to array_source)
FILE* pipe = popen("find . -type f", "r");
io::stream_buffer<io::file_descriptor_source> fpstream(fileno(pipe));
std::istream in(&fpstream);
*/ */
if (fSdh.getDebugLvl()) cout << "Input FileName: " << fInFileName << endl; if (fSdh.getDebugLvl()) cout << "Input Filename: " << fInFileName << endl;
WORKING HERE if (doS3Import)
{
size_t bufLen = 0;
if (buf)
{
s3Stream.reset();
arrSource.reset();
free(buf);
buf = NULL;
}
if (fSdh.getDebugLvl())
cout << "Downloading " << fInFileName << endl;
int err = ms3_get(s3Connection, s3Bucket.c_str(), fInFileName.c_str(),
&buf, &bufLen);
if (fSdh.getDebugLvl())
cout << "Download complete." << endl;
if (err)
{
ostringstream os;
if (ms3_server_error(s3Connection))
os << "Download of '" << fInFileName << "' failed. Error from the server: "
<< ms3_server_error(s3Connection);
else
os << "Download of '" << fInFileName << "' failed. Got '" << ms3_error(err)
<< "'.";
throw runtime_error(os.str());
}
arrSource.reset(new boost::iostreams::array_source((char *) buf, bufLen));
s3Stream.reset(new boost::iostreams::stream<boost::iostreams::array_source>(*arrSource));
fInFile.rdbuf(s3Stream->rdbuf());
}
if (fInFileName == "/dev/stdin") else if (fInFileName == "/dev/stdin")
{ {
char aDefCon[16], aGreenCol[16]; char aDefCon[16], aGreenCol[16];
snprintf(aDefCon, sizeof(aDefCon), "\033[0m"); snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
@@ -498,10 +528,11 @@ void WEFileReadThread::openInFile()
cout << aGreenCol cout << aGreenCol
<< "trying to read from STDIN... " << "trying to read from STDIN... "
<< aDefCon << endl; << aDefCon << endl;
fInFile.rdbuf(cin.rdbuf());
} }
//@BUG 4326 //@BUG 4326
if (fInFileName != "/dev/stdin") else if (fInFileName != "/dev/stdin")
{ {
if (!fIfFile.is_open()) if (!fIfFile.is_open())
{ {
@@ -625,7 +656,7 @@ int WEFileReadThread::getNextRow(istream& ifs, char* pBuf, int MaxLen)
return pEnd - pBuf; return pEnd - pBuf;
} }
void WEFileReadThread::initS3Connection(const WE_CmdArgs &args) void WEFileReadThread::initS3Connection(const WECmdArgs &args)
{ {
doS3Import = args.isS3Import(); doS3Import = args.isS3Import();
if (doS3Import) if (doS3Import)
@@ -636,10 +667,13 @@ void WEFileReadThread::initS3Connection(const WE_CmdArgs &args)
s3Region = args.getS3Region(); s3Region = args.getS3Region();
s3Host = args.getS3Host(); s3Host = args.getS3Host();
ms3_library_init(); ms3_library_init();
connection = ms3_init(s3Key.c_str(), s3Secret.c_str(), s3Region.c_str(), (s3Host.empty() ? NULL : s3Host.c_str()));) s3Connection = ms3_init(s3Key.c_str(), s3Secret.c_str(), s3Region.c_str(), (s3Host.empty() ? NULL : s3Host.c_str()));
if (!connection) if (!s3Connection)
throw runtime_error("WEFileReadThread::initS3Connection(): Failed to init S3 connection"); throw runtime_error("failed to get an S3 connection");
} }
else
s3Connection = NULL;
buf = NULL;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -30,7 +30,10 @@
#ifndef WE_FILEREADTHREAD_H_ #ifndef WE_FILEREADTHREAD_H_
#define WE_FILEREADTHREAD_H_ #define WE_FILEREADTHREAD_H_
#include "we_cmdargs.h"
#include "libmarias3/marias3.h" #include "libmarias3/marias3.h"
#include <boost/iostreams/device/array.hpp>
#include <boost/iostreams/stream.hpp>
namespace WriteEngine namespace WriteEngine
{ {
@@ -164,7 +167,10 @@ private:
std::string s3Bucket; std::string s3Bucket;
std::string s3Region; std::string s3Region;
std::string s3Host; std::string s3Host;
ms3_st *s3connection; ms3_st *s3Connection;
uint8_t *buf;
std::unique_ptr<boost::iostreams::array_source> arrSource;
std::unique_ptr<boost::iostreams::stream<boost::iostreams::array_source> > s3Stream;
}; };
} /* namespace WriteEngine */ } /* namespace WriteEngine */

View File

@@ -501,14 +501,9 @@ void WESDHandler::setup()
//fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), false); //fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), false);
fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), getConsoleLog()); fLog.setLogFileName(aLogName.c_str(), aErrLogName.c_str(), getConsoleLog());
// In mode 0 and Mode 1, we need to check for local file availability // In mode 0 and Mode 1, we need to construct the input file list and check availability
if ((0 == fRef.fCmdArgs.getMode()) || (1 == fRef.fCmdArgs.getMode())) if (0 == fRef.fCmdArgs.getMode() || 1 == fRef.fCmdArgs.getMode())
{ setInputFileList(fRef.getLocFile());
if (!check4InputFile(fRef.getLocFile()))
{
throw (runtime_error("Could not open Input file " + fRef.getLocFile()));
}
}
fImportRslt.startTimer(); fImportRslt.startTimer();
@@ -2726,28 +2721,9 @@ std::string WESDHandler::getTime2Str() const
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
bool WESDHandler::check4InputFile(std::string InFileName) void WESDHandler::setInputFileList(std::string InFileName)
{ {
bool aRet = false; fFileReadThread.chkForListOfFiles(InFileName);
if ((0 == InFileName.compare("STDIN")) || (0 == InFileName.compare("stdin")))
{
fFileReadThread.add2InputDataFileList(InFileName);
return true;
}
else
{
//BUG 4342 - Need to support "list of infiles"
fFileReadThread.chkForListOfFiles(InFileName);
std::string aFileName = fFileReadThread.getNextInputDataFile();
std::ifstream aFile(aFileName.c_str());
aRet = (aFile.good()) ? true : false;
// add back to list, which we pop_front for checking the file.
if (aRet) fFileReadThread.add2InputDataFileList(aFileName);
}
return aRet;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -111,7 +111,7 @@ public:
bool releaseTableLocks(); bool releaseTableLocks();
void check4CpiInvokeMode(); void check4CpiInvokeMode();
bool check4PmArguments(); bool check4PmArguments();
bool check4InputFile(std::string InFileName); void setInputFileList(std::string InFileName);
bool check4CriticalErrMsgs(std::string& Entry); bool check4CriticalErrMsgs(std::string& Entry);
void onStartCpiResponse(int PmId); void onStartCpiResponse(int PmId);