You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-3514 Add support for S3 to cpimport
cpimport now has the ability to use libmarias3 to read an object from an S3 bucket instead of a file on local disk. This also moves libmarias3 to utils/libmarias3.
This commit is contained in:
@ -36,6 +36,7 @@
|
||||
#include <utility>
|
||||
// @bug 2099+
|
||||
#include <iostream>
|
||||
#include <libmarias3/marias3.h>
|
||||
#ifdef _MSC_VER
|
||||
#include <stdlib.h>
|
||||
#else
|
||||
@ -65,7 +66,7 @@ const std::string ERR_FILE_SUFFIX = ".err"; // Job error file suffix
|
||||
const std::string BOLD_START = "\033[0;1m";
|
||||
const std::string BOLD_STOP = "\033[0;39m";
|
||||
}
|
||||
|
||||
|
||||
namespace WriteEngine
|
||||
{
|
||||
//------------------------------------------------------------------------------
|
||||
@ -92,7 +93,7 @@ void TableInfo::sleepMS(long ms)
|
||||
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// TableInfo constructor
|
||||
//------------------------------------------------------------------------------
|
||||
@ -117,6 +118,7 @@ TableInfo::TableInfo(Log* logger, const BRM::TxnID txnID,
|
||||
fTimeZone("SYSTEM"),
|
||||
fTableLocked(false),
|
||||
fReadFromStdin(false),
|
||||
fReadFromS3(false),
|
||||
fNullStringMode(false),
|
||||
fEnclosedByChar('\0'),
|
||||
fEscapeChar('\\'),
|
||||
@ -154,7 +156,7 @@ TableInfo::~TableInfo()
|
||||
fBRMReporter.sendErrMsgToFile(fBRMRptFileName);
|
||||
freeProcessingBuffers();
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Frees up processing buffer memory. We don't reset fReadBufCount to 0,
|
||||
// because BulkLoad::lockColumnForParse() is calling getNumberOfBuffers()
|
||||
@ -172,7 +174,7 @@ void TableInfo::freeProcessingBuffers()
|
||||
fColumns.clear();
|
||||
fNumberOfColumns = 0;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Close any database column or dictionary store files left open for this table.
|
||||
// Under "normal" circumstances, there should be no files left open when we
|
||||
@ -215,7 +217,7 @@ void TableInfo::closeOpenDbFiles()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Locks this table for reading to the specified thread (locker) "if" the table
|
||||
// has not yet been assigned to a read thread.
|
||||
@ -235,7 +237,7 @@ bool TableInfo::lockForRead(const int& locker)
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Loop thru reading the import file(s) assigned to this TableInfo object.
|
||||
//------------------------------------------------------------------------------
|
||||
@ -395,9 +397,20 @@ int TableInfo::readTableData( )
|
||||
// per input file.
|
||||
// validTotalRows is ongoing total of valid rows read for all files
|
||||
// pertaining to this DB table.
|
||||
int readRc = fBuffers[readBufNo].fillFromFile(
|
||||
int readRc;
|
||||
if (fReadFromS3)
|
||||
{
|
||||
readRc = fBuffers[readBufNo].fillFromMemory(
|
||||
fBuffers[prevReadBuf], fFileBuffer, fS3ReadLength, &fS3ParseLength,
|
||||
totalRowsPerInputFile, validTotalRows, fColumns,
|
||||
allowedErrCntThisCall);
|
||||
}
|
||||
else
|
||||
{
|
||||
readRc = fBuffers[readBufNo].fillFromFile(
|
||||
fBuffers[prevReadBuf], fHandle, totalRowsPerInputFile,
|
||||
validTotalRows, fColumns, allowedErrCntThisCall);
|
||||
}
|
||||
|
||||
if (readRc != NO_ERROR)
|
||||
{
|
||||
@ -501,7 +514,7 @@ int TableInfo::readTableData( )
|
||||
fCurrentReadBuffer = (fCurrentReadBuffer + 1) % fReadBufCount;
|
||||
|
||||
// bufferCount++;
|
||||
if ( feof(fHandle) )
|
||||
if ( (fHandle && feof(fHandle)) || (fS3ReadLength == fS3ParseLength) )
|
||||
{
|
||||
timeval readFinished;
|
||||
gettimeofday(&readFinished, NULL);
|
||||
@ -517,6 +530,15 @@ int TableInfo::readTableData( )
|
||||
//" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
|
||||
MSGLVL_INFO2 );
|
||||
}
|
||||
else if(fReadFromS3)
|
||||
{
|
||||
fLog->logMsg( "Finished loading " + fTableName + " from S3" +
|
||||
", Time taken = " + Convertor::int2Str((int)
|
||||
(readFinished.tv_sec - readStart.tv_sec)) +
|
||||
" seconds",
|
||||
//" seconds; bufferCount-"+Convertor::int2Str(bufferCount),
|
||||
MSGLVL_INFO2 );
|
||||
}
|
||||
else
|
||||
{
|
||||
fLog->logMsg( "Finished reading file " + fFileName +
|
||||
@ -574,7 +596,7 @@ int TableInfo::readTableData( )
|
||||
|
||||
return NO_ERROR;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// writeErrorList()
|
||||
// errorRows - vector of row numbers and corresponding error messages
|
||||
@ -612,7 +634,7 @@ void TableInfo::writeErrorList(const std::vector< std::pair<RID,
|
||||
fTotalErrRows += errorRowsCount;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Parse the specified column (columnId) in the specified buffer (bufferId).
|
||||
//------------------------------------------------------------------------------
|
||||
@ -634,7 +656,7 @@ int TableInfo::parseColumn(const int& columnId, const int& bufferId,
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Mark the specified column (columnId) in the specified buffer (bufferId) as
|
||||
// PARSE_COMPLETE. If this is the last column to be parsed for this buffer,
|
||||
@ -935,7 +957,7 @@ int TableInfo::setParseComplete(const int& columnId,
|
||||
|
||||
return NO_ERROR;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Report summary totals to applicable destination (stdout, cpimport.bin log
|
||||
// file, BRMReport file (for mode1) etc).
|
||||
@ -1034,7 +1056,7 @@ void TableInfo::reportTotals(double elapsedTime)
|
||||
(fTotalReadRows - fTotalErrRows),
|
||||
satCounts );
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Report BRM updates to a report file or to BRM directly.
|
||||
//------------------------------------------------------------------------------
|
||||
@ -1076,7 +1098,7 @@ void TableInfo::setParseError( )
|
||||
boost::mutex::scoped_lock lock(fSyncUpdatesTI);
|
||||
fStatusTI = WriteEngine::ERR;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Locks a column from the specified buffer (bufferId) for the specified parse
|
||||
// thread (id); and returns the column id. A return value of -1 means no
|
||||
@ -1181,7 +1203,7 @@ int TableInfo::getColumnForParse(const int& id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Check if the specified buffer is ready for parsing (status == READ_COMPLETE)
|
||||
// @bug 2099. Temporary hack to diagnose deadlock. Added report parm
|
||||
@ -1212,16 +1234,16 @@ bool TableInfo::bufferReadyForParse(const int& bufferId, bool report) const
|
||||
|
||||
return (stat == WriteEngine::READ_COMPLETE) ? true : false;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Create the specified number (noOfBuffer) of BulkLoadBuffer objects and store
|
||||
// them in fBuffers. jobFieldRefList lists the fields in this import.
|
||||
// fixedBinaryRecLen is fixed record length for binary imports (it is n/a
|
||||
// for text bulk loads).
|
||||
//------------------------------------------------------------------------------
|
||||
void TableInfo::initializeBuffers(int noOfBuffers,
|
||||
const JobFieldRefList& jobFieldRefList,
|
||||
unsigned int fixedBinaryRecLen)
|
||||
int TableInfo::initializeBuffers(int noOfBuffers,
|
||||
const JobFieldRefList& jobFieldRefList,
|
||||
unsigned int fixedBinaryRecLen)
|
||||
{
|
||||
#ifdef _MSC_VER
|
||||
|
||||
@ -1254,6 +1276,19 @@ void TableInfo::initializeBuffers(int noOfBuffers,
|
||||
buffer->setTimeZone(fTimeZone);
|
||||
fBuffers.push_back(buffer);
|
||||
}
|
||||
if (!fS3Key.empty())
|
||||
{
|
||||
ms3_library_init();
|
||||
ms3 = ms3_init(fS3Key.c_str(), fS3Secret.c_str(), fS3Region.c_str(), fS3Host.c_str());
|
||||
if (!ms3)
|
||||
{
|
||||
ostringstream oss;
|
||||
oss << "Error initiating S3 library";
|
||||
fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR );
|
||||
return ERR_FILE_OPEN;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@ -1267,7 +1302,7 @@ void TableInfo::addColumn(ColumnInfo* info)
|
||||
fExtentStrAlloc.addColumn( info->column.mapOid,
|
||||
info->column.width );
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Open the file corresponding to fFileName so that we can import it's contents.
|
||||
// A buffer is also allocated and passed to setvbuf().
|
||||
@ -1300,6 +1335,27 @@ int TableInfo::openTableFile()
|
||||
fTableName << "..." << BOLD_STOP;
|
||||
fLog->logMsg( oss.str(), MSGLVL_INFO1 );
|
||||
}
|
||||
else if (fReadFromS3)
|
||||
{
|
||||
int res;
|
||||
res = ms3_get(ms3, fS3Bucket.c_str(), fFileName.c_str(), (uint8_t**)&fFileBuffer, &fS3ReadLength);
|
||||
fS3ParseLength = 0;
|
||||
if (res)
|
||||
{
|
||||
ostringstream oss;
|
||||
oss << "Error retrieving file " << fFileName << " from S3: ";
|
||||
if (ms3_server_error(ms3))
|
||||
{
|
||||
oss << ms3_server_error(ms3);
|
||||
}
|
||||
else
|
||||
{
|
||||
oss << ms3_error(res);
|
||||
}
|
||||
fLog->logMsg( oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR );
|
||||
return ERR_FILE_OPEN;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (fImportDataMode == IMPORT_DATA_TEXT)
|
||||
@ -1331,7 +1387,7 @@ int TableInfo::openTableFile()
|
||||
|
||||
return NO_ERROR;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Close the current open file we have been importing.
|
||||
//------------------------------------------------------------------------------
|
||||
@ -1352,8 +1408,12 @@ void TableInfo::closeTableFile()
|
||||
|
||||
fHandle = 0;
|
||||
}
|
||||
else if (ms3)
|
||||
{
|
||||
ms3_free((uint8_t*)fFileBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// "Grabs" the current read buffer for TableInfo so that the read thread that
|
||||
// is calling this function, can read the next buffer's set of data.
|
||||
@ -1389,7 +1449,7 @@ bool TableInfo::isBufferAvailable(bool report)
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Report whether rows were rejected, and if so, then list them out into the
|
||||
// reject file.
|
||||
@ -1422,7 +1482,14 @@ void TableInfo::writeBadRows( const std::vector<std::string>* errorDatRows,
|
||||
}
|
||||
else
|
||||
{
|
||||
rejectFileName << getFileName();
|
||||
if (fReadFromS3)
|
||||
{
|
||||
rejectFileName << basename(getFileName().c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
rejectFileName << getFileName();
|
||||
}
|
||||
}
|
||||
|
||||
rejectFileName << ".Job_" << fJobId <<
|
||||
@ -1494,7 +1561,7 @@ void TableInfo::writeBadRows( const std::vector<std::string>* errorDatRows,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Report whether rows were rejected, and if so, then list out the row numbers
|
||||
// and error reasons into the error file.
|
||||
@ -1528,7 +1595,14 @@ void TableInfo::writeErrReason( const std::vector< std::pair<RID,
|
||||
}
|
||||
else
|
||||
{
|
||||
errFileName << getFileName();
|
||||
if (fReadFromS3)
|
||||
{
|
||||
errFileName << basename(getFileName().c_str());
|
||||
}
|
||||
else
|
||||
{
|
||||
errFileName << getFileName();
|
||||
}
|
||||
}
|
||||
|
||||
errFileName << ".Job_" << fJobId <<
|
||||
@ -1601,7 +1675,7 @@ void TableInfo::writeErrReason( const std::vector< std::pair<RID,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Logs "Bulkload |Job" message along with the specified message text
|
||||
// (messageText) to the critical log.
|
||||
@ -1621,7 +1695,7 @@ void TableInfo::logToDataMods(const string& jobFile, const string& messageText)
|
||||
m.format(args);
|
||||
messageLog.logInfoMessage(m);
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Acquires DB table lock for this TableInfo object.
|
||||
// Function employs retry logic based on the SystemConfig/WaitPeriod.
|
||||
@ -1732,7 +1806,7 @@ int TableInfo::acquireTableLock( bool disableTimeOut )
|
||||
|
||||
return ERR_TBLLOCK_GET_LOCK_LOCKED;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Change table lock state (to cleanup)
|
||||
//------------------------------------------------------------------------------
|
||||
@ -1792,7 +1866,7 @@ int TableInfo::changeTableLockState( )
|
||||
|
||||
return NO_ERROR;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Releases DB table lock assigned to this TableInfo object.
|
||||
//------------------------------------------------------------------------------
|
||||
@ -1860,7 +1934,7 @@ int TableInfo::releaseTableLock( )
|
||||
|
||||
return NO_ERROR;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Delete bulk rollback metadata file.
|
||||
//------------------------------------------------------------------------------
|
||||
@ -1890,7 +1964,7 @@ void TableInfo::deleteMetaDataRollbackFile( )
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Changes to "existing" DB files must be confirmed on HDFS system.
|
||||
// This function triggers this action.
|
||||
@ -1924,7 +1998,7 @@ int TableInfo::confirmDBFileChanges( )
|
||||
|
||||
return NO_ERROR;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Temporary swap files must be deleted on HDFS system.
|
||||
// This function triggers this action.
|
||||
@ -1962,7 +2036,7 @@ void TableInfo::deleteTempDBFileChanges( )
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Validates the correctness of the current HWMs for this table.
|
||||
// The HWMs for all the 1 byte columns should be identical. Same goes
|
||||
@ -2273,7 +2347,7 @@ errorCheck:
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// DESCRIPTION:
|
||||
// Initialize the bulk rollback metadata writer for this table.
|
||||
@ -2297,7 +2371,7 @@ int TableInfo::initBulkRollbackMetaData( )
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// DESCRIPTION:
|
||||
// Saves snapshot of extentmap into a bulk rollback meta data file, for
|
||||
@ -2369,7 +2443,7 @@ int TableInfo::saveBulkRollbackMetaData( Job& job,
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Synchronize system catalog auto-increment next value with BRM.
|
||||
// This function is called at the end of normal processing to get the system
|
||||
@ -2399,7 +2473,7 @@ int TableInfo::synchronizeAutoInc( )
|
||||
|
||||
return NO_ERROR;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Rollback changes made to "this" table by the current import job, delete the
|
||||
// meta-data files, and release the table lock. This function only applies to
|
||||
@ -2484,7 +2558,7 @@ int TableInfo::rollbackWork( )
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Allocate extent from BRM (through the stripe allocator).
|
||||
//------------------------------------------------------------------------------
|
||||
|
Reference in New Issue
Block a user