You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
feature(cpimport): MCOL-5164 ignore all errors (-e all)
This commit is contained in:
committed by
Leonid Fedorov
parent
7dca1da8f2
commit
1ce46b5e0b
@ -0,0 +1,51 @@
|
|||||||
|
DROP DATABASE IF EXISTS mcol5164;
|
||||||
|
CREATE DATABASE mcol5164;
|
||||||
|
USE mcol5164;
|
||||||
|
CREATE TABLE t1(col1 INT, col2 VARCHAR(64)) ENGINE=Columnstore;
|
||||||
|
SELECT * FROM t1;
|
||||||
|
col1 col2
|
||||||
|
Rejected rows:
|
||||||
|
0,test0,wrong
|
||||||
|
1,test1,wrong
|
||||||
|
2,test2,wrong
|
||||||
|
3,test3,wrong
|
||||||
|
4,test4,wrong
|
||||||
|
5,test5,wrong
|
||||||
|
6,test6,wrong
|
||||||
|
7,test7,wrong
|
||||||
|
8,test8,wrong
|
||||||
|
9,test9,wrong
|
||||||
|
10,test10,wrong
|
||||||
|
TRUNCATE t1;
|
||||||
|
SELECT * FROM t1;
|
||||||
|
col1 col2
|
||||||
|
11 test11-good
|
||||||
|
Rejected rows:
|
||||||
|
0,test0,wrong
|
||||||
|
1,test1,wrong
|
||||||
|
2,test2,wrong
|
||||||
|
3,test3,wrong
|
||||||
|
4,test4,wrong
|
||||||
|
5,test5,wrong
|
||||||
|
6,test6,wrong
|
||||||
|
7,test7,wrong
|
||||||
|
8,test8,wrong
|
||||||
|
9,test9,wrong
|
||||||
|
10,test10,wrong
|
||||||
|
TRUNCATE t1;
|
||||||
|
SELECT * FROM t1;
|
||||||
|
col1 col2
|
||||||
|
11 test11-good
|
||||||
|
Rejected rows:
|
||||||
|
0,test0,wrong
|
||||||
|
1,test1,wrong
|
||||||
|
2,test2,wrong
|
||||||
|
3,test3,wrong
|
||||||
|
4,test4,wrong
|
||||||
|
5,test5,wrong
|
||||||
|
6,test6,wrong
|
||||||
|
7,test7,wrong
|
||||||
|
8,test8,wrong
|
||||||
|
9,test9,wrong
|
||||||
|
10,test10,wrong
|
||||||
|
DROP DATABASE mcol5164;
|
@ -0,0 +1,52 @@
|
|||||||
|
if (!$MYSQL_TEST_ROOT){
|
||||||
|
skip Should be run by root to execute cpimport;
|
||||||
|
}
|
||||||
|
|
||||||
|
-- source ../include/have_columnstore.inc
|
||||||
|
|
||||||
|
--disable_warnings
|
||||||
|
DROP DATABASE IF EXISTS mcol5164;
|
||||||
|
--enable_warnings
|
||||||
|
|
||||||
|
CREATE DATABASE mcol5164;
|
||||||
|
USE mcol5164;
|
||||||
|
|
||||||
|
CREATE TABLE t1(col1 INT, col2 VARCHAR(64)) ENGINE=Columnstore;
|
||||||
|
|
||||||
|
--exec mkdir -p /tmp/mtr-mcol5164
|
||||||
|
--exec awk 'BEGIN { for (i = 0; i < 11; i++) { printf "%d,test%d,wrong\n", i, i; }; printf "%d,test%d-good", i, i; }' > /tmp/mtr-mcol5164/mcol5164.csv
|
||||||
|
|
||||||
|
--disable_result_log
|
||||||
|
--error 1 # exceeds default max-errors
|
||||||
|
--exec $MCS_CPIMPORT -s , -L /tmp/mtr-mcol5164 mcol5164 t1 /tmp/mtr-mcol5164/mcol5164.csv
|
||||||
|
--enable_result_log
|
||||||
|
SELECT * FROM t1;
|
||||||
|
--exec echo Rejected rows:
|
||||||
|
--exec cat /tmp/mtr-mcol5164/mcol5164.csv*.bad
|
||||||
|
--exec rm -f /tmp/mtr-mcol5164/*.err
|
||||||
|
--exec rm -f /tmp/mtr-mcol5164/*.bad
|
||||||
|
TRUNCATE t1;
|
||||||
|
|
||||||
|
# explicitly set max-errors
|
||||||
|
--disable_result_log
|
||||||
|
--exec $MCS_CPIMPORT -s , -e 11 -L /tmp/mtr-mcol5164 mcol5164 t1 /tmp/mtr-mcol5164/mcol5164.csv
|
||||||
|
--enable_result_log
|
||||||
|
SELECT * FROM t1;
|
||||||
|
--exec echo Rejected rows:
|
||||||
|
--exec cat /tmp/mtr-mcol5164/mcol5164.csv*.bad
|
||||||
|
--exec rm -f /tmp/mtr-mcol5164/*.err
|
||||||
|
--exec rm -f /tmp/mtr-mcol5164/*.bad
|
||||||
|
TRUNCATE t1;
|
||||||
|
|
||||||
|
# max-errors = all
|
||||||
|
--disable_result_log
|
||||||
|
--exec $MCS_CPIMPORT -s , -e all -L /tmp/mtr-mcol5164 mcol5164 t1 /tmp/mtr-mcol5164/mcol5164.csv
|
||||||
|
--enable_result_log
|
||||||
|
SELECT * FROM t1;
|
||||||
|
--exec echo Rejected rows:
|
||||||
|
--exec cat /tmp/mtr-mcol5164/mcol5164.csv*.bad
|
||||||
|
|
||||||
|
|
||||||
|
# Clean UP
|
||||||
|
--exec rm -rf /tmp/mtr-mcol5164
|
||||||
|
DROP DATABASE mcol5164;
|
@ -530,14 +530,14 @@ int BulkLoad::preProcess(Job& job, int tableNo, std::shared_ptr<TableInfo>& tabl
|
|||||||
if (pwd)
|
if (pwd)
|
||||||
tableInfo->setUIDGID(pwd->pw_uid, pwd->pw_gid);
|
tableInfo->setUIDGID(pwd->pw_uid, pwd->pw_gid);
|
||||||
|
|
||||||
if (fMaxErrors != -1)
|
if (fMaxErrors != MAX_ERRORS_DEFAULT)
|
||||||
tableInfo->setMaxErrorRows(fMaxErrors);
|
tableInfo->setMaxErrorRows(fMaxErrors);
|
||||||
else
|
else
|
||||||
tableInfo->setMaxErrorRows(job.jobTableList[tableNo].maxErrNum);
|
tableInfo->setMaxErrorRows(job.jobTableList[tableNo].maxErrNum);
|
||||||
|
|
||||||
// @bug 3929: cpimport.bin error messaging using up too much memory.
|
// @bug 3929: cpimport.bin error messaging using up too much memory.
|
||||||
// Validate that max allowed error count is within valid range
|
// Validate that max allowed error count is within valid range
|
||||||
long long maxErrNum = tableInfo->getMaxErrorRows();
|
int maxErrNum = tableInfo->getMaxErrorRows();
|
||||||
|
|
||||||
if (maxErrNum > MAX_ALLOW_ERROR_COUNT)
|
if (maxErrNum > MAX_ALLOW_ERROR_COUNT)
|
||||||
{
|
{
|
||||||
|
@ -129,7 +129,7 @@ class BulkLoad : public FileOp
|
|||||||
void setEscapeChar(char esChar);
|
void setEscapeChar(char esChar);
|
||||||
void setSkipRows(size_t skipRows);
|
void setSkipRows(size_t skipRows);
|
||||||
void setKeepRbMetaFiles(bool keepMeta);
|
void setKeepRbMetaFiles(bool keepMeta);
|
||||||
void setMaxErrorCount(unsigned int maxErrors);
|
void setMaxErrorCount(int maxErrors);
|
||||||
void setNoOfParseThreads(int parseThreads);
|
void setNoOfParseThreads(int parseThreads);
|
||||||
void setNoOfReadThreads(int readThreads);
|
void setNoOfReadThreads(int readThreads);
|
||||||
void setNullStringMode(bool bMode);
|
void setNullStringMode(bool bMode);
|
||||||
@ -184,13 +184,13 @@ class BulkLoad : public FileOp
|
|||||||
|
|
||||||
Log fLog; // logger
|
Log fLog; // logger
|
||||||
|
|
||||||
int fNumOfParser; // total number of parser
|
int fNumOfParser{0}; // total number of parser
|
||||||
char fColDelim{0}; // delimits col values within a row
|
char fColDelim{0}; // delimits col values within a row
|
||||||
|
|
||||||
int fNoOfBuffers{-1}; // Number of read buffers
|
int fNoOfBuffers{-1}; // Number of read buffers
|
||||||
int fBufferSize{-1}; // Read buffer size
|
int fBufferSize{-1}; // Read buffer size
|
||||||
int fFileVbufSize{-1}; // Internal file system buffer size
|
int fFileVbufSize{-1}; // Internal file system buffer size
|
||||||
long long fMaxErrors{-1}; // Max allowable errors per job
|
long long fMaxErrors{MAX_ERRORS_DEFAULT}; // Max allowable errors per job
|
||||||
std::string fAlternateImportDir; // Alternate bulk import directory
|
std::string fAlternateImportDir; // Alternate bulk import directory
|
||||||
std::string fErrorDir; // Opt. where error records record
|
std::string fErrorDir; // Opt. where error records record
|
||||||
std::string fProcessName; // Application process name
|
std::string fProcessName; // Application process name
|
||||||
@ -429,10 +429,7 @@ inline void BulkLoad::setKeepRbMetaFiles(bool keepMeta)
|
|||||||
fKeepRbMetaFiles = keepMeta;
|
fKeepRbMetaFiles = keepMeta;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mutator takes an unsigned int, but we store in a long long, because...
|
inline void BulkLoad::setMaxErrorCount(int maxErrors)
|
||||||
// TableInfo which eventually needs this attribute, takes an unsigned int,
|
|
||||||
// but we want to be able to init to -1, to indicate when it has not been set.
|
|
||||||
inline void BulkLoad::setMaxErrorCount(unsigned int maxErrors)
|
|
||||||
{
|
{
|
||||||
fMaxErrors = maxErrors;
|
fMaxErrors = maxErrors;
|
||||||
}
|
}
|
||||||
|
@ -2049,7 +2049,7 @@ int BulkLoadBuffer::parseDictSection(ColumnInfo& columnInfo, int tokenPos, RID s
|
|||||||
int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length,
|
int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length,
|
||||||
size_t* parse_length, size_t& skipRows, RID& totalReadRows,
|
size_t* parse_length, size_t& skipRows, RID& totalReadRows,
|
||||||
RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
||||||
unsigned int allowedErrCntThisCall)
|
int allowedErrCntThisCall)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
|
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
|
||||||
reset();
|
reset();
|
||||||
@ -2153,7 +2153,7 @@ int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const ch
|
|||||||
int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, size_t& skipRows,
|
int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, size_t& skipRows,
|
||||||
RID& totalReadRows, RID& correctTotalRows,
|
RID& totalReadRows, RID& correctTotalRows,
|
||||||
const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
||||||
unsigned int allowedErrCntThisCall)
|
int allowedErrCntThisCall)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
|
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
|
||||||
reset();
|
reset();
|
||||||
@ -2277,7 +2277,7 @@ int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* hand
|
|||||||
// depending on whether the user has enabled the "enclosed by" feature.
|
// depending on whether the user has enabled the "enclosed by" feature.
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
void BulkLoadBuffer::tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
void BulkLoadBuffer::tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
||||||
unsigned int allowedErrCntThisCall, size_t& skipRows)
|
int allowedErrCntThisCall, size_t& skipRows)
|
||||||
{
|
{
|
||||||
unsigned offset = 0; // length of field
|
unsigned offset = 0; // length of field
|
||||||
unsigned curCol = 0; // dest db column counter within a row
|
unsigned curCol = 0; // dest db column counter within a row
|
||||||
@ -2789,7 +2789,7 @@ void BulkLoadBuffer::tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
|||||||
// Quit if we exceed max allowable errors for this call.
|
// Quit if we exceed max allowable errors for this call.
|
||||||
// We set lastRowHead = p, so that the code that follows this
|
// We set lastRowHead = p, so that the code that follows this
|
||||||
// loop won't try to save any data in fOverflowBuf.
|
// loop won't try to save any data in fOverflowBuf.
|
||||||
if (errorCount > allowedErrCntThisCall)
|
if (allowedErrCntThisCall != MAX_ERRORS_ALL && errorCount > static_cast<unsigned>(allowedErrCntThisCall))
|
||||||
{
|
{
|
||||||
lastRowHead = p + 1;
|
lastRowHead = p + 1;
|
||||||
p++;
|
p++;
|
||||||
@ -2928,7 +2928,7 @@ void BulkLoadBuffer::resizeTokenArray()
|
|||||||
// then tokenize() will stop reading data and exit.
|
// then tokenize() will stop reading data and exit.
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
int BulkLoadBuffer::tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
int BulkLoadBuffer::tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
||||||
unsigned int allowedErrCntThisCall, bool bEndOfData)
|
int allowedErrCntThisCall, bool bEndOfData)
|
||||||
{
|
{
|
||||||
unsigned curCol = 0; // dest db column counter within a row
|
unsigned curCol = 0; // dest db column counter within a row
|
||||||
unsigned curRowNum = 0; // "total" number of rows read during this call
|
unsigned curRowNum = 0; // "total" number of rows read during this call
|
||||||
@ -3082,7 +3082,7 @@ int BulkLoadBuffer::tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsI
|
|||||||
errorCount++;
|
errorCount++;
|
||||||
|
|
||||||
// Quit if we exceed max allowable errors for this call
|
// Quit if we exceed max allowable errors for this call
|
||||||
if (errorCount > allowedErrCntThisCall)
|
if (allowedErrCntThisCall != MAX_ERRORS_ALL && errorCount > static_cast<unsigned>(allowedErrCntThisCall))
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,12 +215,12 @@ class BulkLoadBuffer
|
|||||||
|
|
||||||
/** @brief tokenize the buffer contents and fill up the token array.
|
/** @brief tokenize the buffer contents and fill up the token array.
|
||||||
*/
|
*/
|
||||||
void tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo, unsigned int allowedErrCntThisCall,
|
void tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo, int allowedErrCntThisCall,
|
||||||
size_t& skipRows);
|
size_t& skipRows);
|
||||||
|
|
||||||
/** @brief Binary tokenization of the buffer, and fill up the token array.
|
/** @brief Binary tokenization of the buffer, and fill up the token array.
|
||||||
*/
|
*/
|
||||||
int tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsInfo, unsigned int allowedErrCntThisCall,
|
int tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsInfo, int allowedErrCntThisCall,
|
||||||
bool bEndOfData);
|
bool bEndOfData);
|
||||||
|
|
||||||
/** @brief Determine if specified value is NULL or not.
|
/** @brief Determine if specified value is NULL or not.
|
||||||
@ -275,13 +275,13 @@ class BulkLoadBuffer
|
|||||||
|
|
||||||
int fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length,
|
int fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length,
|
||||||
size_t* parse_length, size_t& skipRows, RID& totalReadRows, RID& correctTotalRows,
|
size_t* parse_length, size_t& skipRows, RID& totalReadRows, RID& correctTotalRows,
|
||||||
const boost::ptr_vector<ColumnInfo>& columnsInfo, unsigned int allowedErrCntThisCall);
|
const boost::ptr_vector<ColumnInfo>& columnsInfo, int allowedErrCntThisCall);
|
||||||
|
|
||||||
/** @brief Read the table data into the buffer
|
/** @brief Read the table data into the buffer
|
||||||
*/
|
*/
|
||||||
int fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, size_t& skipRows, RID& totalRows,
|
int fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, size_t& skipRows, RID& totalRows,
|
||||||
RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
||||||
unsigned int allowedErrCntThisCall);
|
int allowedErrCntThisCall);
|
||||||
|
|
||||||
/** @brief Get the overflow size
|
/** @brief Get the overflow size
|
||||||
*/
|
*/
|
||||||
|
@ -70,8 +70,7 @@ WECmdArgs::WECmdArgs(int argc, char** argv)
|
|||||||
DECLARE_INT_ARG("read-buffer-size,c", fReadBufSize, 1, INT_MAX,
|
DECLARE_INT_ARG("read-buffer-size,c", fReadBufSize, 1, INT_MAX,
|
||||||
"Application read buffer size (in bytes)")
|
"Application read buffer size (in bytes)")
|
||||||
DECLARE_INT_ARG("debug,d", fDebugLvl, 1, 3, "Print different level(1-3) debug message")
|
DECLARE_INT_ARG("debug,d", fDebugLvl, 1, 3, "Print different level(1-3) debug message")
|
||||||
DECLARE_INT_ARG("max-errors,e", fMaxErrors, 0, INT_MAX,
|
("max-errors,e", po::value<string>(), "Maximum number (or 'all') of allowable error per table per PM")
|
||||||
"Maximum number of allowable error per table per PM")
|
|
||||||
("file-path,f", po::value<string>(&fPmFilePath),
|
("file-path,f", po::value<string>(&fPmFilePath),
|
||||||
"Data file directory path. Default is current working directory.\n"
|
"Data file directory path. Default is current working directory.\n"
|
||||||
"\tIn Mode 1, represents the local input file path.\n"
|
"\tIn Mode 1, represents the local input file path.\n"
|
||||||
@ -304,6 +303,24 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
|
|||||||
fAllowMissingColumn = true;
|
fAllowMissingColumn = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (vm.contains("max-errors"))
|
||||||
|
{
|
||||||
|
auto optarg= vm["max-errors"].as<string>();
|
||||||
|
if (optarg == "all")
|
||||||
|
{
|
||||||
|
fMaxErrors = MAX_ERRORS_ALL;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
errno = 0;
|
||||||
|
long lValue = strtol(optarg.c_str(), nullptr, 10);
|
||||||
|
if (errno != 0 || lValue < 0 || lValue > INT_MAX)
|
||||||
|
{
|
||||||
|
startupError("Option --max-errors/-e is invalid or out of range");
|
||||||
|
}
|
||||||
|
fMaxErrors = lValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (fArgMode != -1)
|
if (fArgMode != -1)
|
||||||
fMode = fArgMode; // BUG 4210
|
fMode = fArgMode; // BUG 4210
|
||||||
@ -337,10 +354,7 @@ void WECmdArgs::fillParams(BulkLoad& curJob, std::string& sJobIdStr, std::string
|
|||||||
|
|
||||||
curJob.setReadBufferCount(fIOReadBufSize);
|
curJob.setReadBufferCount(fIOReadBufSize);
|
||||||
curJob.setReadBufferSize(fReadBufSize);
|
curJob.setReadBufferSize(fReadBufSize);
|
||||||
if (fMaxErrors >= 0)
|
curJob.setMaxErrorCount(fMaxErrors);
|
||||||
{
|
|
||||||
curJob.setMaxErrorCount(fMaxErrors);
|
|
||||||
}
|
|
||||||
if (!fPmFilePath.empty())
|
if (!fPmFilePath.empty())
|
||||||
{
|
{
|
||||||
importPath = fPmFilePath;
|
importPath = fPmFilePath;
|
||||||
|
@ -91,7 +91,7 @@ private:
|
|||||||
|
|
||||||
int fNoOfReadThrds{1}; // No. of read buffers
|
int fNoOfReadThrds{1}; // No. of read buffers
|
||||||
int fDebugLvl{0}; // Debug level
|
int fDebugLvl{0}; // Debug level
|
||||||
int fMaxErrors{-1}; // Max allowable errors
|
int fMaxErrors{MAX_ERRORS_DEFAULT}; // Max allowable errors
|
||||||
int fReadBufSize{-1}; // Read buffer size
|
int fReadBufSize{-1}; // Read buffer size
|
||||||
int fIOReadBufSize{-1}; // I/O read buffer size
|
int fIOReadBufSize{-1}; // I/O read buffer size
|
||||||
int fSetBufSize{0}; // Buff size w/setvbuf
|
int fSetBufSize{0}; // Buff size w/setvbuf
|
||||||
|
@ -412,7 +412,11 @@ int TableInfo::readTableData()
|
|||||||
// We keep a running total of read errors; fMaxErrorRows specifies
|
// We keep a running total of read errors; fMaxErrorRows specifies
|
||||||
// the error limit. Here's where we see how many more errors we
|
// the error limit. Here's where we see how many more errors we
|
||||||
// still have below the limit, and we pass this to fillFromFile().
|
// still have below the limit, and we pass this to fillFromFile().
|
||||||
unsigned allowedErrCntThisCall = ((fMaxErrorRows > fTotalErrRows) ? (fMaxErrorRows - fTotalErrRows) : 0);
|
int allowedErrCntThisCall;
|
||||||
|
if (fMaxErrorRows == MAX_ERRORS_ALL)
|
||||||
|
allowedErrCntThisCall = MAX_ERRORS_ALL;
|
||||||
|
else
|
||||||
|
allowedErrCntThisCall = static_cast<unsigned>(fMaxErrorRows) > fTotalErrRows ? fMaxErrorRows - fTotalErrRows : 0;
|
||||||
|
|
||||||
// Fill in the specified buffer.
|
// Fill in the specified buffer.
|
||||||
// fTotalReadRowsPerInputFile is ongoing total number of rows read,
|
// fTotalReadRowsPerInputFile is ongoing total number of rows read,
|
||||||
@ -485,7 +489,7 @@ int TableInfo::readTableData()
|
|||||||
writeErrorList(&fBuffers[readBufNo].getErrorRows(), &fBuffers[readBufNo].getExactErrorRows(), false);
|
writeErrorList(&fBuffers[readBufNo].getErrorRows(), &fBuffers[readBufNo].getExactErrorRows(), false);
|
||||||
fBuffers[readBufNo].clearErrRows();
|
fBuffers[readBufNo].clearErrRows();
|
||||||
|
|
||||||
if (fTotalErrRows > fMaxErrorRows)
|
if (fMaxErrorRows != MAX_ERRORS_ALL && fTotalErrRows > static_cast<unsigned>(fMaxErrorRows))
|
||||||
{
|
{
|
||||||
// flush the reject data file and output the rejected rows
|
// flush the reject data file and output the rejected rows
|
||||||
// flush err file and output the rejected row id and the reason.
|
// flush err file and output the rejected row id and the reason.
|
||||||
|
@ -85,7 +85,7 @@ class TableInfo : public WeUIDGID
|
|||||||
// for this table. Is volatile to
|
// for this table. Is volatile to
|
||||||
// insure parser & reader threads
|
// insure parser & reader threads
|
||||||
// see the latest value.
|
// see the latest value.
|
||||||
unsigned fMaxErrorRows; // Maximum error rows
|
int fMaxErrorRows; // Maximum error rows
|
||||||
int fLastBufferId; // Id of the last buffer
|
int fLastBufferId; // Id of the last buffer
|
||||||
char* fFileBuffer; // File buffer passed to setvbuf()
|
char* fFileBuffer; // File buffer passed to setvbuf()
|
||||||
int fCurrentParseBuffer; // Id of leading current buffer being
|
int fCurrentParseBuffer; // Id of leading current buffer being
|
||||||
@ -298,7 +298,7 @@ class TableInfo : public WeUIDGID
|
|||||||
|
|
||||||
/** @brief Get the number of maximum allowed error rows
|
/** @brief Get the number of maximum allowed error rows
|
||||||
*/
|
*/
|
||||||
unsigned getMaxErrorRows() const;
|
int getMaxErrorRows() const;
|
||||||
|
|
||||||
/** @brief retrieve the truncation as error setting for this
|
/** @brief retrieve the truncation as error setting for this
|
||||||
* import. When set, this causes char and varchar strings
|
* import. When set, this causes char and varchar strings
|
||||||
@ -309,7 +309,7 @@ class TableInfo : public WeUIDGID
|
|||||||
|
|
||||||
/** @brief set the maximum number of error rows allowed
|
/** @brief set the maximum number of error rows allowed
|
||||||
*/
|
*/
|
||||||
void setMaxErrorRows(const unsigned int maxErrorRows);
|
void setMaxErrorRows(int maxErrorRows);
|
||||||
|
|
||||||
/** @brief Set mode to treat "NULL" string as NULL value or not.
|
/** @brief Set mode to treat "NULL" string as NULL value or not.
|
||||||
*/
|
*/
|
||||||
@ -513,7 +513,7 @@ inline Status TableInfo::getStatusTI() const
|
|||||||
return fStatusTI;
|
return fStatusTI;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline unsigned TableInfo::getMaxErrorRows() const
|
inline int TableInfo::getMaxErrorRows() const
|
||||||
{
|
{
|
||||||
return fMaxErrorRows;
|
return fMaxErrorRows;
|
||||||
}
|
}
|
||||||
@ -630,7 +630,7 @@ inline void TableInfo::setLoadFilesInput(bool bReadFromStdin, bool bReadFromS3,
|
|||||||
fS3Region = s3region;
|
fS3Region = s3region;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void TableInfo::setMaxErrorRows(const unsigned int maxErrorRows)
|
inline void TableInfo::setMaxErrorRows(int maxErrorRows)
|
||||||
{
|
{
|
||||||
fMaxErrorRows = maxErrorRows;
|
fMaxErrorRows = maxErrorRows;
|
||||||
}
|
}
|
||||||
|
@ -144,6 +144,13 @@ enum ImportDataMode
|
|||||||
IMPORT_DATA_BIN_SAT_NULL = 2
|
IMPORT_DATA_BIN_SAT_NULL = 2
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Max number of ignored errors
|
||||||
|
enum MaxErrors
|
||||||
|
{
|
||||||
|
MAX_ERRORS_DEFAULT = -1, // default value
|
||||||
|
MAX_ERRORS_ALL = -2 // special case: ignore all errors
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* the set of Calpont column data type names; MUST match ColDataType in
|
* the set of Calpont column data type names; MUST match ColDataType in
|
||||||
* calpontsystemcatalog.h.
|
* calpontsystemcatalog.h.
|
||||||
|
@ -71,8 +71,7 @@ WECmdArgs::WECmdArgs(int argc, char** argv)
|
|||||||
DECLARE_INT_ARG("debug,d", fDebugLvl, 1, 3, "Print different level(1-3) debug message")
|
DECLARE_INT_ARG("debug,d", fDebugLvl, 1, 3, "Print different level(1-3) debug message")
|
||||||
("verbose,v", po::value<string>())
|
("verbose,v", po::value<string>())
|
||||||
("silent,N", po::bool_switch())
|
("silent,N", po::bool_switch())
|
||||||
DECLARE_INT_ARG("max-errors,e", fMaxErrors, 0, INT_MAX,
|
("max-errors,e", po::value<string>(), "Maximum number (or 'all') of allowable error per table per PM")
|
||||||
"Maximum number of allowable error per table per PM")
|
|
||||||
("file-path,f", po::value<string>(&fPmFilePath),
|
("file-path,f", po::value<string>(&fPmFilePath),
|
||||||
"Data file directory path. Default is current working directory.\n"
|
"Data file directory path. Default is current working directory.\n"
|
||||||
"\tIn Mode 1, represents the local input file path.\n"
|
"\tIn Mode 1, represents the local input file path.\n"
|
||||||
@ -230,7 +229,9 @@ std::string WECmdArgs::getCpImportCmdLine(bool skipRows)
|
|||||||
if (fNoOfWriteThrds > 0)
|
if (fNoOfWriteThrds > 0)
|
||||||
aSS << " -w " << fNoOfWriteThrds;
|
aSS << " -w " << fNoOfWriteThrds;
|
||||||
|
|
||||||
if (fMaxErrors >= 0)
|
if (fMaxErrors == MAX_ERRORS_ALL)
|
||||||
|
aSS << " -e all ";
|
||||||
|
else if (fMaxErrors != MAX_ERRORS_DEFAULT)
|
||||||
aSS << " -e " << fMaxErrors;
|
aSS << " -e " << fMaxErrors;
|
||||||
|
|
||||||
// BUG 5088
|
// BUG 5088
|
||||||
@ -446,7 +447,7 @@ bool WECmdArgs::checkForCornerCases()
|
|||||||
cout << "Invalid option -b with Mode 0" << endl;
|
cout << "Invalid option -b with Mode 0" << endl;
|
||||||
throw(runtime_error("Mismatched options."));
|
throw(runtime_error("Mismatched options."));
|
||||||
}
|
}
|
||||||
else if (fMaxErrors >= 0)
|
else if (fMaxErrors >= 0 || fMaxErrors == MAX_ERRORS_ALL)
|
||||||
{
|
{
|
||||||
cout << "Invalid option -e with Mode 0" << endl;
|
cout << "Invalid option -e with Mode 0" << endl;
|
||||||
throw(runtime_error("Mismatched options."));
|
throw(runtime_error("Mismatched options."));
|
||||||
@ -735,6 +736,24 @@ void WECmdArgs::parseCmdLineArgs(int argc, char** argv)
|
|||||||
fBatchQty = 10000;
|
fBatchQty = 10000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (vm.contains("max-errors"))
|
||||||
|
{
|
||||||
|
auto optarg = vm["max-errors"].as<string>();
|
||||||
|
if (optarg == "all")
|
||||||
|
{
|
||||||
|
fMaxErrors = MAX_ERRORS_ALL;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
errno = 0;
|
||||||
|
long lValue = strtol(optarg.c_str(), nullptr, 10);
|
||||||
|
if (errno != 0 || lValue < 0 || lValue > INT_MAX)
|
||||||
|
{
|
||||||
|
throw runtime_error("Option --max-errors/-e is invalid or out of range");
|
||||||
|
}
|
||||||
|
fMaxErrors = lValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (fArgMode != -1)
|
if (fArgMode != -1)
|
||||||
fMode = fArgMode; // BUG 4210
|
fMode = fArgMode; // BUG 4210
|
||||||
|
@ -323,7 +323,7 @@ class WECmdArgs
|
|||||||
int fBatchQty{10000}; // No. of batch Qty.
|
int fBatchQty{10000}; // No. of batch Qty.
|
||||||
int fNoOfReadThrds{0}; // No. of read buffers
|
int fNoOfReadThrds{0}; // No. of read buffers
|
||||||
int fDebugLvl{0}; // Debug level
|
int fDebugLvl{0}; // Debug level
|
||||||
int fMaxErrors{-1}; // Max allowable errors
|
int fMaxErrors{MAX_ERRORS_DEFAULT}; // Max allowable errors
|
||||||
int fReadBufSize{0}; // Read buffer size
|
int fReadBufSize{0}; // Read buffer size
|
||||||
int fIOReadBufSize{0}; // I/O read buffer size
|
int fIOReadBufSize{0}; // I/O read buffer size
|
||||||
int fSetBufSize{0}; // Buff size w/setvbuf
|
int fSetBufSize{0}; // Buff size w/setvbuf
|
||||||
|
@ -230,8 +230,16 @@ void XMLGenProc::makeTableData(const CalpontSystemCatalog::TableName& table, con
|
|||||||
}
|
}
|
||||||
|
|
||||||
xmlTextWriterWriteAttribute(fWriter, BAD_CAST xmlTagTable[TAG_LOAD_NAME], BAD_CAST tmp.c_str());
|
xmlTextWriterWriteAttribute(fWriter, BAD_CAST xmlTagTable[TAG_LOAD_NAME], BAD_CAST tmp.c_str());
|
||||||
xmlTextWriterWriteFormatAttribute(fWriter, BAD_CAST xmlTagTable[TAG_MAX_ERR_ROW], "%d",
|
auto sMaxErrors = fInputMgr->getParm(XMLGenData::MAXERROR);
|
||||||
atoi(fInputMgr->getParm(XMLGenData::MAXERROR).c_str()));
|
if (sMaxErrors == "all")
|
||||||
|
{
|
||||||
|
xmlTextWriterWriteAttribute(fWriter, BAD_CAST xmlTagTable[TAG_MAX_ERR_ROW], BAD_CAST sMaxErrors.c_str());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
xmlTextWriterWriteFormatAttribute(fWriter, BAD_CAST xmlTagTable[TAG_MAX_ERR_ROW], "%d",
|
||||||
|
atoi(sMaxErrors.c_str()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
kount++;
|
kount++;
|
||||||
|
@ -49,9 +49,7 @@ namespace WriteEngine
|
|||||||
// Constructor
|
// Constructor
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
XMLJob::XMLJob()
|
XMLJob::XMLJob()
|
||||||
: fDeleteTempFile(false)
|
: fDeleteTempFile(false), fValidateColList(true), fTimeZone(dataconvert::systemTimeZoneOffset())
|
||||||
, fValidateColList(true)
|
|
||||||
, fTimeZone(dataconvert::systemTimeZoneOffset())
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -197,7 +195,7 @@ void XMLJob::printJobInfo(Log& logger) const
|
|||||||
|
|
||||||
logger.logMsg(oss3.str(), MSGLVL_INFO2);
|
logger.logMsg(oss3.str(), MSGLVL_INFO2);
|
||||||
} // end of loop through columns in a table
|
} // end of loop through columns in a table
|
||||||
} // end of loop through tables
|
} // end of loop through tables
|
||||||
}
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
@ -473,8 +471,13 @@ void XMLJob::setJobDataTable(xmlNode* pNode)
|
|||||||
if (getNodeAttributeStr(pNode, xmlTagTable[TAG_LOAD_NAME], bufString))
|
if (getNodeAttributeStr(pNode, xmlTagTable[TAG_LOAD_NAME], bufString))
|
||||||
curTable.loadFileName = bufString;
|
curTable.loadFileName = bufString;
|
||||||
|
|
||||||
if (getNodeAttribute(pNode, xmlTagTable[TAG_MAX_ERR_ROW], &intVal, TYPE_INT))
|
if (getNodeAttributeStr(pNode, xmlTagTable[TAG_MAX_ERR_ROW], bufString))
|
||||||
curTable.maxErrNum = intVal;
|
{
|
||||||
|
if (bufString == "all")
|
||||||
|
curTable.maxErrNum = MAX_ERRORS_ALL;
|
||||||
|
else
|
||||||
|
curTable.maxErrNum = atoi(bufString.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
fJob.jobTableList.push_back(curTable);
|
fJob.jobTableList.push_back(curTable);
|
||||||
}
|
}
|
||||||
@ -683,7 +686,6 @@ void XMLJob::initSatLimits(JobColumn& curColumn) const
|
|||||||
{
|
{
|
||||||
curColumn.fMaxIntSat = dataconvert::decimalRangeUp<int128_t>(curColumn.precision);
|
curColumn.fMaxIntSat = dataconvert::decimalRangeUp<int128_t>(curColumn.precision);
|
||||||
curColumn.fMinIntSat = -curColumn.fMaxIntSat;
|
curColumn.fMinIntSat = -curColumn.fMaxIntSat;
|
||||||
|
|
||||||
}
|
}
|
||||||
else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL])
|
else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL])
|
||||||
{
|
{
|
||||||
@ -987,12 +989,13 @@ void XMLJob::fillInXMLDataNotNullDefault(const std::string& fullTblName,
|
|||||||
{
|
{
|
||||||
if (LIKELY(colType.colWidth == datatypes::MAXDECIMALWIDTH))
|
if (LIKELY(colType.colWidth == datatypes::MAXDECIMALWIDTH))
|
||||||
{
|
{
|
||||||
col.fDefaultWideDecimal = colType.decimal128FromString(col_defaultValue.safeString(), &bDefaultConvertError);
|
col.fDefaultWideDecimal =
|
||||||
|
colType.decimal128FromString(col_defaultValue.safeString(), &bDefaultConvertError);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
col.fDefaultInt = Convertor::convertDecimalString(col_defaultValue.str(),
|
col.fDefaultInt = Convertor::convertDecimalString(col_defaultValue.str(), col_defaultValue.length(),
|
||||||
col_defaultValue.length(), colType.scale);
|
colType.scale);
|
||||||
|
|
||||||
if (errno == ERANGE)
|
if (errno == ERANGE)
|
||||||
bDefaultConvertError = true;
|
bDefaultConvertError = true;
|
||||||
@ -1004,9 +1007,8 @@ void XMLJob::fillInXMLDataNotNullDefault(const std::string& fullTblName,
|
|||||||
case execplan::CalpontSystemCatalog::DATE:
|
case execplan::CalpontSystemCatalog::DATE:
|
||||||
{
|
{
|
||||||
int convertStatus;
|
int convertStatus;
|
||||||
int32_t dt = dataconvert::DataConvert::convertColumnDate(col_defaultValue.str(),
|
int32_t dt = dataconvert::DataConvert::convertColumnDate(
|
||||||
dataconvert::CALPONTDATE_ENUM, convertStatus,
|
col_defaultValue.str(), dataconvert::CALPONTDATE_ENUM, convertStatus, col_defaultValue.length());
|
||||||
col_defaultValue.length());
|
|
||||||
|
|
||||||
if (convertStatus != 0)
|
if (convertStatus != 0)
|
||||||
bDefaultConvertError = true;
|
bDefaultConvertError = true;
|
||||||
@ -1046,9 +1048,8 @@ void XMLJob::fillInXMLDataNotNullDefault(const std::string& fullTblName,
|
|||||||
case execplan::CalpontSystemCatalog::TIME:
|
case execplan::CalpontSystemCatalog::TIME:
|
||||||
{
|
{
|
||||||
int convertStatus;
|
int convertStatus;
|
||||||
int64_t dt = dataconvert::DataConvert::convertColumnTime(col_defaultValue.str(),
|
int64_t dt = dataconvert::DataConvert::convertColumnTime(
|
||||||
dataconvert::CALPONTTIME_ENUM, convertStatus,
|
col_defaultValue.str(), dataconvert::CALPONTTIME_ENUM, convertStatus, col_defaultValue.length());
|
||||||
col_defaultValue.length());
|
|
||||||
|
|
||||||
if (convertStatus != 0)
|
if (convertStatus != 0)
|
||||||
bDefaultConvertError = true;
|
bDefaultConvertError = true;
|
||||||
|
Reference in New Issue
Block a user