You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
Merge pull request #1031 from pleblanc1976/we-splitter-read-from-s3
MCOL-3520: Fix importing files from S3 for mode 1 imports.
This commit is contained in:
committed by
Patrick LeBlanc
parent
f71158601e
commit
3faa1600c3
@ -30,6 +30,7 @@
|
||||
|
||||
#include "we_messages.h"
|
||||
#include "we_sdhandler.h"
|
||||
#include "we_splitterapp.h"
|
||||
|
||||
#include <boost/thread/condition.hpp>
|
||||
#include <boost/scoped_array.hpp>
|
||||
@ -99,6 +100,8 @@ WEFileReadThread::WEFileReadThread(WESDHandler& aSdh): fSdh(aSdh),
|
||||
|
||||
fBuff = new char [fBuffSize];
|
||||
|
||||
const WECmdArgs &args = fSdh.fRef.fCmdArgs;
|
||||
initS3Connection(args);
|
||||
}
|
||||
|
||||
//WEFileReadThread::WEFileReadThread(const WEFileReadThread& rhs):fSdh(rhs.fSdh)
|
||||
@ -122,6 +125,18 @@ WEFileReadThread::~WEFileReadThread()
|
||||
fpThread = 0;
|
||||
delete []fBuff;
|
||||
//cout << "WEFileReadThread destructor called" << endl;
|
||||
|
||||
if (doS3Import)
|
||||
{
|
||||
ms3_deinit(s3Connection);
|
||||
ms3_library_deinit();
|
||||
if (buf)
|
||||
{
|
||||
s3Stream.reset();
|
||||
arrSource.reset();
|
||||
free(buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@ -141,6 +156,14 @@ void WEFileReadThread::reset()
|
||||
fpThread = 0;
|
||||
//cout << "WEFileReadThread destructor called" << endl;
|
||||
this->setContinue(true);
|
||||
|
||||
if (buf)
|
||||
{
|
||||
arrSource.reset();
|
||||
s3Stream.reset();
|
||||
free(buf);
|
||||
buf = NULL;
|
||||
}
|
||||
}
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
@ -200,37 +223,33 @@ bool WEFileReadThread::chkForListOfFiles(std::string& FileName)
|
||||
std::string aFileName = FileName;
|
||||
|
||||
istringstream iss(aFileName);
|
||||
ostringstream oss;
|
||||
size_t start = 0, end = 0;
|
||||
const char* sep = " ,|";
|
||||
|
||||
end = aFileName.find_first_of(sep);
|
||||
ms3_status_st ms3status;
|
||||
|
||||
do
|
||||
{
|
||||
if (end != string::npos)
|
||||
{
|
||||
std::string aFile = aFileName.substr(start, end - start);
|
||||
|
||||
if (fSdh.getDebugLvl() > 2)
|
||||
cout << "File: " << aFileName.substr(start, end - start) << endl;
|
||||
|
||||
start = end + 1;
|
||||
fInfileList.push_back(aFile);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::string aFile = aFileName.substr(start, end - start);
|
||||
|
||||
if (fSdh.getDebugLvl() > 1)
|
||||
cout << "Next Input File " << aFileName.substr(start, end - start) << endl;
|
||||
|
||||
fInfileList.push_back(aFile);
|
||||
break;
|
||||
}
|
||||
|
||||
end = aFileName.find_first_of(sep, start);
|
||||
std::string aFile = aFileName.substr(start, end - start);
|
||||
if (aFile == "STDIN" || aFile == "stdin")
|
||||
aFile = "/dev/stdin";
|
||||
|
||||
if (fSdh.getDebugLvl() > 1)
|
||||
cout << "Next Input File " << aFile << endl;
|
||||
|
||||
if ((!doS3Import && access(aFile.c_str(), O_RDONLY) != 0) ||
|
||||
(doS3Import && ms3_status(s3Connection, s3Bucket.c_str(),
|
||||
aFile.c_str(), &ms3status) != 0))
|
||||
{
|
||||
oss << "Could not access " << aFile;
|
||||
throw runtime_error(oss.str());
|
||||
}
|
||||
|
||||
fInfileList.push_back(aFile);
|
||||
start = end + 1;
|
||||
}
|
||||
while (start != end);
|
||||
while (end != string::npos);
|
||||
|
||||
//cout << "Going out chkForListOfFiles("<< FileName << ")" << endl;
|
||||
|
||||
@ -267,6 +286,13 @@ void WEFileReadThread::shutdown()
|
||||
|
||||
//if(fInFile.is_open()) fInFile.close(); //@BUG 4326
|
||||
if (fIfFile.is_open()) fIfFile.close();
|
||||
if (buf)
|
||||
{
|
||||
s3Stream.reset();
|
||||
arrSource.reset();
|
||||
free(buf);
|
||||
buf = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@ -451,9 +477,48 @@ void WEFileReadThread::openInFile()
|
||||
{
|
||||
try
|
||||
{
|
||||
if (fSdh.getDebugLvl()) cout << "Input FileName: " << fInFileName << endl;
|
||||
/* If an S3 transfer
|
||||
use ms3 lib to d/l data into mem
|
||||
use boost::iostreams to wrap the mem in a stream interface
|
||||
point infile's stream buffer to it.
|
||||
*/
|
||||
|
||||
if (fSdh.getDebugLvl()) cout << "Input Filename: " << fInFileName << endl;
|
||||
|
||||
if (fInFileName == "/dev/stdin")
|
||||
if (doS3Import)
|
||||
{
|
||||
size_t bufLen = 0;
|
||||
if (buf)
|
||||
{
|
||||
s3Stream.reset();
|
||||
arrSource.reset();
|
||||
free(buf);
|
||||
buf = NULL;
|
||||
}
|
||||
if (fSdh.getDebugLvl())
|
||||
cout << "Downloading " << fInFileName << endl;
|
||||
int err = ms3_get(s3Connection, s3Bucket.c_str(), fInFileName.c_str(),
|
||||
&buf, &bufLen);
|
||||
if (fSdh.getDebugLvl())
|
||||
cout << "Download complete." << endl;
|
||||
if (err)
|
||||
{
|
||||
ostringstream os;
|
||||
if (ms3_server_error(s3Connection))
|
||||
os << "Download of '" << fInFileName << "' failed. Error from the server: "
|
||||
<< ms3_server_error(s3Connection);
|
||||
else
|
||||
os << "Download of '" << fInFileName << "' failed. Got '" << ms3_error(err)
|
||||
<< "'.";
|
||||
throw runtime_error(os.str());
|
||||
}
|
||||
|
||||
arrSource.reset(new boost::iostreams::array_source((char *) buf, bufLen));
|
||||
s3Stream.reset(new boost::iostreams::stream<boost::iostreams::array_source>(*arrSource));
|
||||
fInFile.rdbuf(s3Stream->rdbuf());
|
||||
}
|
||||
|
||||
else if (fInFileName == "/dev/stdin")
|
||||
{
|
||||
char aDefCon[16], aGreenCol[16];
|
||||
snprintf(aDefCon, sizeof(aDefCon), "\033[0m");
|
||||
@ -463,12 +528,11 @@ void WEFileReadThread::openInFile()
|
||||
cout << aGreenCol
|
||||
<< "trying to read from STDIN... "
|
||||
<< aDefCon << endl;
|
||||
fInFile.rdbuf(cin.rdbuf());
|
||||
}
|
||||
|
||||
cout.flush();
|
||||
|
||||
//@BUG 4326
|
||||
if (fInFileName != "/dev/stdin")
|
||||
else if (fInFileName != "/dev/stdin")
|
||||
{
|
||||
if (!fIfFile.is_open())
|
||||
{
|
||||
@ -592,6 +656,26 @@ int WEFileReadThread::getNextRow(istream& ifs, char* pBuf, int MaxLen)
|
||||
return pEnd - pBuf;
|
||||
}
|
||||
|
||||
void WEFileReadThread::initS3Connection(const WECmdArgs &args)
|
||||
{
|
||||
doS3Import = args.isS3Import();
|
||||
if (doS3Import)
|
||||
{
|
||||
s3Key = args.getS3Key();
|
||||
s3Secret = args.getS3Secret();
|
||||
s3Bucket = args.getS3Bucket();
|
||||
s3Region = args.getS3Region();
|
||||
s3Host = args.getS3Host();
|
||||
ms3_library_init();
|
||||
s3Connection = ms3_init(s3Key.c_str(), s3Secret.c_str(), s3Region.c_str(), (s3Host.empty() ? NULL : s3Host.c_str()));
|
||||
if (!s3Connection)
|
||||
throw runtime_error("failed to get an S3 connection");
|
||||
}
|
||||
else
|
||||
s3Connection = NULL;
|
||||
buf = NULL;
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user