mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
4919 lines
145 KiB
C++
4919 lines
145 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*********************************************************************
|
|
* $Id: we_bulkloadbuffer.cpp 4661 2013-06-04 12:59:50Z dcathey $
|
|
*
|
|
********************************************************************/
|
|
|
|
#include <sys/time.h>
|
|
#include <sstream>
|
|
#include <string>
|
|
#include <stdint.h>
|
|
#include <cerrno>
|
|
#include <cstring>
|
|
#include <cstdlib> // includes <alloca.h> on linux
|
|
#include <cmath>
|
|
#include <ctype.h>
|
|
#include <cfloat>
|
|
|
|
#include "we_bulkload.h"
|
|
#include "we_bulkloadbuffer.h"
|
|
#include "we_brm.h"
|
|
#include "we_convertor.h"
|
|
#include "we_log.h"
|
|
#include "brmtypes.h"
|
|
#include "dataconvert.h"
|
|
#include "exceptclasses.h"
|
|
#include "mcs_decimal.h"
|
|
#include "mcs_datatype.h"
|
|
|
|
#include "joblisttypes.h"
|
|
|
|
#include "utils_utf8.h" // utf8_truncate_point()
|
|
|
|
#include <arrow/io/api.h>
|
|
#include <parquet/arrow/reader.h>
|
|
#include <parquet/exception.h>
|
|
|
|
using namespace std;
|
|
using namespace boost;
|
|
using namespace execplan;
|
|
|
|
namespace
|
|
{
|
|
// Message text for the input errors named by each constant
const std::string INPUT_ERROR_WRONG_NO_COLUMNS("Data contains wrong number of columns");
const std::string INPUT_ERROR_TOO_LONG("Data in wrong format; exceeds max field length; ");
const std::string INPUT_ERROR_NULL_CONSTRAINT("Data violates NOT NULL constraint with no default");
const std::string INPUT_ERROR_ODD_VARBINARY_LENGTH("VarBinary column is incomplete; odd number of bytes; ");
const std::string INPUT_ERROR_STRING_TOO_LONG("Character data exceeds max field length; ");

// NULL markers
const char NULL_CHAR = 'N';
const char* NULL_VALUE_STRING = "NULL";

// Auto-increment NULL markers: character form and binary form
const char NULL_AUTO_INC_0 = '0';
const unsigned long long NULL_AUTO_INC_0_BINARY = 0;

// Newline character
const char NEWLINE_CHAR = '\n';
|
|
|
|
// Parsing states used while scanning a single column value
enum FieldParsingState
{
  // parsing leading character
  FLD_PARSE_LEADING_CHAR_STATE = 1,

  // parsing an enclosed column value
  FLD_PARSE_ENCLOSED_STATE = 2,

  // parsing bytes after an enclosed column value
  FLD_PARSE_TRAILING_CHAR_STATE = 3,

  // parsing non-enclosed column value
  FLD_PARSE_NORMAL_STATE = 4
};
|
|
|
|
//------------------------------------------------------------------------------
// Replace the array *pRowData with a newly allocated array of
// "newArrayCapacity" bytes, preserving its contents and deleting the old array.
//   pRowData (in/out)     - address of the array pointer; on return it points
//                           to the new array
//   dataLength (in)       - number of bytes in *pRowData to preserve
//   newArrayCapacity (in) - size in bytes of the replacement array
// If newArrayCapacity is smaller than dataLength, only the bytes that fit are
// copied, so the copy never writes past the end of the new array.
//------------------------------------------------------------------------------
inline void resizeRowDataArray(char** pRowData, unsigned int dataLength, unsigned int newArrayCapacity)
{
  // Allocate before touching the old array: if this throws, *pRowData is
  // still valid and unchanged.
  char* replacement = new char[newArrayCapacity];

  const unsigned int bytesToKeep = (dataLength < newArrayCapacity) ? dataLength : newArrayCapacity;

  // memcpy needs valid pointers even for a zero-length copy, so skip the call
  // when there is nothing to carry over (e.g. *pRowData is still null).
  if (bytesToKeep > 0)
    memcpy(replacement, *pRowData, bytesToKeep);

  delete[] *pRowData;
  *pRowData = replacement;
}
|
|
|
|
} // namespace
|
|
|
|
// #define DEBUG_TOKEN_PARSING 1
|
|
|
|
namespace WriteEngine
|
|
{
|
|
//------------------------------------------------------------------------------
|
|
// BulkLoadBuffer constructor
|
|
//------------------------------------------------------------------------------
|
|
BulkLoadBuffer::BulkLoadBuffer(unsigned numberOfCols, unsigned bufferSize, Log* logger, int bufferId,
|
|
const std::string& tableName, const JobFieldRefList& jobFieldRefList)
|
|
: fOverflowSize(0)
|
|
, fParseComplete(0)
|
|
, fTotalRows(0)
|
|
, fStartRow(0)
|
|
, fStartRowForLogging(0)
|
|
, fAutoIncGenCount(0)
|
|
, fAutoIncNextValue(0)
|
|
, fReadSize(0)
|
|
, fLog(logger)
|
|
, fNullStringMode(false)
|
|
, fEnclosedByChar('\0')
|
|
, fEscapeChar('\\')
|
|
, fBufferId(bufferId)
|
|
, fTableName(tableName)
|
|
, fbTruncationAsError(false)
|
|
, fImportDataMode(IMPORT_DATA_TEXT)
|
|
, fTimeZone(dataconvert::systemTimeZoneOffset())
|
|
, fFixedBinaryRecLen(0)
|
|
{
|
|
// if it's non-parquet case, initialize the fData
|
|
if (fImportDataMode != IMPORT_DATA_PARQUET)
|
|
fData = new char[bufferSize];
|
|
fOverflowBuf = NULL;
|
|
fStatusBLB = WriteEngine::NEW;
|
|
fNumberOfColumns = numberOfCols;
|
|
fBufferSize = bufferSize;
|
|
|
|
fColumnLocks.clear();
|
|
|
|
fTokens = 0;
|
|
|
|
fRowStatus.clear();
|
|
fErrRows.clear();
|
|
|
|
struct LockInfo info;
|
|
info.locker = -1;
|
|
info.status = WriteEngine::NEW;
|
|
|
|
fColumnLocks.resize(numberOfCols);
|
|
fColumnLocks.assign(fNumberOfColumns, info);
|
|
|
|
fTotalReadRowsParser = 0;
|
|
fStartRowParser = 0;
|
|
fDataParser = 0;
|
|
fTokensParser = 0;
|
|
fStartRowForLoggingParser = 0;
|
|
fAutoIncGenCountParser = 0;
|
|
fNumFieldsInFile = 0;
|
|
fNumColsInFile = 0;
|
|
|
|
// Count the total number of fields in the input file (fNumFieldsInFile)
|
|
// and the number of db columns that will be loaded from those fields
|
|
// (fNumColsInFile). Keep in mind that fNumColsInFile may be less than
|
|
// fNumFieldsInFile, because there may be fields we are to ignore, and/or
|
|
// some db columns may get default loaded without a corresponding field
|
|
// in the input file.
|
|
fFieldList.resize(jobFieldRefList.size());
|
|
|
|
for (unsigned k = 0; k < jobFieldRefList.size(); k++)
|
|
{
|
|
fFieldList[k] = jobFieldRefList[k];
|
|
|
|
switch (jobFieldRefList[k].fFldColType)
|
|
{
|
|
case BULK_FLDCOL_COLUMN_FIELD:
|
|
{
|
|
fNumColsInFile++;
|
|
fNumFieldsInFile++;
|
|
break;
|
|
}
|
|
|
|
case BULK_FLDCOL_IGNORE_FIELD:
|
|
{
|
|
fNumFieldsInFile++;
|
|
break;
|
|
}
|
|
|
|
case BULK_FLDCOL_COLUMN_DEFAULT:
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// BulkLoadBuffer destructor
|
|
//------------------------------------------------------------------------------
|
|
BulkLoadBuffer::~BulkLoadBuffer()
|
|
{
|
|
if (fData != NULL)
|
|
delete[] fData;
|
|
|
|
if (fOverflowBuf != NULL)
|
|
delete[] fOverflowBuf;
|
|
|
|
fColumnLocks.clear();
|
|
|
|
if (fTokens != NULL)
|
|
{
|
|
for (unsigned int i = 0; i < fTotalRows; ++i)
|
|
{
|
|
delete[] fTokens[i];
|
|
}
|
|
|
|
delete[] fTokens;
|
|
}
|
|
|
|
fRowStatus.clear();
|
|
fErrRows.clear();
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Resets state of buffer.
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoadBuffer::reset()
|
|
{
|
|
fStartRow = fTotalReadRows = fTotalReadRowsForLog = 0;
|
|
fAutoIncGenCount = 0;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Resets state of buffer's column locks.
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoadBuffer::resetColumnLocks()
|
|
{
|
|
fParseComplete = 0;
|
|
|
|
struct LockInfo info;
|
|
fColumnLocks.assign(fNumberOfColumns, info);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Copy overflow leftover from previous buffer into the start of "this" buffer
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoadBuffer::copyOverflow(const BulkLoadBuffer& buffer)
|
|
{
|
|
if (fOverflowBuf != NULL)
|
|
{
|
|
delete[] fOverflowBuf;
|
|
fOverflowBuf = NULL;
|
|
}
|
|
|
|
fOverflowSize = buffer.fOverflowSize;
|
|
|
|
if (fOverflowSize != 0)
|
|
{
|
|
fOverflowBuf = new char[buffer.fOverflowSize];
|
|
memcpy(fOverflowBuf, buffer.fOverflowBuf, buffer.fOverflowSize);
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Parse/convert the given "field" value based on the specified length and type.
|
|
// field (in) - the input field value to be parsed
|
|
// fieldLength (in) - number of bytes of data in "field"
|
|
// nullFlag (in) - indicates if NULL value is to be assigned to "output"
|
|
// rather than parsing the data in "field"
|
|
// output (out) - the parsed value taken from "field"
|
|
// column (in) - column information for the column we are parsing
|
|
// bufStats:
|
|
// minBufferVal (in/out) - ongoing min value for the Read buffer we are parsing
|
|
// maxBufferVal (in/out) - ongoing max value for the Read buffer we are parsing
|
|
// satCount (in/out) - ongoing saturation row count for buffer being parsed
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoadBuffer::convert(char* field, int fieldLength, bool nullFlag, unsigned char* output,
|
|
const JobColumn& column, BLBufferStats& bufStats)
|
|
{
|
|
char biVal;
|
|
int iVal;
|
|
float fVal;
|
|
double dVal;
|
|
short siVal;
|
|
void* pVal;
|
|
int32_t iDate;
|
|
char charTmpBuf[MAX_COLUMN_BOUNDARY + 1] = {0};
|
|
long long llVal = 0, llDate = 0;
|
|
int128_t bigllVal = 0;
|
|
uint64_t tmp64;
|
|
uint32_t tmp32;
|
|
uint8_t ubiVal;
|
|
uint16_t usiVal;
|
|
uint32_t uiVal;
|
|
uint64_t ullVal;
|
|
|
|
int width = column.width;
|
|
|
|
//--------------------------------------------------------------------------
|
|
// Parse based on column data type
|
|
//--------------------------------------------------------------------------
|
|
switch (column.weType)
|
|
{
|
|
//----------------------------------------------------------------------
|
|
// FLOAT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_FLOAT:
|
|
{
|
|
if (nullFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
fVal = column.fDefaultDbl;
|
|
pVal = &fVal;
|
|
}
|
|
else
|
|
{
|
|
tmp32 = joblist::FLOATNULL;
|
|
pVal = &tmp32;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
float minFltSat = column.fMinDblSat;
|
|
float maxFltSat = column.fMaxDblSat;
|
|
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
memcpy(&fVal, field, sizeof(fVal));
|
|
|
|
if (isnan(fVal))
|
|
{
|
|
if (signbit(fVal))
|
|
fVal = minFltSat;
|
|
else
|
|
fVal = maxFltSat;
|
|
|
|
bufStats.satCount++;
|
|
}
|
|
else
|
|
{
|
|
if (fVal > maxFltSat)
|
|
{
|
|
fVal = maxFltSat;
|
|
bufStats.satCount++;
|
|
}
|
|
else if (fVal < minFltSat)
|
|
{
|
|
fVal = minFltSat;
|
|
bufStats.satCount++;
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
|
|
fVal = strtof(field, 0);
|
|
|
|
if (errno == ERANGE)
|
|
{
|
|
if (abs(fVal) == HUGE_VALF)
|
|
{
|
|
if (fVal > 0)
|
|
fVal = maxFltSat;
|
|
else
|
|
fVal = minFltSat;
|
|
|
|
bufStats.satCount++;
|
|
}
|
|
else
|
|
fVal = 0;
|
|
}
|
|
else
|
|
{
|
|
if (fVal > maxFltSat)
|
|
{
|
|
fVal = maxFltSat;
|
|
bufStats.satCount++;
|
|
}
|
|
else if (fVal < minFltSat)
|
|
{
|
|
fVal = minFltSat;
|
|
bufStats.satCount++;
|
|
}
|
|
if (fVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
fVal = 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
pVal = &fVal;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// DOUBLE
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_DOUBLE:
|
|
{
|
|
if (nullFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
dVal = column.fDefaultDbl;
|
|
pVal = &dVal;
|
|
}
|
|
else
|
|
{
|
|
tmp64 = joblist::DOUBLENULL;
|
|
pVal = &tmp64;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
memcpy(&dVal, field, sizeof(dVal));
|
|
|
|
if (std::isnan(dVal))
|
|
{
|
|
if (signbit(dVal))
|
|
dVal = column.fMinDblSat;
|
|
else
|
|
dVal = column.fMaxDblSat;
|
|
|
|
bufStats.satCount++;
|
|
}
|
|
else
|
|
{
|
|
if (dVal > column.fMaxDblSat)
|
|
{
|
|
dVal = column.fMaxDblSat;
|
|
bufStats.satCount++;
|
|
}
|
|
else if (dVal < column.fMinDblSat)
|
|
{
|
|
dVal = column.fMinDblSat;
|
|
bufStats.satCount++;
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
|
|
dVal = strtod(field, 0);
|
|
|
|
if (errno == ERANGE)
|
|
{
|
|
if (abs(dVal) == HUGE_VALL)
|
|
{
|
|
if (dVal > 0)
|
|
dVal = column.fMaxDblSat;
|
|
else
|
|
dVal = column.fMinDblSat;
|
|
|
|
bufStats.satCount++;
|
|
}
|
|
else
|
|
dVal = 0;
|
|
}
|
|
else
|
|
{
|
|
if (dVal > column.fMaxDblSat)
|
|
{
|
|
dVal = column.fMaxDblSat;
|
|
bufStats.satCount++;
|
|
}
|
|
else if (dVal < column.fMinDblSat)
|
|
{
|
|
dVal = column.fMinDblSat;
|
|
bufStats.satCount++;
|
|
}
|
|
else if (dVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
dVal = 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
pVal = &dVal;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// CHARACTER
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_CHAR:
|
|
{
|
|
if (nullFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
int defLen = column.fDefaultChr.length();
|
|
const char* defData = column.fDefaultChr.str();
|
|
|
|
if (defLen > column.definedWidth)
|
|
memcpy(charTmpBuf, defData, column.definedWidth);
|
|
else
|
|
memcpy(charTmpBuf, defData, defLen);
|
|
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
idbassert(width <= 8);
|
|
|
|
for (int i = 0; i < width - 1; i++)
|
|
{
|
|
charTmpBuf[i] = '\377';
|
|
}
|
|
|
|
charTmpBuf[width - 1] = '\376';
|
|
|
|
pVal = charTmpBuf;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// truncate string if it is too long
|
|
// @Bug 3040. Use definedWidth for the data truncation to keep
|
|
// from storing characters beyond the column's defined width.
|
|
// It contains the column definition width rather than the bytes
|
|
// on disk (e.g. 5 for a varchar(5) instead of 8).
|
|
if (column.cs->mbmaxlen > 1)
|
|
{
|
|
const CHARSET_INFO* cs = column.cs;
|
|
const char* start = (const char*)field;
|
|
const char* end = (const char*)(field + fieldLength);
|
|
size_t numChars = cs->numchars(start, end);
|
|
size_t maxCharLength = column.definedWidth / cs->mbmaxlen;
|
|
|
|
if (numChars > maxCharLength)
|
|
{
|
|
MY_STRCOPY_STATUS status;
|
|
cs->well_formed_char_length(start, end, maxCharLength, &status);
|
|
fieldLength = status.m_source_end_pos - start;
|
|
bufStats.satCount++;
|
|
}
|
|
}
|
|
else // cs->mbmaxlen == 1
|
|
{
|
|
if (fieldLength > column.definedWidth)
|
|
{
|
|
fieldLength = column.definedWidth;
|
|
bufStats.satCount++;
|
|
}
|
|
}
|
|
|
|
memcpy(charTmpBuf, field, fieldLength);
|
|
}
|
|
|
|
// Swap byte order before comparing character string
|
|
// Compare must be unsigned
|
|
uint64_t compChar = uint64ToStr(*(reinterpret_cast<uint64_t*>(charTmpBuf)));
|
|
int64_t binChar = static_cast<int64_t>(compChar);
|
|
|
|
// Update min/max range
|
|
uint64_t minVal = static_cast<uint64_t>(bufStats.minBufferVal);
|
|
uint64_t maxVal = static_cast<uint64_t>(bufStats.maxBufferVal);
|
|
if (compChar < minVal)
|
|
bufStats.minBufferVal = binChar;
|
|
if (compChar > maxVal)
|
|
bufStats.maxBufferVal = binChar;
|
|
|
|
pVal = charTmpBuf;
|
|
// cout << "In convert: fieldLength = " << fieldLength <<endl;
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// SHORT INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_SHORT:
|
|
{
|
|
long long origVal;
|
|
bool bSatVal = false;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = column.fDefaultInt;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
siVal = joblist::SMALLINTNULL;
|
|
pVal = &siVal;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
short int siVal2;
|
|
memcpy(&siVal2, field, sizeof(siVal2));
|
|
origVal = siVal2;
|
|
}
|
|
else
|
|
{
|
|
if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
|
|
(column.dataType == CalpontSystemCatalog::UDECIMAL))
|
|
{
|
|
// errno is initialized and set in convertDecimalString
|
|
origVal = Convertor::convertDecimalString(field, fieldLength, column.scale);
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
origVal = strtol(field, 0, 10);
|
|
}
|
|
|
|
if (errno == ERANGE)
|
|
bSatVal = true;
|
|
}
|
|
}
|
|
|
|
// Saturate the value
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
origVal = 1;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
if (origVal < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (origVal > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
siVal = origVal;
|
|
pVal = &siVal;
|
|
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// UNSIGNED SHORT INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_USHORT:
|
|
{
|
|
int64_t origVal = 0;
|
|
bool bSatVal = false;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = static_cast<int64_t>(column.fDefaultUInt);
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
usiVal = joblist::USMALLINTNULL;
|
|
pVal = &usiVal;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
unsigned short int siVal2;
|
|
memcpy(&siVal2, field, sizeof(siVal2));
|
|
origVal = siVal2;
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
|
|
origVal = strtoll(field, 0, 10);
|
|
|
|
if (errno == ERANGE)
|
|
bSatVal = true;
|
|
}
|
|
}
|
|
|
|
// Saturate the value (saturates any negative value to 0)
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
origVal = 1;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
uint64_t uVal = origVal;
|
|
|
|
if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
usiVal = origVal;
|
|
pVal = &usiVal;
|
|
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// TINY INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_BYTE:
|
|
{
|
|
long long origVal;
|
|
bool bSatVal = false;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = column.fDefaultInt;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
biVal = joblist::TINYINTNULL;
|
|
pVal = &biVal;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
char biVal2;
|
|
memcpy(&biVal2, field, sizeof(biVal2));
|
|
origVal = biVal2;
|
|
}
|
|
else
|
|
{
|
|
if (isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
strcpy(field, "1");
|
|
fieldLength = 1;
|
|
}
|
|
|
|
if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
|
|
(column.dataType == CalpontSystemCatalog::UDECIMAL))
|
|
{
|
|
// errno is initialized and set in convertDecimalString
|
|
origVal = Convertor::convertDecimalString(field, fieldLength, column.scale);
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
origVal = strtol(field, 0, 10);
|
|
}
|
|
|
|
if (errno == ERANGE)
|
|
bSatVal = true;
|
|
}
|
|
}
|
|
|
|
// Saturate the value
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
if (origVal < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (origVal > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
biVal = origVal;
|
|
pVal = &biVal;
|
|
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// UNSIGNED TINY INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_UBYTE:
|
|
{
|
|
int64_t origVal = 0;
|
|
bool bSatVal = false;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = static_cast<int64_t>(column.fDefaultUInt);
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
ubiVal = joblist::UTINYINTNULL;
|
|
pVal = &ubiVal;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
uint8_t biVal2;
|
|
memcpy(&biVal2, field, sizeof(biVal2));
|
|
origVal = biVal2;
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
|
|
origVal = strtoll(field, 0, 10);
|
|
|
|
if (errno == ERANGE)
|
|
bSatVal = true;
|
|
}
|
|
}
|
|
|
|
// Saturate the value (saturates any negative value to 0)
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
origVal = 1;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
uint64_t uVal = origVal;
|
|
|
|
if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
ubiVal = origVal;
|
|
pVal = &ubiVal;
|
|
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// BIG INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_LONGLONG:
|
|
{
|
|
bool bSatVal = false;
|
|
|
|
if (column.dataType != CalpontSystemCatalog::DATETIME &&
|
|
column.dataType != CalpontSystemCatalog::TIMESTAMP && column.dataType != CalpontSystemCatalog::TIME)
|
|
{
|
|
if (nullFlag)
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
llVal = column.fDefaultInt;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
llVal = joblist::BIGINTNULL;
|
|
pVal = &llVal;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
llVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
memcpy(&llVal, field, sizeof(llVal));
|
|
}
|
|
else
|
|
{
|
|
if (isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
strcpy(field, "1");
|
|
fieldLength = 1;
|
|
}
|
|
|
|
if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
|
|
(column.dataType == CalpontSystemCatalog::UDECIMAL))
|
|
{
|
|
// errno is initialized and set in convertDecimalString
|
|
llVal = Convertor::convertDecimalString(field, fieldLength, column.scale);
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
llVal = strtoll(field, 0, 10);
|
|
}
|
|
}
|
|
|
|
if (errno == ERANGE)
|
|
bSatVal = true;
|
|
}
|
|
|
|
// Saturate the value
|
|
if (llVal < column.fMinIntSat)
|
|
{
|
|
llVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (llVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
// llVal can be > fMaxIntSat if this is a decimal column
|
|
llVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
if (llVal < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = llVal;
|
|
|
|
if (llVal > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = llVal;
|
|
|
|
pVal = &llVal;
|
|
}
|
|
else if (column.dataType == CalpontSystemCatalog::TIME)
|
|
{
|
|
// time conversion
|
|
int rc = 0;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
llDate = column.fDefaultInt;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
llDate = joblist::TIMENULL;
|
|
pVal = &llDate;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
memcpy(&llDate, field, sizeof(llDate));
|
|
|
|
if (!dataconvert::DataConvert::isColumnTimeValid(llDate))
|
|
rc = -1;
|
|
}
|
|
else
|
|
{
|
|
llDate = dataconvert::DataConvert::convertColumnTime(field, dataconvert::CALPONTTIME_ENUM, rc,
|
|
fieldLength);
|
|
}
|
|
}
|
|
|
|
if (rc == 0)
|
|
{
|
|
if (llDate < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = llDate;
|
|
|
|
if (llDate > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = llDate;
|
|
}
|
|
else
|
|
{
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &llDate;
|
|
}
|
|
else if (column.dataType == CalpontSystemCatalog::TIMESTAMP)
|
|
{
|
|
// timestamp conversion
|
|
int rc = 0;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
llDate = column.fDefaultInt;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
llDate = joblist::TIMESTAMPNULL;
|
|
pVal = &llDate;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
memcpy(&llDate, field, sizeof(llDate));
|
|
|
|
if (!dataconvert::DataConvert::isColumnTimeStampValid(llDate))
|
|
rc = -1;
|
|
}
|
|
else
|
|
{
|
|
llDate = dataconvert::DataConvert::convertColumnTimestamp(
|
|
field, dataconvert::CALPONTDATETIME_ENUM, rc, fieldLength, fTimeZone);
|
|
}
|
|
}
|
|
|
|
if (rc == 0)
|
|
{
|
|
if (llDate < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = llDate;
|
|
|
|
if (llDate > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = llDate;
|
|
}
|
|
else
|
|
{
|
|
llDate = 0;
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &llDate;
|
|
}
|
|
else
|
|
{
|
|
// datetime conversion
|
|
int rc = 0;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
llDate = column.fDefaultInt;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
llDate = joblist::DATETIMENULL;
|
|
pVal = &llDate;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
memcpy(&llDate, field, sizeof(llDate));
|
|
|
|
if (!dataconvert::DataConvert::isColumnDateTimeValid(llDate))
|
|
rc = -1;
|
|
}
|
|
else
|
|
{
|
|
llDate = dataconvert::DataConvert::convertColumnDatetime(field, dataconvert::CALPONTDATETIME_ENUM,
|
|
rc, fieldLength);
|
|
}
|
|
}
|
|
|
|
if (rc == 0)
|
|
{
|
|
if (llDate < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = llDate;
|
|
|
|
if (llDate > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = llDate;
|
|
}
|
|
else
|
|
{
|
|
llDate = 0;
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &llDate;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// WIDE DECIMAL
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_BINARY:
|
|
{
|
|
bool bSatVal = false;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
bigllVal = column.fDefaultWideDecimal;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
bigllVal = datatypes::Decimal128Null;
|
|
pVal = &bigllVal;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// TODO MCOL-641 Add support for int128_t version of
|
|
// fAutoIncNextValue
|
|
bigllVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
memcpy(&bigllVal, field, sizeof(bigllVal));
|
|
}
|
|
else
|
|
{
|
|
if (isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
strcpy(field, "1");
|
|
fieldLength = 1;
|
|
}
|
|
|
|
bool dummy = false;
|
|
// Value saturation to 9999... or -9999... is handled by
|
|
// number_int_value(), and the bSatVal flag is set to true
|
|
dataconvert::number_int_value(
|
|
string(field), column.dataType,
|
|
datatypes::TypeAttributesStd(column.width, column.scale, column.precision), dummy, false,
|
|
bigllVal, &bSatVal);
|
|
}
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
if (bigllVal < bufStats.bigMinBufferVal)
|
|
bufStats.bigMinBufferVal = bigllVal;
|
|
|
|
if (bigllVal > bufStats.bigMaxBufferVal)
|
|
bufStats.bigMaxBufferVal = bigllVal;
|
|
|
|
pVal = &bigllVal;
|
|
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// UNSIGNED BIG INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_ULONGLONG:
|
|
{
|
|
bool bSatVal = false;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
ullVal = column.fDefaultUInt;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
ullVal = joblist::UBIGINTNULL;
|
|
pVal = &ullVal;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ullVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
memcpy(&ullVal, field, sizeof(ullVal));
|
|
}
|
|
else
|
|
{
|
|
// Check for negative. strtoull doesn't do this for us.
|
|
// I considered using boost::trim_left here, but part of the
|
|
// exercise is to minimize cpu cycles, so I do it the old
|
|
// fashioned way. isspace() uses more cycles than direct
|
|
// compare to ' ', '\t', etc. but the payoff is that it
|
|
// works with Locale, so it ought to work well with utf-8
|
|
// input.
|
|
int idx1;
|
|
|
|
for (idx1 = 0; idx1 < fieldLength; idx1++)
|
|
{
|
|
if (!isspace(field[idx1]))
|
|
break;
|
|
}
|
|
|
|
if ((idx1 < fieldLength) && (field[idx1] == '-'))
|
|
{
|
|
ullVal = static_cast<uint64_t>(column.fMinIntSat);
|
|
bSatVal = true;
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
|
|
ullVal = strtoull(field, 0, 10);
|
|
|
|
if (errno == ERANGE)
|
|
bSatVal = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Saturate the value
|
|
if (ullVal > column.fMaxIntSat)
|
|
{
|
|
ullVal = column.fMaxIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (ullVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
ullVal = 1;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
if (ullVal < static_cast<uint64_t>(bufStats.minBufferVal))
|
|
bufStats.minBufferVal = static_cast<int64_t>(ullVal);
|
|
|
|
if (ullVal > static_cast<uint64_t>(bufStats.maxBufferVal))
|
|
bufStats.maxBufferVal = static_cast<int64_t>(ullVal);
|
|
|
|
pVal = &ullVal;
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// UNSIGNED MEDIUM INTEGER AND UNSIGNED INTEGER
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_UMEDINT:
|
|
case WriteEngine::WR_UINT:
|
|
{
|
|
int64_t origVal;
|
|
bool bSatVal = false;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = static_cast<int64_t>(column.fDefaultUInt);
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
uiVal = joblist::UINTNULL;
|
|
pVal = &uiVal;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
unsigned int iVal2;
|
|
memcpy(&iVal2, field, sizeof(iVal2));
|
|
origVal = iVal2;
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
|
|
origVal = strtoll(field, 0, 10);
|
|
|
|
if (errno == ERANGE)
|
|
bSatVal = true;
|
|
}
|
|
}
|
|
|
|
// Saturate the value (saturates any negative value to 0)
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal == 0 && isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
origVal = 1;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
uint64_t uVal = origVal;
|
|
|
|
if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
uiVal = origVal;
|
|
pVal = &uiVal;
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// MEDIUM INTEGER AND INTEGER
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_MEDINT:
|
|
case WriteEngine::WR_INT:
|
|
default:
|
|
{
|
|
if (column.dataType != CalpontSystemCatalog::DATE)
|
|
{
|
|
long long origVal;
|
|
bool bSatVal = false;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = column.fDefaultInt;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
iVal = joblist::INTNULL;
|
|
pVal = &iVal;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
int iVal2;
|
|
memcpy(&iVal2, field, sizeof(iVal2));
|
|
origVal = iVal2;
|
|
}
|
|
else
|
|
{
|
|
if (isTrueWord(const_cast<const char*>(field), fieldLength))
|
|
{
|
|
strcpy(field, "1");
|
|
fieldLength = 1;
|
|
}
|
|
|
|
if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
|
|
(column.dataType == CalpontSystemCatalog::UDECIMAL))
|
|
{
|
|
// errno is initialized and set in convertDecimalString
|
|
origVal = Convertor::convertDecimalString(field, fieldLength, column.scale);
|
|
}
|
|
else
|
|
{
|
|
errno = 0;
|
|
origVal = strtol(field, 0, 10);
|
|
}
|
|
|
|
if (errno == ERANGE)
|
|
bSatVal = true;
|
|
}
|
|
}
|
|
|
|
// Saturate the value
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
if (origVal < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (origVal > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
iVal = (int)origVal;
|
|
pVal = &iVal;
|
|
}
|
|
else
|
|
{
|
|
// date conversion
|
|
int rc = 0;
|
|
|
|
if (nullFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
iDate = column.fDefaultInt;
|
|
// fall through to update saturation and min/max
|
|
}
|
|
else
|
|
{
|
|
iDate = joblist::DATENULL;
|
|
pVal = &iDate;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_TEXT)
|
|
{
|
|
memcpy(&iDate, field, sizeof(iDate));
|
|
|
|
if (!dataconvert::DataConvert::isColumnDateValid(iDate))
|
|
rc = -1;
|
|
}
|
|
else
|
|
{
|
|
iDate = dataconvert::DataConvert::convertColumnDate(field, dataconvert::CALPONTDATE_ENUM, rc,
|
|
fieldLength);
|
|
}
|
|
}
|
|
|
|
if (rc == 0)
|
|
{
|
|
if (iDate < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = iDate;
|
|
|
|
if (iDate > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = iDate;
|
|
}
|
|
else
|
|
{
|
|
iDate = 0;
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &iDate;
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
memcpy(output, pVal, width);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Parse the contents of the Read buffer based on whether it is a dictionary
|
|
// column or not.
|
|
//------------------------------------------------------------------------------
|
|
int BulkLoadBuffer::parse(ColumnInfo& columnInfo)
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
// Rather than locking fSyncUpdatesBLB for the entire life of parse(),
|
|
// we only briefly lock, and force a synchronization with the relevant
|
|
// class variables from reader threads (by copying to Parser specific
|
|
// variables). It should be okay to reference a copy of these variables
|
|
// as no other thread should be changing them while we are in parse().
|
|
{
|
|
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
|
|
fTotalReadRowsParser = fTotalReadRows;
|
|
fStartRowParser = fStartRow;
|
|
|
|
if (fImportDataMode != IMPORT_DATA_PARQUET)
|
|
{
|
|
fDataParser = fData;
|
|
fTokensParser = fTokens;
|
|
}
|
|
else
|
|
{
|
|
fParquetBatchParser = fParquetBatch;
|
|
}
|
|
|
|
fStartRowForLoggingParser = fStartRowForLogging;
|
|
fAutoIncGenCountParser = fAutoIncGenCount;
|
|
}
|
|
|
|
// Bug806 - If buffer is empty then return early.
|
|
if (fTotalReadRowsParser == 0)
|
|
return rc;
|
|
|
|
// If this is the first batch of rows, create the starting DB file
|
|
// if this PM did not have a DB file (delayed file creation).
|
|
RETURN_ON_ERROR(columnInfo.createDelayedFileIfNeeded(fTableName));
|
|
|
|
if (columnInfo.column.colType == COL_TYPE_DICT)
|
|
{
|
|
rc = parseDict(columnInfo);
|
|
}
|
|
else
|
|
{
|
|
if (fImportDataMode != IMPORT_DATA_PARQUET)
|
|
rc = parseCol(columnInfo);
|
|
else
|
|
rc = parseColParquet(columnInfo);
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
// Parse one non-dictionary column out of the current Parquet record batch and
// write the converted values into the section reserved for it in the column's
// buffer manager.
//
// columnInfo (in/out) - the column being loaded; supplies the column definition
//                       and buffer manager, and receives the min/max (CP) and
//                       saturation-count updates.
//
// Returns NO_ERROR on success; otherwise the error code reported by
// reserveSection(), reserveAutoIncNums() or releaseSection(), or
// ERR_PARQUET_AUX if the all-null array for the aux column cannot be built.
//------------------------------------------------------------------------------
int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo)
{
  int rc = NO_ERROR;

  // Parse the data and fill up a buffer; which is written to output file
  uint32_t nRowsParsed;

  if (fLog->isDebug(DEBUG_2))
  {
    ostringstream oss;
    oss << "ColResSecIn: OID-" << columnInfo.column.mapOid << "; StartRID/Rows: " << fStartRowParser << " "
        << fTotalReadRowsParser;
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  ColumnBufferSection* section = nullptr;
  RID lastInputRowInExtent;
  RETURN_ON_ERROR(columnInfo.fColBufferMgr->reserveSection(fStartRowParser, fTotalReadRowsParser, nRowsParsed,
                                                           &section, lastInputRowInExtent));
  unsigned int columnId = columnInfo.id;

  // The last column is treated as the aux column: it has no counterpart in the
  // Parquet batch, so only the other columns can be asked for a null count.
  int64_t nullCount = 0;
  bool isNonAuxColumn = columnId < fNumberOfColumns - 1;
  if (isNonAuxColumn)
  {
    nullCount = fParquetBatchParser->column(columnId)->null_count();
  }

  if (nRowsParsed > 0)
  {
#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_PARSE_COL);
#endif

    // Reserve auto-increment numbers we need to generate
    if ((columnInfo.column.autoIncFlag) && (nullCount > 0))
    {
      rc = columnInfo.reserveAutoIncNums(nullCount, fAutoIncNextValue);

      if (rc != NO_ERROR)
      {
        WErrorCodes ec;
        ostringstream oss;
        oss << "parseCol: error generating auto-increment values "
               "for table-"
            << fTableName << ", column-" << columnInfo.column.colName << "; OID-" << columnInfo.column.mapOid
            << "; " << ec.errorString(rc);
        fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
        BulkLoad::addErrorMsg2BrmUpdater(fTableName, oss);
        return rc;
      }
    }

    // Create a buffer for the size of the rows being written. It is owned by
    // a unique_ptr so that it is released on every exit path, including the
    // ERR_PARQUET_AUX return below and any exception thrown while converting.
    std::unique_ptr<unsigned char[]> buf(new unsigned char[fTotalReadRowsParser * columnInfo.column.width]);

    // Initialize min/max buffer values.  We initialize to a sufficient
    // range to force the first value to automatically update the range.
    // If we are managing char data, minBufferVal and maxBufferVal are
    // maintained in reverse byte order to facilitate string comparisons
    BLBufferStats bufStats(columnInfo.column.dataType);
    bool updateCPInfoPendingFlag = false;

    // Point column data in one batch
    std::shared_ptr<arrow::Array> columnData;

    // not aux column
    if (isNonAuxColumn)
    {
      // Read from the parser's snapshot of the batch (taken under the lock in
      // parse()), matching the null_count() lookup above, rather than from
      // the reader-owned fParquetBatch.
      columnData = fParquetBatchParser->column(columnId);
    }
    else  // aux column
    {
      try
      {
        // Build an all-null array with one entry per row in this batch.
        arrow::NullBuilder nullBuilder;
        PARQUET_THROW_NOT_OK(nullBuilder.Reserve(fTotalReadRowsParser));
        PARQUET_THROW_NOT_OK(nullBuilder.AppendNulls(fTotalReadRowsParser));
        PARQUET_THROW_NOT_OK(nullBuilder.Finish(&columnData));
      }
      catch (const std::exception&)
      {
        ostringstream oss;
        oss << "Error in creating aux column when importing.";
        fLog->logMsg(oss.str(), ERR_PARQUET_AUX, MSGLVL_ERROR);

        return ERR_PARQUET_AUX;
      }
    }

    convertParquet(columnData, buf.get(), columnInfo.column, bufStats, lastInputRowInExtent, columnInfo,
                   updateCPInfoPendingFlag, section);

    // Flush any min/max range that convertParquet left pending.
    if (updateCPInfoPendingFlag)
    {
      if (columnInfo.column.width <= 8)
      {
        columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal,
                                columnInfo.column.dataType, columnInfo.column.width);
      }
      else
      {
        columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal,
                                columnInfo.column.dataType, columnInfo.column.width);
      }
    }

    if (bufStats.satCount)  // @bug 3504: increment row saturation count
    {
      // If we don't want to allow saturated values for auto inc columns.
      // then this is where we handle it.  Too late to reject a single
      // row from the parsing thread, so we abort the job.
      // if (columnInfo.column.autoIncFlag)
      //{
      //    rc = ERR_AUTOINC_USER_OUT_OF_RANGE;
      //    WErrorCodes ec;
      //    ostringstream oss;
      //    oss << "parseCol: error with auto-increment values "
      //        "for table-" << fTableName <<
      //        ", column-"  << columnInfo.column.colName <<
      //        "; OID-"     << columnInfo.column.mapOid  <<
      //        "; "         << ec.errorString(rc);
      //    fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
      //    return rc;
      //}
      columnInfo.incSaturatedCnt(bufStats.satCount);
    }

    section->write(buf.get(), fTotalReadRowsParser);
#ifdef PROFILE
    Stats::stopParseEvent(WE_STATS_PARSE_COL);
#endif

    // TODO MCOL-641 Add support here.
    if (fLog->isDebug(DEBUG_2))
    {
      ostringstream oss;
      RID rid1 = section->startRowId();
      RID rid2 = section->endRowId();
      oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows2: " << rid1 << " "
          << (rid2 - rid1) + 1 << "; startOffset: " << section->getStartOffset()
          << "; lastExtentRow: " << lastInputRowInExtent;
      parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal);

      fLog->logMsg(oss.str(), MSGLVL_INFO2);
    }

    RETURN_ON_ERROR(columnInfo.fColBufferMgr->releaseSection(section));
  }
  return rc;
}
|
|
|
|
//-----------------------------------------------------------------------------------
|
|
// Convert arrow/parquet column data
|
|
// columnData (in) - the input column data of one batch
|
|
// column (in) - the column information
|
|
// bufStats:
|
|
// minBufferVal (in/out) - ongoing min value for the Read buffer we are parsing
|
|
// maxBufferVal (in/out) - ongoing max value for the Read buffer we are parsing
|
|
// satCount (in/out) - ongoing saturation row count for buffer being parsed
|
|
// buf (out) - the parsed values taken from columnData
|
|
// fTotalReadRowsParser (in) - current batch size (number of rows)
|
|
// fAutoIncNextValue (in) - first auto increment number of this batch
|
|
//-----------------------------------------------------------------------------------
|
|
void BulkLoadBuffer::convertParquet(std::shared_ptr<arrow::Array> columnData, unsigned char* buf,
|
|
const JobColumn& column, BLBufferStats& bufStats,
|
|
RID& lastInputRowInExtent, ColumnInfo& columnInfo,
|
|
bool& updateCPInfoPendingFlag, ColumnBufferSection* section)
|
|
{
|
|
char biVal;
|
|
int iVal;
|
|
float fVal;
|
|
double dVal;
|
|
short siVal;
|
|
void* pVal;
|
|
int32_t iDate;
|
|
long long llVal = 0, llDate = 0;
|
|
int128_t bigllVal = 0;
|
|
uint64_t tmp64;
|
|
uint32_t tmp32;
|
|
uint8_t ubiVal;
|
|
uint16_t usiVal;
|
|
uint32_t uiVal;
|
|
uint64_t ullVal;
|
|
|
|
int width = column.width;
|
|
|
|
//--------------------------------------------------------------------------
|
|
// Parse based on column data type
|
|
//--------------------------------------------------------------------------
|
|
switch (column.weType)
|
|
{
|
|
//----------------------------------------------------------------------
|
|
// FLOAT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_FLOAT:
|
|
{
|
|
const float* dataPtr = columnData->data()->GetValues<float>(1);
|
|
|
|
for (uint32_t i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
void* p = buf + i * width;
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
fVal = column.fDefaultDbl;
|
|
pVal = &fVal;
|
|
}
|
|
else
|
|
{
|
|
tmp32 = joblist::FLOATNULL;
|
|
pVal = &tmp32;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
float minFltSat = column.fMinDblSat;
|
|
float maxFltSat = column.fMaxDblSat;
|
|
memcpy(&fVal, dataPtr + i, width);
|
|
|
|
if (fVal > maxFltSat)
|
|
{
|
|
fVal = maxFltSat;
|
|
bufStats.satCount++;
|
|
}
|
|
else if (fVal < minFltSat)
|
|
{
|
|
fVal = minFltSat;
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &fVal;
|
|
}
|
|
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// DOUBLE
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_DOUBLE:
|
|
{
|
|
const double* dataPtr = columnData->data()->GetValues<double>(1);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
dVal = column.fDefaultDbl;
|
|
pVal = &dVal;
|
|
}
|
|
else
|
|
{
|
|
tmp64 = joblist::DOUBLENULL;
|
|
pVal = &tmp64;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
memcpy(&dVal, dataPtr + i, width);
|
|
|
|
if (dVal > column.fMaxDblSat)
|
|
{
|
|
dVal = column.fMaxDblSat;
|
|
bufStats.satCount++;
|
|
}
|
|
else if (dVal < column.fMinDblSat)
|
|
{
|
|
dVal = column.fMinDblSat;
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &dVal;
|
|
}
|
|
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// CHARACTER
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_CHAR:
|
|
{
|
|
std::shared_ptr<arrow::BinaryArray> binaryArray;
|
|
std::shared_ptr<arrow::FixedSizeBinaryArray> fixedSizeBinaryArray;
|
|
int tokenLen;
|
|
|
|
if (columnData->type_id() != arrow::Type::type::FIXED_SIZE_BINARY)
|
|
binaryArray = std::static_pointer_cast<arrow::BinaryArray>(columnData);
|
|
else
|
|
fixedSizeBinaryArray = std::static_pointer_cast<arrow::FixedSizeBinaryArray>(columnData);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
char charTmpBuf[MAX_COLUMN_BOUNDARY + 1] = {0};
|
|
void* p = buf + width * i;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
int defLen = column.fDefaultChr.length();
|
|
const char* defData = column.fDefaultChr.str();
|
|
|
|
if (defLen > column.definedWidth)
|
|
memcpy(charTmpBuf, defData, column.definedWidth);
|
|
else
|
|
memcpy(charTmpBuf, defData, defLen);
|
|
}
|
|
else
|
|
{
|
|
idbassert(width <= 8);
|
|
for (int j = 0; j < width - 1; j++)
|
|
{
|
|
charTmpBuf[j] = '\377';
|
|
}
|
|
charTmpBuf[width - 1] = '\376';
|
|
pVal = charTmpBuf;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
const uint8_t* data;
|
|
if (binaryArray != nullptr)
|
|
{
|
|
data = binaryArray->GetValue(i, &tokenLen);
|
|
}
|
|
else
|
|
{
|
|
data = fixedSizeBinaryArray->GetValue(i);
|
|
std::shared_ptr<arrow::DataType> tType = fixedSizeBinaryArray->type();
|
|
tokenLen = tType->byte_width();
|
|
}
|
|
|
|
const char* dataPtr = reinterpret_cast<const char*>(data);
|
|
|
|
if (tokenLen > column.definedWidth)
|
|
{
|
|
uint8_t truncate_point = utf8::utf8_truncate_point(dataPtr, column.definedWidth);
|
|
memcpy(charTmpBuf, dataPtr, column.definedWidth - truncate_point);
|
|
bufStats.satCount++;
|
|
}
|
|
else
|
|
{
|
|
memcpy(charTmpBuf, dataPtr, tokenLen);
|
|
}
|
|
}
|
|
|
|
uint64_t compChar = uint64ToStr(*(reinterpret_cast<uint64_t*>(charTmpBuf)));
|
|
int64_t binChar = static_cast<int64_t>(compChar);
|
|
|
|
// Update min/max range
|
|
uint64_t minVal = static_cast<uint64_t>(bufStats.minBufferVal);
|
|
uint64_t maxVal = static_cast<uint64_t>(bufStats.maxBufferVal);
|
|
|
|
if (compChar < minVal)
|
|
bufStats.minBufferVal = binChar;
|
|
if (compChar > maxVal)
|
|
bufStats.maxBufferVal = binChar;
|
|
|
|
pVal = charTmpBuf;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// SHORT INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_SHORT:
|
|
{
|
|
long long origVal;
|
|
const short* dataPtr = columnData->data()->GetValues<short>(1);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
bool bSatVal = false;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = column.fDefaultInt;
|
|
}
|
|
else
|
|
{
|
|
siVal = joblist::SMALLINTNULL;
|
|
pVal = &siVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
|
|
(column.dataType == CalpontSystemCatalog::UDECIMAL))
|
|
{
|
|
const int128_t* dataPtr1 = reinterpret_cast<const int128_t*>(dataPtr);
|
|
origVal = *(dataPtr1 + i);
|
|
}
|
|
else
|
|
{
|
|
origVal = *(dataPtr + i);
|
|
}
|
|
}
|
|
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
if (origVal < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = origVal;
|
|
if (origVal > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
siVal = origVal;
|
|
pVal = &siVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// UNSIGNED SHORT INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_USHORT:
|
|
{
|
|
int64_t origVal = 0;
|
|
const uint16_t* dataPtr = columnData->data()->GetValues<uint16_t>(1);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
bool bSatVal = false;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = static_cast<int64_t>(column.fDefaultUInt);
|
|
}
|
|
else
|
|
{
|
|
usiVal = joblist::USMALLINTNULL;
|
|
pVal = &usiVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = *(dataPtr + i);
|
|
}
|
|
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
uint64_t uVal = origVal;
|
|
|
|
if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
|
|
bufStats.minBufferVal = origVal;
|
|
if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
usiVal = origVal;
|
|
pVal = &usiVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// TINY INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_BYTE:
|
|
{
|
|
long long origVal;
|
|
// if use int8_t here, it will take 8 bool value of parquet array
|
|
std::shared_ptr<arrow::BooleanArray> boolArray =
|
|
std::static_pointer_cast<arrow::BooleanArray>(columnData);
|
|
const int8_t* dataPtr = columnData->data()->GetValues<int8_t>(1);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
bool bSatVal = false;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = column.fDefaultInt;
|
|
}
|
|
else
|
|
{
|
|
biVal = joblist::TINYINTNULL;
|
|
pVal = &biVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
|
|
(column.dataType == CalpontSystemCatalog::UDECIMAL))
|
|
{
|
|
const int128_t* dataPtr1 = reinterpret_cast<const int128_t*>(dataPtr);
|
|
origVal = *(dataPtr1 + i);
|
|
}
|
|
else if (columnData->type_id() == arrow::Type::type::BOOL)
|
|
{
|
|
origVal = boolArray->Value(i);
|
|
}
|
|
else
|
|
{
|
|
origVal = *(dataPtr + i);
|
|
}
|
|
}
|
|
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
if (origVal < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (origVal > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
biVal = origVal;
|
|
pVal = &biVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// UNSIGNED TINY INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_UBYTE:
|
|
{
|
|
int64_t origVal = 0;
|
|
|
|
// special handling for aux column to fix segmentation error
|
|
if (columnData->type_id() != arrow::Type::type::NA)
|
|
{
|
|
const uint8_t* dataPtr = columnData->data()->GetValues<uint8_t>(1);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
bool bSatVal = false;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = static_cast<int64_t>(column.fDefaultUInt);
|
|
}
|
|
else
|
|
{
|
|
ubiVal = joblist::UTINYINTNULL;
|
|
pVal = &ubiVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = *(dataPtr + i);
|
|
}
|
|
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
uint64_t uVal = origVal;
|
|
|
|
if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
ubiVal = origVal;
|
|
pVal = &ubiVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
bool bSatVal = false;
|
|
void* p = buf + i * width;
|
|
|
|
// this condition is for aux column which is default null
|
|
// no auto increment here
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = static_cast<int64_t>(column.fDefaultUInt);
|
|
}
|
|
else
|
|
{
|
|
ubiVal = joblist::UTINYINTNULL;
|
|
pVal = &ubiVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
uint64_t uVal = origVal;
|
|
|
|
if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
ubiVal = origVal;
|
|
pVal = &ubiVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// BIG INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_LONGLONG:
|
|
{
|
|
if (column.dataType != CalpontSystemCatalog::DATETIME &&
|
|
column.dataType != CalpontSystemCatalog::TIMESTAMP && column.dataType != CalpontSystemCatalog::TIME)
|
|
{
|
|
const long long* dataPtr = columnData->data()->GetValues<long long>(1);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
void* p = buf + i * width;
|
|
bool bSatVal = false;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
llVal = column.fDefaultInt;
|
|
}
|
|
else
|
|
{
|
|
llVal = joblist::BIGINTNULL;
|
|
pVal = &llVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
llVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
|
|
(column.dataType == CalpontSystemCatalog::UDECIMAL))
|
|
{
|
|
const int128_t* dataPtr1 = reinterpret_cast<const int128_t*>(dataPtr);
|
|
llVal = *(dataPtr1 + i);
|
|
}
|
|
else
|
|
{
|
|
llVal = *(dataPtr + i);
|
|
}
|
|
}
|
|
|
|
if (llVal < column.fMinIntSat)
|
|
{
|
|
llVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (llVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
llVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
if (llVal < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = llVal;
|
|
|
|
if (llVal > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = llVal;
|
|
|
|
pVal = &llVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
datatypes::TypeAttributesStd dummyTypeAttribute;
|
|
const auto* typeHandler = datatypes::TypeHandler::find(column.dataType, dummyTypeAttribute);
|
|
|
|
if (column.dataType == CalpontSystemCatalog::TIME)
|
|
{
|
|
// time conversion here
|
|
int rc = 0;
|
|
|
|
// for parquet, there are two time type, time32 and time64
|
|
// if it's time32, unit is millisecond, int32
|
|
if (columnData->type_id() == arrow::Type::type::TIME32 ||
|
|
columnData->type_id() == arrow::Type::type::NA)
|
|
{
|
|
std::shared_ptr<arrow::Time32Array> timeArray =
|
|
std::static_pointer_cast<arrow::Time32Array>(columnData);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
llDate = column.fDefaultInt;
|
|
}
|
|
else
|
|
{
|
|
llDate = joblist::TIMENULL;
|
|
pVal = &llDate;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// timeVal is millisecond since midnight
|
|
int32_t timeVal = timeArray->Value(i);
|
|
const datatypes::TypeHandlerTime* typeHandlerTime =
|
|
dynamic_cast<const datatypes::TypeHandlerTime*>(typeHandler);
|
|
idbassert(typeHandlerTime);
|
|
|
|
llDate = typeHandlerTime->convertArrowColumnTime32(timeVal, rc);
|
|
}
|
|
|
|
if (rc == 0)
|
|
{
|
|
if (llDate < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = llDate;
|
|
|
|
if (llDate > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = llDate;
|
|
}
|
|
else
|
|
{
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &llDate;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section,
|
|
i);
|
|
}
|
|
}
|
|
}
|
|
// if it's time64, unit is microsecond, int64
|
|
else if (columnData->type_id() == arrow::Type::type::TIME64)
|
|
{
|
|
std::shared_ptr<arrow::Time64Array> timeArray =
|
|
std::static_pointer_cast<arrow::Time64Array>(columnData);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
llDate = column.fDefaultInt;
|
|
}
|
|
else
|
|
{
|
|
llDate = joblist::TIMENULL;
|
|
pVal = &llDate;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// timeVal is macrosecond since midnight
|
|
int64_t timeVal = timeArray->Value(i);
|
|
const datatypes::TypeHandlerTime* typeHandlerTime =
|
|
dynamic_cast<const datatypes::TypeHandlerTime*>(typeHandler);
|
|
idbassert(typeHandlerTime);
|
|
|
|
llDate = typeHandlerTime->convertArrowColumnTime64(timeVal, rc);
|
|
}
|
|
|
|
if (rc == 0)
|
|
{
|
|
if (llDate < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = llDate;
|
|
|
|
if (llDate > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = llDate;
|
|
}
|
|
else
|
|
{
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &llDate;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section,
|
|
i);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else if (column.dataType == CalpontSystemCatalog::TIMESTAMP)
|
|
{
|
|
// timestamp conversion here
|
|
// default column type is TIMESTAMP
|
|
// default unit is millisecond
|
|
std::shared_ptr<arrow::TimestampArray> timeStampArray =
|
|
std::static_pointer_cast<arrow::TimestampArray>(columnData);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
int rc = 0;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
llDate = column.fDefaultInt;
|
|
}
|
|
else
|
|
{
|
|
llDate = joblist::TIMESTAMPNULL;
|
|
pVal = &llDate;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
int64_t timeVal = timeStampArray->Value(i);
|
|
std::shared_ptr<arrow::TimestampType> fType =
|
|
std::static_pointer_cast<arrow::TimestampType>(columnData->type());
|
|
|
|
const datatypes::TypeHandlerTimestamp* typeHandlerTimestamp =
|
|
dynamic_cast<const datatypes::TypeHandlerTimestamp*>(typeHandler);
|
|
idbassert(typeHandlerTimestamp);
|
|
|
|
if (fType->unit() == arrow::TimeUnit::MILLI)
|
|
{
|
|
llDate = typeHandlerTimestamp->convertArrowColumnTimestamp(timeVal, rc);
|
|
}
|
|
else
|
|
{
|
|
llDate = typeHandlerTimestamp->convertArrowColumnTimestampUs(timeVal, rc);
|
|
}
|
|
}
|
|
|
|
if (rc == 0)
|
|
{
|
|
if (llDate < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = llDate;
|
|
|
|
if (llDate > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = llDate;
|
|
}
|
|
else
|
|
{
|
|
llDate = 0;
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &llDate;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// datetime conversion here
|
|
// default column type is TIMESTAMP
|
|
std::shared_ptr<arrow::TimestampArray> dateTimeArray =
|
|
std::static_pointer_cast<arrow::TimestampArray>(columnData);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
int rc = 0;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
llDate = column.fDefaultInt;
|
|
}
|
|
else
|
|
{
|
|
llDate = joblist::DATETIMENULL;
|
|
pVal = &llDate;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
int64_t timeVal = dateTimeArray->Value(i);
|
|
std::shared_ptr<arrow::TimestampType> fType =
|
|
std::static_pointer_cast<arrow::TimestampType>(columnData->type());
|
|
|
|
const datatypes::TypeHandlerDatetime* typeHandlerDateTime =
|
|
dynamic_cast<const datatypes::TypeHandlerDatetime*>(typeHandler);
|
|
idbassert(typeHandlerDateTime);
|
|
|
|
if (fType->unit() == arrow::TimeUnit::MILLI)
|
|
{
|
|
llDate = typeHandlerDateTime->convertArrowColumnDatetime(timeVal, rc);
|
|
}
|
|
else
|
|
{
|
|
llDate = typeHandlerDateTime->convertArrowColumnDatetimeUs(timeVal, rc);
|
|
}
|
|
}
|
|
|
|
if (rc == 0)
|
|
{
|
|
if (llDate < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = llDate;
|
|
|
|
if (llDate > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = llDate;
|
|
}
|
|
else
|
|
{
|
|
llDate = 0;
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &llDate;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// WIDE DECIMAL
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_BINARY:
|
|
{
|
|
std::shared_ptr<arrow::Decimal128Array> decimalArray =
|
|
std::static_pointer_cast<arrow::Decimal128Array>(columnData);
|
|
std::shared_ptr<arrow::DecimalType> fType =
|
|
std::static_pointer_cast<arrow::DecimalType>(decimalArray->type());
|
|
const int128_t* dataPtr = decimalArray->data()->GetValues<int128_t>(1);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
void* p = buf + i * width;
|
|
bool bSatVal = false;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
bigllVal = column.fDefaultWideDecimal;
|
|
}
|
|
else
|
|
{
|
|
bigllVal = datatypes::Decimal128Null;
|
|
pVal = &bigllVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
bigllVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// data imported should match the column data type, so directly
|
|
// copy data here
|
|
memcpy(&bigllVal, dataPtr + i, sizeof(int128_t));
|
|
}
|
|
|
|
// Parquet data imported should fit its precision and
|
|
// scale, so the data won't saturate
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
if (bigllVal < bufStats.bigMinBufferVal)
|
|
bufStats.bigMinBufferVal = bigllVal;
|
|
|
|
if (bigllVal > bufStats.bigMaxBufferVal)
|
|
bufStats.bigMaxBufferVal = bigllVal;
|
|
|
|
pVal = &bigllVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// UNSIGNED BIG INT
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_ULONGLONG:
|
|
{
|
|
const uint64_t* dataPtr = columnData->data()->GetValues<uint64_t>(1);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
bool bSatVal = false;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
ullVal = column.fDefaultUInt;
|
|
}
|
|
else
|
|
{
|
|
ullVal = joblist::UBIGINTNULL;
|
|
pVal = &ullVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ullVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
memcpy(&ullVal, dataPtr + i, width);
|
|
}
|
|
|
|
if (ullVal > column.fMaxIntSat)
|
|
{
|
|
ullVal = column.fMaxIntSat;
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
if (ullVal < static_cast<uint64_t>(bufStats.minBufferVal))
|
|
bufStats.minBufferVal = static_cast<int64_t>(ullVal);
|
|
|
|
if (ullVal > static_cast<uint64_t>(bufStats.maxBufferVal))
|
|
bufStats.maxBufferVal = static_cast<int64_t>(ullVal);
|
|
|
|
pVal = &ullVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// UNSIGNED MEDIUM INTEGER AND UNSIGNED INTEGER
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_UMEDINT:
|
|
case WriteEngine::WR_UINT:
|
|
{
|
|
int64_t origVal;
|
|
const uint32_t* dataPtr = columnData->data()->GetValues<uint32_t>(1);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
bool bSatVal = false;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = static_cast<int64_t>(column.fDefaultUInt);
|
|
}
|
|
else
|
|
{
|
|
uiVal = joblist::UINTNULL;
|
|
pVal = &uiVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = *(dataPtr + i);
|
|
}
|
|
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
// Update min/max range
|
|
uint64_t uVal = origVal;
|
|
|
|
if (uVal < static_cast<uint64_t>(bufStats.minBufferVal))
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (uVal > static_cast<uint64_t>(bufStats.maxBufferVal))
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
uiVal = origVal;
|
|
pVal = &uiVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// MEDIUM INTEGER AND INTEGER
|
|
//----------------------------------------------------------------------
|
|
case WriteEngine::WR_MEDINT:
|
|
case WriteEngine::WR_INT:
|
|
default:
|
|
{
|
|
if (column.dataType != CalpontSystemCatalog::DATE)
|
|
{
|
|
const int* dataPtr = columnData->data()->GetValues<int>(1);
|
|
long long origVal;
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
bool bSatVal = false;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (!column.autoIncFlag)
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
origVal = column.fDefaultInt;
|
|
}
|
|
else
|
|
{
|
|
iVal = joblist::INTNULL;
|
|
pVal = &iVal;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
origVal = fAutoIncNextValue++;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if ((column.dataType == CalpontSystemCatalog::DECIMAL) ||
|
|
(column.dataType == CalpontSystemCatalog::UDECIMAL))
|
|
{
|
|
const int128_t* dataPtr1 = reinterpret_cast<const int128_t*>(dataPtr);
|
|
origVal = *(dataPtr1 + i);
|
|
}
|
|
else
|
|
{
|
|
origVal = *(dataPtr + i);
|
|
}
|
|
}
|
|
|
|
if (origVal < column.fMinIntSat)
|
|
{
|
|
origVal = column.fMinIntSat;
|
|
bSatVal = true;
|
|
}
|
|
else if (origVal > static_cast<int64_t>(column.fMaxIntSat))
|
|
{
|
|
origVal = static_cast<int64_t>(column.fMaxIntSat);
|
|
bSatVal = true;
|
|
}
|
|
|
|
if (bSatVal)
|
|
bufStats.satCount++;
|
|
|
|
if (origVal < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = origVal;
|
|
|
|
if (origVal > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = origVal;
|
|
|
|
iVal = (int)origVal;
|
|
pVal = &iVal;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// Parquet support.
|
|
std::shared_ptr<arrow::Date32Array> dateArray =
|
|
std::static_pointer_cast<arrow::Date32Array>(columnData);
|
|
datatypes::TypeAttributesStd dummyTypeAttribute;
|
|
const datatypes::TypeHandlerDate* typeHandlerDate = dynamic_cast<const datatypes::TypeHandlerDate*>(
|
|
datatypes::TypeHandler::find(column.dataType, dummyTypeAttribute));
|
|
idbassert(typeHandlerDate);
|
|
|
|
for (unsigned int i = 0; i < fTotalReadRowsParser; i++)
|
|
{
|
|
int rc = 0;
|
|
void* p = buf + i * width;
|
|
|
|
if (columnData->IsNull(i))
|
|
{
|
|
if (column.fWithDefault)
|
|
{
|
|
iDate = column.fDefaultInt;
|
|
}
|
|
else
|
|
{
|
|
iDate = joblist::DATENULL;
|
|
pVal = &iDate;
|
|
memcpy(p, pVal, width);
|
|
continue;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
int32_t dayVal = dateArray->Value(i);
|
|
iDate = typeHandlerDate->convertArrowColumnDate(dayVal, rc);
|
|
}
|
|
|
|
if (rc == 0)
|
|
{
|
|
if (iDate < bufStats.minBufferVal)
|
|
bufStats.minBufferVal = iDate;
|
|
|
|
if (iDate > bufStats.maxBufferVal)
|
|
bufStats.maxBufferVal = iDate;
|
|
}
|
|
else
|
|
{
|
|
iDate = 0;
|
|
bufStats.satCount++;
|
|
}
|
|
|
|
pVal = &iDate;
|
|
memcpy(p, pVal, width);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
// Hand the min/max range accumulated in bufStats to columnInfo.updateCPInfo()
// for the extent ending at lastInputRowInExtent, then get ready for the next
// extent.
//
// columnInfo              - column whose CP info is updated
// lastInputRowInExtent    - in: last input row of the extent just completed;
//                           out: advanced by columnInfo.rowsPerExtent()
// bufStats                - in: accumulated range; out: range reset so that the
//                           next value seen replaces both min and max
// updateCPInfoPendingFlag - out: always cleared
// section                 - only read for the DEBUG_2 trace (startRowId())
// curRow                  - row index within the current buffer; only used in
//                           the DEBUG_2 trace (logged as curRow + 1)
//------------------------------------------------------------------------------
inline void BulkLoadBuffer::updateCPMinMax(ColumnInfo& columnInfo, RID& lastInputRowInExtent,
                                           BLBufferStats& bufStats, bool& updateCPInfoPendingFlag,
                                           ColumnBufferSection* section, uint32_t curRow)
{
  // Columns wider than 8 bytes keep their range in the "big" members.
  if (columnInfo.column.width > 8)
  {
    columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal,
                            columnInfo.column.dataType, columnInfo.column.width);
  }
  else
  {
    columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal,
                            columnInfo.column.dataType, columnInfo.column.width);
  }

  // TODO MCOL-641 Add support here.
  if (fLog->isDebug(DEBUG_2))
  {
    ostringstream trace;
    trace << "ColRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows1: " << section->startRowId()
          << " " << curRow + 1 << "; lastExtentRow: " << lastInputRowInExtent;
    parseColLogMinMax(trace, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal);

    fLog->logMsg(trace.str(), MSGLVL_INFO2);
  }

  // Move the boundary to the last row of the following extent.
  lastInputRowInExtent += columnInfo.rowsPerExtent();

  // Re-seed the range with "min = largest, max = smallest" for the column's
  // signedness and width, so the first value of the next extent wins both.
  const bool unsignedColumn = isUnsigned(columnInfo.column.dataType);

  if (columnInfo.column.width > 8)
  {
    if (unsignedColumn)
    {
      bufStats.bigMinBufferVal = -1;
      bufStats.bigMaxBufferVal = 0;
    }
    else
    {
      utils::int128Max(bufStats.bigMinBufferVal);
      utils::int128Min(bufStats.bigMaxBufferVal);
    }
  }
  else
  {
    if (unsignedColumn)
    {
      bufStats.minBufferVal = static_cast<int64_t>(MAX_UBIGINT);
      bufStats.maxBufferVal = static_cast<int64_t>(MIN_UBIGINT);
    }
    else
    {
      bufStats.minBufferVal = MAX_BIGINT;
      bufStats.maxBufferVal = MIN_BIGINT;
    }
  }

  // Everything gathered so far has been published.
  updateCPInfoPendingFlag = false;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Parse nonDictionary column Read buffer. Parsed row values are added to
|
|
// fColBufferMgr, which stores them into an output buffer before writing them
|
|
// out to the applicable column segment file.
|
|
//------------------------------------------------------------------------------
|
|
int BulkLoadBuffer::parseCol(ColumnInfo& columnInfo)
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
// Parse the data and fill up a buffer; which is written to output file
|
|
uint32_t nRowsParsed;
|
|
|
|
if (fLog->isDebug(DEBUG_2))
|
|
{
|
|
ostringstream oss;
|
|
oss << "ColResSecIn: OID-" << columnInfo.column.mapOid << "; StartRID/Rows: " << fStartRowParser << " "
|
|
<< fTotalReadRowsParser;
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
}
|
|
|
|
ColumnBufferSection* section = 0;
|
|
RID lastInputRowInExtent;
|
|
RETURN_ON_ERROR(columnInfo.fColBufferMgr->reserveSection(fStartRowParser, fTotalReadRowsParser, nRowsParsed,
|
|
§ion, lastInputRowInExtent));
|
|
|
|
if (nRowsParsed > 0)
|
|
{
|
|
#ifdef PROFILE
|
|
Stats::startParseEvent(WE_STATS_PARSE_COL);
|
|
#endif
|
|
|
|
// Reserve auto-increment numbers we need to generate
|
|
if ((columnInfo.column.autoIncFlag) && (fAutoIncGenCountParser > 0))
|
|
{
|
|
rc = columnInfo.reserveAutoIncNums(fAutoIncGenCountParser, fAutoIncNextValue);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
WErrorCodes ec;
|
|
ostringstream oss;
|
|
oss << "parseCol: error generating auto-increment values "
|
|
"for table-"
|
|
<< fTableName << ", column-" << columnInfo.column.colName << "; OID-" << columnInfo.column.mapOid
|
|
<< "; " << ec.errorString(rc);
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
BulkLoad::addErrorMsg2BrmUpdater(fTableName, oss);
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
// create a buffer for the size of the rows being written.
|
|
unsigned char* buf = new unsigned char[fTotalReadRowsParser * columnInfo.column.width];
|
|
char* field = new char[MAX_FIELD_SIZE + 1];
|
|
|
|
// Initialize min/max buffer values. We initialize to a sufficient
|
|
// range to force the first value to automatically update the range.
|
|
// If we are managing char data, minBufferVal and maxBufferVal are
|
|
// maintained in reverse byte order to facilitate string comparisons
|
|
BLBufferStats bufStats(columnInfo.column.dataType);
|
|
bool updateCPInfoPendingFlag = false;
|
|
|
|
int tokenLength = 0;
|
|
bool tokenNullFlag = false;
|
|
|
|
for (uint32_t i = 0; i < fTotalReadRowsParser; ++i)
|
|
{
|
|
char* p = fDataParser + fTokensParser[i][columnInfo.id].start;
|
|
|
|
if (fTokensParser[i][columnInfo.id].offset > 0)
|
|
{
|
|
memcpy(field, p, fTokensParser[i][columnInfo.id].offset);
|
|
field[fTokensParser[i][columnInfo.id].offset] = '\0';
|
|
tokenLength = fTokensParser[i][columnInfo.id].offset;
|
|
tokenNullFlag = false;
|
|
}
|
|
else
|
|
{
|
|
field[0] = '\0';
|
|
tokenLength = 0;
|
|
tokenNullFlag = true;
|
|
}
|
|
|
|
// convert the data into appropriate format and update CP values
|
|
convert(field, tokenLength, tokenNullFlag, buf + i * columnInfo.column.width, columnInfo.column,
|
|
bufStats);
|
|
updateCPInfoPendingFlag = true;
|
|
|
|
// Update CP min/max if this is last row in this extent
|
|
if ((fStartRowParser + i) == lastInputRowInExtent)
|
|
{
|
|
if (columnInfo.column.width <= 8)
|
|
{
|
|
columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal,
|
|
columnInfo.column.dataType, columnInfo.column.width);
|
|
}
|
|
else
|
|
{
|
|
columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal,
|
|
columnInfo.column.dataType, columnInfo.column.width);
|
|
}
|
|
|
|
// TODO MCOL-641 Add support here.
|
|
if (fLog->isDebug(DEBUG_2))
|
|
{
|
|
ostringstream oss;
|
|
oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid
|
|
<< "; StartRID/Rows1: " << section->startRowId() << " " << i + 1
|
|
<< "; lastExtentRow: " << lastInputRowInExtent;
|
|
parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal);
|
|
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
}
|
|
|
|
lastInputRowInExtent += columnInfo.rowsPerExtent();
|
|
|
|
if (isUnsigned(columnInfo.column.dataType))
|
|
{
|
|
if (columnInfo.column.width <= 8)
|
|
{
|
|
bufStats.minBufferVal = static_cast<int64_t>(MAX_UBIGINT);
|
|
bufStats.maxBufferVal = static_cast<int64_t>(MIN_UBIGINT);
|
|
}
|
|
else
|
|
{
|
|
bufStats.bigMinBufferVal = -1;
|
|
bufStats.bigMaxBufferVal = 0;
|
|
}
|
|
updateCPInfoPendingFlag = false;
|
|
}
|
|
else
|
|
{
|
|
if (columnInfo.column.width <= 8)
|
|
{
|
|
bufStats.minBufferVal = MAX_BIGINT;
|
|
bufStats.maxBufferVal = MIN_BIGINT;
|
|
}
|
|
else
|
|
{
|
|
utils::int128Max(bufStats.bigMinBufferVal);
|
|
utils::int128Min(bufStats.bigMaxBufferVal);
|
|
}
|
|
updateCPInfoPendingFlag = false;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (updateCPInfoPendingFlag)
|
|
{
|
|
if (columnInfo.column.width <= 8)
|
|
{
|
|
columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal,
|
|
columnInfo.column.dataType, columnInfo.column.width);
|
|
}
|
|
else
|
|
{
|
|
columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal,
|
|
columnInfo.column.dataType, columnInfo.column.width);
|
|
}
|
|
}
|
|
|
|
if (bufStats.satCount) // @bug 3504: increment row saturation count
|
|
{
|
|
// If we don't want to allow saturated values for auto inc columns.
|
|
// then this is where we handle it. Too late to reject a single
|
|
// row from the parsing thread, so we abort the job.
|
|
// if (columnInfo.column.autoIncFlag)
|
|
//{
|
|
// rc = ERR_AUTOINC_USER_OUT_OF_RANGE;
|
|
// WErrorCodes ec;
|
|
// ostringstream oss;
|
|
// oss << "parseCol: error with auto-increment values "
|
|
// "for table-" << fTableName <<
|
|
// ", column-" << columnInfo.column.colName <<
|
|
// "; OID-" << columnInfo.column.mapOid <<
|
|
// "; " << ec.errorString(rc);
|
|
// fLog->logMsg( oss.str(), rc, MSGLVL_ERROR );
|
|
// return rc;
|
|
//}
|
|
columnInfo.incSaturatedCnt(bufStats.satCount);
|
|
}
|
|
|
|
delete[] field;
|
|
section->write(buf, fTotalReadRowsParser);
|
|
delete[] buf;
|
|
#ifdef PROFILE
|
|
Stats::stopParseEvent(WE_STATS_PARSE_COL);
|
|
#endif
|
|
|
|
// TODO MCOL-641 Add support here.
|
|
if (fLog->isDebug(DEBUG_2))
|
|
{
|
|
ostringstream oss;
|
|
RID rid1 = section->startRowId();
|
|
RID rid2 = section->endRowId();
|
|
oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows2: " << rid1 << " "
|
|
<< (rid2 - rid1) + 1 << "; startOffset: " << section->getStartOffset()
|
|
<< "; lastExtentRow: " << lastInputRowInExtent;
|
|
parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal);
|
|
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
}
|
|
|
|
RETURN_ON_ERROR(columnInfo.fColBufferMgr->releaseSection(section));
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Log the specified min/max buffer values to the log file. This is straight
|
|
// forward for numeric types, but for character data, we have to reverse the
|
|
// order of min/max values, because they are maintained in reverse order to
|
|
// facilitate the comparison of character strings in an int64_t variable.
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoadBuffer::parseColLogMinMax(ostringstream& oss, ColDataType colDataType, int64_t minBufferVal,
|
|
int64_t maxBufferVal) const
|
|
{
|
|
if (isCharType(colDataType))
|
|
{
|
|
// Swap/restore byte order before printing character string
|
|
int64_t minVal = static_cast<int64_t>(uint64ToStr(static_cast<uint64_t>(minBufferVal)));
|
|
int64_t maxVal = static_cast<int64_t>(uint64ToStr(static_cast<uint64_t>(maxBufferVal)));
|
|
char minValStr[sizeof(int64_t) + 1];
|
|
char maxValStr[sizeof(int64_t) + 1];
|
|
memcpy(minValStr, &minVal, sizeof(int64_t));
|
|
memcpy(maxValStr, &maxVal, sizeof(int64_t));
|
|
minValStr[sizeof(int64_t)] = '\0';
|
|
maxValStr[sizeof(int64_t)] = '\0';
|
|
oss << "; minVal: " << minVal << "; (" << minValStr << ")"
|
|
<< "; maxVal: " << maxVal << "; (" << maxValStr << ")";
|
|
}
|
|
else if (isUnsigned(colDataType))
|
|
{
|
|
oss << "; minVal: " << static_cast<uint64_t>(minBufferVal)
|
|
<< "; maxVal: " << static_cast<uint64_t>(maxBufferVal);
|
|
}
|
|
else
|
|
{
|
|
oss << "; minVal: " << minBufferVal << "; maxVal: " << maxBufferVal;
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Parse Dictionary column Read buffer. Parsed row values are added to
|
|
// fColBufferMgr, which stores them into an output buffer before writing them
|
|
// out to the applicable column segment (token) file. This gets a little sticky
|
|
// here if the amount of data (in the column file) crosses an extent boundary.
|
|
// In this case, we have to split up the tokens into 2 column segment files
|
|
// and of course split up the corresponding strings into 2 different dictionary
|
|
// store files as well.
|
|
//------------------------------------------------------------------------------
|
|
int BulkLoadBuffer::parseDict(ColumnInfo& columnInfo)
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
uint32_t nRowsParsed1;
|
|
rc = parseDictSection(columnInfo, 0, fStartRowParser, fTotalReadRowsParser, nRowsParsed1);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
WErrorCodes ec;
|
|
ostringstream oss;
|
|
oss << "parseDict: error parsing section1: "
|
|
<< " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc);
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
//..If fTotalReadRows != nRowsParsed1 then reserveInSection() had to
|
|
// split up our input buffer tokens because they spanned 2 extents.
|
|
// After exiting reserveSection() above, we no longer have a mutex
|
|
// lock on the sections in the internal buffer, so you might think
|
|
// this could cause a race condition with more rows being added to
|
|
// the buffer by other parsing threads, while we are busy wrapping
|
|
// up the first extent and creating the second. But since reserve-
|
|
// Section() only took some of the rows from the Read buffer, any
|
|
// other threads should be blocked waiting for us to add the remain-
|
|
// ing rows from "this" Read buffer into a new ColumnBufferSection.
|
|
// The following condition wait in reserveSection() should be keeping
|
|
// things stable:
|
|
// while((fMaxRowId + 1) != startRowId) {
|
|
// //Making sure that allocation are made in order
|
|
// fOutOfSequence.wait(lock);
|
|
// }
|
|
|
|
if (fTotalReadRowsParser != nRowsParsed1)
|
|
{
|
|
if (fLog->isDebug(DEBUG_1))
|
|
{
|
|
ostringstream oss;
|
|
oss << "parseDict breaking up bufsec for OID-" << columnInfo.curCol.dataFile.fid << "; file-"
|
|
<< columnInfo.curCol.dataFile.fSegFileName << "; totalInRows-" << fTotalReadRowsParser
|
|
<< "; rowsFlushedToEndExtent-" << nRowsParsed1;
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
}
|
|
|
|
//..Flush the rows in the buffer that fill up the current extent
|
|
rc = columnInfo.fColBufferMgr->intermediateFlush();
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
WErrorCodes ec;
|
|
ostringstream oss;
|
|
oss << "parseDict: error flushing column: "
|
|
<< " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc);
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
//..See if we just finished filling in the last extent for this seg-
|
|
// ment token file, in which case we can truncate the corresponding
|
|
// dictionary store segment file. (this only affects compressed data).
|
|
uint16_t root = columnInfo.curCol.dataFile.fDbRoot;
|
|
uint32_t pNum = columnInfo.curCol.dataFile.fPartition;
|
|
uint16_t sNum = columnInfo.curCol.dataFile.fSegment;
|
|
bool bFileComplete = columnInfo.isFileComplete();
|
|
|
|
//..Close the current segment file, and add an extent to the next
|
|
// segment file in the rotation sequence. newSegmentFile is a
|
|
// FILE* that points to the newly opened segment file.
|
|
rc = columnInfo.fColBufferMgr->extendTokenColumn();
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
WErrorCodes ec;
|
|
ostringstream oss;
|
|
oss << "parseDict: error extending column: "
|
|
<< " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc);
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
//..Close current dictionary store file and open the dictionary
|
|
// store file that will match the newly opened column segment file.
|
|
rc = columnInfo.closeDctnryStore(false);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
WErrorCodes ec;
|
|
ostringstream oss;
|
|
oss << "parseDict: error closing store file: "
|
|
<< " OID-" << columnInfo.column.dctnry.dctnryOid << "; " << ec.errorString(rc);
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
rc = columnInfo.openDctnryStore(false);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
WErrorCodes ec;
|
|
ostringstream oss;
|
|
oss << "parseDict: error opening store file: "
|
|
<< " OID-" << columnInfo.column.dctnry.dctnryOid << "; " << ec.errorString(rc);
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
|
|
// Ignore return code from closing file; already in error state
|
|
columnInfo.closeDctnryStore(true); // clean up loose ends
|
|
return rc;
|
|
}
|
|
|
|
//..Now we can add the remaining rows in the current Read buffer to
|
|
// to the output buffer destined for the next extent we just added.
|
|
uint32_t nRowsParsed2;
|
|
rc = parseDictSection(columnInfo, nRowsParsed1, (fStartRowParser + nRowsParsed1),
|
|
(fTotalReadRowsParser - nRowsParsed1), nRowsParsed2);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
WErrorCodes ec;
|
|
ostringstream oss;
|
|
oss << "parseDict: error parsing section2: "
|
|
<< " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc);
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
//..We went ahead and completed all the necessary parsing to free up
|
|
// the buffer we were working on, so that any blocked threads can
|
|
// continue. In the mean time, this thread can now go back and
|
|
// truncate the dctnry store file we just completed, if applicable.
|
|
if (bFileComplete)
|
|
{
|
|
rc = columnInfo.truncateDctnryStore(columnInfo.column.dctnry.dctnryOid, root, pNum, sNum);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
}
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Parses all or part of a Dictionary Read buffer into a ColumnBufferSection,
|
|
// depending on whether the buffer crosses an extent boundary or not. If it
|
|
// crosses such a boundary, then parseDictSection() will only parse the buffer
|
|
// up to the end of the current extent. A second call to parseDictSection()
|
|
// should be made to parse the remainder of the buffer into the second extent.
|
|
//------------------------------------------------------------------------------
|
|
int BulkLoadBuffer::parseDictSection(ColumnInfo& columnInfo, int tokenPos, RID startRow,
|
|
uint32_t totalReadRows, uint32_t& nRowsParsed)
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
if (fLog->isDebug(DEBUG_2))
|
|
{
|
|
ostringstream oss;
|
|
oss << "DctResSecIn: OID-" << columnInfo.column.mapOid << "; StartRID/Rows: " << startRow << " "
|
|
<< totalReadRows;
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
}
|
|
|
|
ColumnBufferSection* section = 0;
|
|
RID lastInputRowInExtent = 0;
|
|
RETURN_ON_ERROR(columnInfo.fColBufferMgr->reserveSection(startRow, totalReadRows, nRowsParsed, §ion,
|
|
lastInputRowInExtent));
|
|
|
|
if (nRowsParsed > 0)
|
|
{
|
|
char* tokenBuf = new char[nRowsParsed * 8];
|
|
|
|
if (fImportDataMode != IMPORT_DATA_PARQUET)
|
|
{
|
|
// Pass fDataParser data and fTokensParser meta data to dictionary
|
|
// to be parsed and tokenized, with tokens returned in tokenBuf.
|
|
rc = columnInfo.updateDctnryStore(fDataParser, &fTokensParser[tokenPos], nRowsParsed, tokenBuf);
|
|
}
|
|
else
|
|
{
|
|
// Pass columnData and tokenPos data to dictionary to be parsed and tokenized
|
|
// with tokens returned in tokenBuf.
|
|
std::shared_ptr<arrow::Array> columnData = fParquetBatchParser->column(columnInfo.id);
|
|
rc = columnInfo.updateDctnryStoreParquet(columnData, tokenPos, nRowsParsed, tokenBuf);
|
|
}
|
|
|
|
if (rc == NO_ERROR)
|
|
{
|
|
#if 0
|
|
int64_t* tokenVals = reinterpret_cast<int64_t*>(tokenBuf);
|
|
|
|
for (unsigned int j = 0; j < nRowsParsed; j++)
|
|
{
|
|
if (tokenVals[j] == 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Warning: 0 token being stored for OID-" <<
|
|
columnInfo.curCol.dataFile.fid << "; file-" <<
|
|
columnInfo.curCol.dataFile.fSegFileName <<
|
|
"; input row number-" << fStartRowForLoggingParser + j;
|
|
fLog->logMsg( oss.str(), MSGLVL_INFO1 );
|
|
}
|
|
}
|
|
|
|
#endif
|
|
section->write(tokenBuf, nRowsParsed);
|
|
delete[] tokenBuf;
|
|
|
|
if (fLog->isDebug(DEBUG_2))
|
|
{
|
|
ostringstream oss;
|
|
RID rid1 = section->startRowId();
|
|
RID rid2 = section->endRowId();
|
|
oss << "DctRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows: " << rid1 << " "
|
|
<< (rid2 - rid1) + 1 << "; startOffset: " << section->getStartOffset();
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
}
|
|
|
|
RETURN_ON_ERROR(columnInfo.fColBufferMgr->releaseSection(section));
|
|
}
|
|
else
|
|
{
|
|
delete[] tokenBuf;
|
|
}
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length,
|
|
size_t* parse_length, RID& totalReadRows, RID& correctTotalRows,
|
|
const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
|
unsigned int allowedErrCntThisCall)
|
|
{
|
|
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
|
|
reset();
|
|
copyOverflow(overFlowBufIn);
|
|
size_t readSize = 0;
|
|
|
|
// Copy the overflow data from the last buffer, that did not get written
|
|
if (fOverflowSize != 0)
|
|
{
|
|
memcpy(fData, fOverflowBuf, fOverflowSize);
|
|
|
|
if (fOverflowBuf != NULL)
|
|
{
|
|
delete[] fOverflowBuf;
|
|
fOverflowBuf = NULL;
|
|
}
|
|
}
|
|
|
|
readSize = fBufferSize - fOverflowSize;
|
|
if (readSize > (length - *parse_length))
|
|
{
|
|
readSize = length - *parse_length;
|
|
}
|
|
memcpy(fData + fOverflowSize, input + *parse_length, readSize);
|
|
*parse_length += readSize;
|
|
|
|
bool bEndOfData = false;
|
|
|
|
if (length == *parse_length)
|
|
{
|
|
bEndOfData = true;
|
|
}
|
|
|
|
if (bEndOfData && // @bug 3516: Add '\n' if missing from last record
|
|
(fImportDataMode == IMPORT_DATA_TEXT)) // Only applies to ascii mode
|
|
{
|
|
if ((fOverflowSize > 0) | (readSize > 0))
|
|
{
|
|
if (fData[fOverflowSize + readSize - 1] != '\n')
|
|
{
|
|
// Should be safe to add byte to fData w/o risk of overflowing,
|
|
// since we hit EOF. That should mean fread() did not read all
|
|
// the bytes we requested, meaning we have room to add a byte.
|
|
fData[fOverflowSize + readSize] = '\n';
|
|
readSize++;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Lazy allocation of fToken memory as needed
|
|
if (fTokens == 0)
|
|
{
|
|
resizeTokenArray();
|
|
}
|
|
|
|
if ((readSize > 0) || (fOverflowSize > 0))
|
|
{
|
|
if (fOverflowBuf != NULL)
|
|
{
|
|
delete[] fOverflowBuf;
|
|
fOverflowBuf = NULL;
|
|
}
|
|
|
|
fReadSize = readSize + fOverflowSize;
|
|
fStartRow = correctTotalRows;
|
|
fStartRowForLogging = totalReadRows;
|
|
|
|
if (fImportDataMode == IMPORT_DATA_TEXT)
|
|
{
|
|
tokenize(columnsInfo, allowedErrCntThisCall);
|
|
}
|
|
else
|
|
{
|
|
int rc = tokenizeBinary(columnsInfo, allowedErrCntThisCall, bEndOfData);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
}
|
|
|
|
// If we read a full buffer without hitting any new lines, then
|
|
// terminate import because row size is greater than read buffer size.
|
|
if ((fTotalReadRowsForLog == 0) && (fReadSize == fBufferSize))
|
|
{
|
|
return ERR_BULK_ROW_FILL_BUFFER;
|
|
}
|
|
|
|
totalReadRows += fTotalReadRowsForLog;
|
|
correctTotalRows += fTotalReadRows;
|
|
}
|
|
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
// Load the next record batch from fParquetReader into fParquetBatch and update
// the row bookkeeping of "this" BulkLoadBuffer.
//
// totalReadRows    (in/out) - incremented by the number of rows in the batch
// correctTotalRows (in/out) - incremented by the number of rows in the batch
//
// Returns NO_ERROR, or ERR_FILE_READ_IMPORT if the reader reports a failure.
//------------------------------------------------------------------------------
int BulkLoadBuffer::fillFromFileParquet(RID& totalReadRows, RID& correctTotalRows)
{
  boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
  reset();

  try
  {
    PARQUET_THROW_NOT_OK(fParquetReader->ReadNext(&fParquetBatch));
  }
  catch (const std::exception&)
  {
    return ERR_FILE_READ_IMPORT;
  }

  // ReadNext() hands back a null batch once the stream is exhausted; treat
  // that as an empty read instead of dereferencing a null pointer.
  const int64_t batchRows = fParquetBatch ? fParquetBatch->num_rows() : 0;

  fStartRow = correctTotalRows;
  fStartRowForLogging = totalReadRows;
  fTotalReadRows = batchRows;
  fTotalReadRowsForLog = batchRows;
  totalReadRows += fTotalReadRowsForLog;
  correctTotalRows += fTotalReadRows;

  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Read the next set of rows from the input import file (for the specified
|
|
// table), into "this" BulkLoadBuffer.
|
|
// totalReadRows (input/output) - total row count from tokenize() (per file)
|
|
// correctTotalRows (input/output) - total valid row count from tokenize()
|
|
// (cumulative)
|
|
//------------------------------------------------------------------------------
|
|
int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, RID& totalReadRows,
|
|
RID& correctTotalRows, const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
|
unsigned int allowedErrCntThisCall)
|
|
{
|
|
boost::mutex::scoped_lock lock(fSyncUpdatesBLB);
|
|
reset();
|
|
copyOverflow(overFlowBufIn);
|
|
size_t readSize = 0;
|
|
|
|
// Copy the overflow data from the last buffer, that did not get written
|
|
if (fOverflowSize != 0)
|
|
{
|
|
memcpy(fData, fOverflowBuf, fOverflowSize);
|
|
|
|
if (fOverflowBuf != NULL)
|
|
{
|
|
delete[] fOverflowBuf;
|
|
fOverflowBuf = NULL;
|
|
}
|
|
}
|
|
|
|
readSize = fread(fData + fOverflowSize, 1, fBufferSize - fOverflowSize, handle);
|
|
|
|
if (ferror(handle))
|
|
{
|
|
return ERR_FILE_READ_IMPORT;
|
|
}
|
|
|
|
bool bEndOfData = false;
|
|
|
|
if (feof(handle))
|
|
bEndOfData = true;
|
|
|
|
if (bEndOfData && // @bug 3516: Add '\n' if missing from last record
|
|
(fImportDataMode == IMPORT_DATA_TEXT)) // Only applies to ascii mode
|
|
{
|
|
if ((fOverflowSize > 0) | (readSize > 0))
|
|
{
|
|
if (fData[fOverflowSize + readSize - 1] != '\n')
|
|
{
|
|
// Should be safe to add byte to fData w/o risk of overflowing,
|
|
// since we hit EOF. That should mean fread() did not read all
|
|
// the bytes we requested, meaning we have room to add a byte.
|
|
fData[fOverflowSize + readSize] = '\n';
|
|
readSize++;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Lazy allocation of fToken memory as needed
|
|
if (fTokens == 0)
|
|
{
|
|
resizeTokenArray();
|
|
}
|
|
|
|
if ((readSize > 0) || (fOverflowSize > 0))
|
|
{
|
|
if (fOverflowBuf != NULL)
|
|
{
|
|
delete[] fOverflowBuf;
|
|
fOverflowBuf = NULL;
|
|
}
|
|
|
|
fReadSize = readSize + fOverflowSize;
|
|
fStartRow = correctTotalRows;
|
|
fStartRowForLogging = totalReadRows;
|
|
|
|
if (fImportDataMode == IMPORT_DATA_TEXT)
|
|
{
|
|
tokenize(columnsInfo, allowedErrCntThisCall);
|
|
}
|
|
else
|
|
{
|
|
int rc = tokenizeBinary(columnsInfo, allowedErrCntThisCall, bEndOfData);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
}
|
|
|
|
// If we read a full buffer without hitting any new lines, then
|
|
// terminate import because row size is greater than read buffer size.
|
|
if ((fTotalReadRowsForLog == 0) && (fReadSize == fBufferSize))
|
|
{
|
|
return ERR_BULK_ROW_FILL_BUFFER;
|
|
}
|
|
|
|
totalReadRows += fTotalReadRowsForLog;
|
|
correctTotalRows += fTotalReadRows;
|
|
}
|
|
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Parse the rows of data in "fData", saving the meta information that describes
|
|
// the parsed data, in fTokens. If the number of read parsing errors for a
|
|
// given call to tokenize() should exceed the value of "allowedErrCntThisCall",
|
|
// then tokenize() will stop reading data and exit.
|
|
//
|
|
// We parse the data using the following state machine-like table.
|
|
// Enclosed by character ("), escaped by character (\), and field delimiter
|
|
// (|) can all be overridden; but we show default values in the state table.
|
|
//
|
|
// Character(s) found and action taken
|
|
//
|
|
// Current \" or " | \n other
|
|
// State "" character
|
|
// -----------------------------------------------------------------------
|
|
// LEADING_CHAR | n/a ENCLOSED endFld endFld NORMAL
|
|
// TRAILING_CHAR | n/a n/a endFld endFld ignore
|
|
// ENCLOSED | convert TRAIL n/a n/a n/a
|
|
// NORMAL | n/a n/a endFld endFld n/a
|
|
// -----------------------------------------------------------------------
|
|
//
|
|
// n/a - not applicable; no check is made for this specific character in
|
|
// this state
|
|
// ENCLOSED - transition to ENCLOSED state
|
|
// TRAIL - transition to TRAILING_CHAR state
|
|
// NORMAL - transition to NORMAL state
|
|
// convert - convert an escaped double quote (\" or "") to a single double
|
|
// quote ("), and strip out the other character
|
|
//
|
|
// The initial parsing state for each column is LEADING_CHAR or NORMAL,
|
|
// depending on whether the user has enabled the "enclosed by" feature.
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoadBuffer::tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
|
unsigned int allowedErrCntThisCall)
|
|
{
|
|
unsigned offset = 0; // length of field
|
|
unsigned curCol = 0; // dest db column counter within a row
|
|
unsigned curFld = 0; // src input field counter within a row
|
|
unsigned curRowNum = 0; // "total" number of rows read during this call
|
|
unsigned curRowNum1 = 0; // number of "valid" rows inserted into fTokens
|
|
char* p; // iterates thru each byte in the input buffer
|
|
char c; // value of byte at address "p".
|
|
char* lastRowHead = 0; // start of latest row being processed
|
|
bool bValidRow = true; // track whether current row is valid
|
|
bool bRowGenAutoInc = false; // track whether row uses generated auto-inc
|
|
std::string validationErrMsg; // validation error msg (if any) for current row
|
|
unsigned errorCount = 0;
|
|
const char FIELD_DELIM_CHAR = fColDelim;
|
|
const char STRING_ENCLOSED_CHAR = fEnclosedByChar;
|
|
const char ESCAPE_CHAR = fEscapeChar;
|
|
const char LINE_FEED = 0x0D;
|
|
const char CARRIAGE_RETURN = 0x0A;
|
|
|
|
// Variables used to store raw data read for a row; needed if we strip out
|
|
// enclosed char(s) and later have to print original data in a *.bad file
|
|
char* pRawDataRow = 0;
|
|
unsigned rawDataRowCapacity = 0;
|
|
unsigned rawDataRowLength = 0;
|
|
const unsigned MIN_RAW_DATA_CAP = 1024;
|
|
|
|
// Enable "enclosed by" checking if user specified an "enclosed by" char
|
|
FieldParsingState initialState = FLD_PARSE_NORMAL_STATE;
|
|
|
|
if (STRING_ENCLOSED_CHAR != '\0')
|
|
{
|
|
initialState = FLD_PARSE_LEADING_CHAR_STATE;
|
|
}
|
|
|
|
FieldParsingState fieldState = initialState;
|
|
bool bNewLine = false; // Tracks new line
|
|
unsigned start = 0; // Where next field starts in fData
|
|
unsigned idxFrom = 0; // idxFrom and idxTo are used to strip out
|
|
unsigned idxTo = 0; // escape characters in \" and ""
|
|
|
|
// Initialize which field values are enclosed
|
|
unsigned int enclosedFieldFlag = 0;
|
|
#ifdef DEBUG_TOKEN_PARSING
|
|
unsigned int enclosedFieldFlags[fNumberOfColumns];
|
|
memset(enclosedFieldFlags, 0, sizeof(unsigned) * fNumberOfColumns);
|
|
#endif
|
|
|
|
p = lastRowHead = fData;
|
|
const char* pEndOfData = p + fReadSize; //@bug3810 set an end-of-data marker
|
|
|
|
//--------------------------------------------------------------------------
|
|
// Loop through all the bytes in the read buffer in order to construct
|
|
// the meta data stored in fTokens.
|
|
//--------------------------------------------------------------------------
|
|
while (p < pEndOfData)
|
|
{
|
|
c = *p;
|
|
|
|
// If we have stripped "enclosed" characters, then save raw data
|
|
if (rawDataRowLength > 0)
|
|
{
|
|
if (rawDataRowLength == rawDataRowCapacity) // resize array if full
|
|
{
|
|
rawDataRowCapacity = rawDataRowCapacity * 2;
|
|
resizeRowDataArray(&pRawDataRow, rawDataRowLength, rawDataRowCapacity);
|
|
}
|
|
|
|
pRawDataRow[rawDataRowLength] = c;
|
|
rawDataRowLength++;
|
|
}
|
|
|
|
//----------------------------------------------------------------------
|
|
// Branch based on current parsing state for this field.
|
|
// Note that we fall out of switch/case and do more processing if we
|
|
// have hit end of column or line; else we "continue" directly to end
|
|
// of loop to process the next byte.
|
|
//----------------------------------------------------------------------
|
|
switch (fieldState)
|
|
{
|
|
//------------------------------------------------------------------
|
|
// FLD_PARSE_NORMAL_STATE
|
|
// Field not enclosed in a string delimiter such as a double quote
|
|
//------------------------------------------------------------------
|
|
case FLD_PARSE_NORMAL_STATE:
|
|
{
|
|
if ((c == FIELD_DELIM_CHAR) || (c == NEWLINE_CHAR))
|
|
{
|
|
start = p - fData - offset;
|
|
|
|
if (c == NEWLINE_CHAR)
|
|
bNewLine = true;
|
|
}
|
|
else
|
|
{
|
|
offset++;
|
|
p++;
|
|
continue; // process next byte
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
// If state is something other than FLD_PARSE_NORMAL_STATE, then
|
|
// there is extra processing to allow for fields that may be en-
|
|
// closed within a string delimiter (such as a double quote)
|
|
|
|
//----------------------------------------------------------------
|
|
// FLD_PARSE_LEADING_CHAR_STATE
|
|
//----------------------------------------------------------------
|
|
case FLD_PARSE_LEADING_CHAR_STATE:
|
|
{
|
|
bool bNewColumn = false;
|
|
|
|
if (c == STRING_ENCLOSED_CHAR)
|
|
{
|
|
fieldState = FLD_PARSE_ENCLOSED_STATE;
|
|
idxFrom = p - fData + 1;
|
|
idxTo = idxFrom;
|
|
start = idxTo;
|
|
offset = 0;
|
|
enclosedFieldFlag = 1;
|
|
}
|
|
|
|
else if ((c == FIELD_DELIM_CHAR) || (c == NEWLINE_CHAR))
|
|
{
|
|
bNewColumn = true;
|
|
start = p - fData;
|
|
offset = 0;
|
|
|
|
if (c == NEWLINE_CHAR)
|
|
bNewLine = true;
|
|
}
|
|
|
|
else
|
|
{
|
|
fieldState = FLD_PARSE_NORMAL_STATE;
|
|
start = p - fData;
|
|
offset = 1;
|
|
}
|
|
|
|
if (!bNewColumn)
|
|
{
|
|
p++;
|
|
continue; // process next byte
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
//------------------------------------------------------------------
|
|
// FLD_PARSE_ENCLOSED_STATE
|
|
//------------------------------------------------------------------
|
|
case FLD_PARSE_ENCLOSED_STATE:
|
|
{
|
|
if ((p + 1 < pEndOfData) &&
|
|
(((c == ESCAPE_CHAR) && ((*(p + 1) == STRING_ENCLOSED_CHAR) || (*(p + 1) == ESCAPE_CHAR) ||
|
|
(*(p + 1) == LINE_FEED) || (*(p + 1) == CARRIAGE_RETURN))) ||
|
|
((c == STRING_ENCLOSED_CHAR) && (*(p + 1) == STRING_ENCLOSED_CHAR))))
|
|
{
|
|
// Create/save original data before stripping out bytes
|
|
if (rawDataRowLength == 0)
|
|
{
|
|
rawDataRowLength = (p + 1) - lastRowHead + 1;
|
|
rawDataRowCapacity = rawDataRowLength * 2;
|
|
|
|
if (rawDataRowCapacity < MIN_RAW_DATA_CAP)
|
|
rawDataRowCapacity = MIN_RAW_DATA_CAP;
|
|
|
|
pRawDataRow = new char[rawDataRowCapacity];
|
|
memcpy(pRawDataRow, lastRowHead, rawDataRowLength);
|
|
}
|
|
else
|
|
{
|
|
if (rawDataRowLength == rawDataRowCapacity)
|
|
{
|
|
// resize array if full
|
|
rawDataRowCapacity = rawDataRowCapacity * 2;
|
|
resizeRowDataArray(&pRawDataRow, rawDataRowLength, rawDataRowCapacity);
|
|
}
|
|
|
|
pRawDataRow[rawDataRowLength] = *(p + 1);
|
|
rawDataRowLength++;
|
|
}
|
|
|
|
fData[idxTo] = *(p + 1);
|
|
idxFrom += 2;
|
|
idxTo++;
|
|
offset++;
|
|
p++;
|
|
}
|
|
|
|
else if (c == STRING_ENCLOSED_CHAR)
|
|
{
|
|
fieldState = FLD_PARSE_TRAILING_CHAR_STATE;
|
|
}
|
|
|
|
else
|
|
{
|
|
if (idxTo != idxFrom)
|
|
fData[idxTo] = fData[idxFrom];
|
|
|
|
idxFrom++;
|
|
idxTo++;
|
|
offset++;
|
|
}
|
|
|
|
p++;
|
|
continue; // process next byte
|
|
}
|
|
|
|
//------------------------------------------------------------------
|
|
// FLD_PARSE_TRAILING_CHAR_STATE
|
|
// Ignore any trailing chars till we reach field or line delimiter.
|
|
//------------------------------------------------------------------
|
|
case FLD_PARSE_TRAILING_CHAR_STATE:
|
|
default:
|
|
{
|
|
if ((c == FIELD_DELIM_CHAR) || (c == NEWLINE_CHAR))
|
|
{
|
|
if (c == NEWLINE_CHAR)
|
|
bNewLine = true;
|
|
}
|
|
else
|
|
{
|
|
p++;
|
|
continue; // process next byte
|
|
}
|
|
|
|
break;
|
|
}
|
|
} // end of switch on fieldState
|
|
|
|
//----------------------------------------------------------------------
|
|
// Finished reading the bytes in the next source field.
|
|
// See if source field is to be included (or ignored)
|
|
//----------------------------------------------------------------------
|
|
if ((curFld < fNumFieldsInFile) && (fFieldList[curFld].fFldColType == BULK_FLDCOL_COLUMN_FIELD))
|
|
{
|
|
//------------------------------------------------------------------
|
|
// Process destination column or end of row if source field is to
|
|
// be included as part of output to database.
|
|
//------------------------------------------------------------------
|
|
if (curCol < fNumColsInFile)
|
|
{
|
|
const JobColumn& jobCol = columnsInfo[curCol].column;
|
|
|
|
// tmp code to test trailing space
|
|
if (jobCol.dataType == CalpontSystemCatalog::CHAR)
|
|
{
|
|
// cout << "triming ... " << endl;
|
|
char* tmp = p;
|
|
|
|
while (tmp != lastRowHead && *(--tmp) == ' ')
|
|
{
|
|
// cout << "offset is " << offset <<endl;
|
|
offset--;
|
|
}
|
|
}
|
|
|
|
fTokens[curRowNum1][curCol].start = start;
|
|
fTokens[curRowNum1][curCol].offset = offset;
|
|
#ifdef DEBUG_TOKEN_PARSING
|
|
enclosedFieldFlags[curCol] = enclosedFieldFlag;
|
|
#endif
|
|
|
|
// Would like to refactor this validation logic into a separate
|
|
// inline function, but code may be too long for compiler
|
|
// to inline. And factoring out into a non-inline function
|
|
// slows down the read thread by 10%. So left code here.
|
|
if (offset)
|
|
{
|
|
switch (fTokens[curRowNum1][curCol].offset)
|
|
{
|
|
// Special auto-increment case; treat '0' as null value
|
|
case 1:
|
|
{
|
|
if ((jobCol.autoIncFlag) && (*(fData + fTokens[curRowNum1][curCol].start) == NULL_AUTO_INC_0))
|
|
{
|
|
fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
bRowGenAutoInc = true;
|
|
}
|
|
else if (jobCol.dataType == CalpontSystemCatalog::VARBINARY && (bValidRow))
|
|
{
|
|
bValidRow = false;
|
|
|
|
ostringstream ossErrMsg;
|
|
ossErrMsg << INPUT_ERROR_ODD_VARBINARY_LENGTH << "field " << (curFld + 1) << " has " << offset
|
|
<< " bytes";
|
|
validationErrMsg = ossErrMsg.str();
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case 2:
|
|
{
|
|
if ((*(fData + fTokens[curRowNum1][curCol].start) == ESCAPE_CHAR) &&
|
|
(*(fData + fTokens[curRowNum1][curCol].start + 1) == NULL_CHAR))
|
|
{
|
|
fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
|
|
if (jobCol.autoIncFlag)
|
|
bRowGenAutoInc = true;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
// If enclosedFlag set, then treat "NULL" as data and
|
|
// not as a null value
|
|
case 4:
|
|
{
|
|
if ((fNullStringMode) && (!enclosedFieldFlag))
|
|
{
|
|
if ((*(fData + fTokens[curRowNum1][curCol].start) == NULL_VALUE_STRING[0]) &&
|
|
(*(fData + fTokens[curRowNum1][curCol].start + 1) == NULL_VALUE_STRING[1]) &&
|
|
(*(fData + fTokens[curRowNum1][curCol].start + 2) == NULL_VALUE_STRING[2]) &&
|
|
(*(fData + fTokens[curRowNum1][curCol].start + 3) == NULL_VALUE_STRING[3]))
|
|
{
|
|
fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
|
|
if (jobCol.autoIncFlag)
|
|
bRowGenAutoInc = true;
|
|
}
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
if ((jobCol.dataType == CalpontSystemCatalog::VARBINARY) && ((offset & 1) == 1) && (bValidRow))
|
|
{
|
|
bValidRow = false;
|
|
|
|
ostringstream ossErrMsg;
|
|
ossErrMsg << INPUT_ERROR_ODD_VARBINARY_LENGTH << "field " << (curFld + 1) << " has " << offset
|
|
<< " bytes";
|
|
validationErrMsg = ossErrMsg.str();
|
|
}
|
|
|
|
// @bug 3478: Truncate instead of rejecting dctnry
|
|
// strings>8000. Only reject numeric cols>1000 bytes
|
|
else if ((fTokens[curRowNum1][curCol].offset > MAX_FIELD_SIZE) &&
|
|
(jobCol.colType != COL_TYPE_DICT) && (bValidRow))
|
|
{
|
|
bValidRow = false;
|
|
|
|
ostringstream ossErrMsg;
|
|
ossErrMsg << INPUT_ERROR_TOO_LONG << "field " << (curFld + 1) << " longer than "
|
|
<< MAX_FIELD_SIZE << " bytes";
|
|
validationErrMsg = ossErrMsg.str();
|
|
}
|
|
|
|
break;
|
|
}
|
|
} // end of switch on offset
|
|
|
|
// @bug 4037: When cmd line option set, treat char
|
|
// and varchar fields that are too long as errors
|
|
if (getTruncationAsError() && bValidRow &&
|
|
(fTokens[curRowNum1][curCol].offset != COLPOSPAIR_NULL_TOKEN_OFFSET))
|
|
{
|
|
if ((jobCol.dataType == CalpontSystemCatalog::VARCHAR ||
|
|
jobCol.dataType == CalpontSystemCatalog::CHAR) &&
|
|
(fTokens[curRowNum1][curCol].offset > jobCol.definedWidth))
|
|
{
|
|
bValidRow = false;
|
|
|
|
ostringstream ossErrMsg;
|
|
ossErrMsg << INPUT_ERROR_STRING_TOO_LONG << "field " << (curFld + 1) << " longer than "
|
|
<< jobCol.definedWidth << " bytes";
|
|
validationErrMsg = ossErrMsg.str();
|
|
}
|
|
}
|
|
|
|
} // end of "if (offset)"
|
|
else
|
|
{
|
|
fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
|
|
if (jobCol.autoIncFlag)
|
|
bRowGenAutoInc = true;
|
|
}
|
|
|
|
// For non-autoincrement column,
|
|
// Validate a NotNull column is supplied a value or a default
|
|
if (!bRowGenAutoInc)
|
|
{
|
|
if ((jobCol.fNotNull) && (fTokens[curRowNum1][curCol].offset == COLPOSPAIR_NULL_TOKEN_OFFSET) &&
|
|
(!jobCol.fWithDefault) && (bValidRow))
|
|
{
|
|
bValidRow = false;
|
|
|
|
ostringstream ossErrMsg;
|
|
ossErrMsg << INPUT_ERROR_NULL_CONSTRAINT << "; field " << (curFld + 1);
|
|
validationErrMsg = ossErrMsg.str();
|
|
}
|
|
}
|
|
} // end if curCol < fNumberOfColumns
|
|
|
|
curCol++;
|
|
}
|
|
|
|
curFld++;
|
|
|
|
//----------------------------------------------------------------------
|
|
// End-of-row processing
|
|
//----------------------------------------------------------------------
|
|
if (bNewLine)
|
|
{
|
|
// Debug: Dump next row that may or may not be accepted as
|
|
// valid. Not a typo, that we print "curRowNum" as the row
|
|
// number, but we use curRowNum1 as the index into fTokens.
|
|
#ifdef DEBUG_TOKEN_PARSING
|
|
std::cout << "Row " << curRowNum + 1 << ". fTokens: "
|
|
<< "(start,offset,enclosed)" << std::endl;
|
|
unsigned kColCount = fNumColsInFile;
|
|
|
|
if (curCol < kColCount)
|
|
kColCount = curCol;
|
|
|
|
for (unsigned int k = 0; k < kColCount; k++)
|
|
{
|
|
std::cout << " (" << fTokens[curRowNum1][k].start << "," << fTokens[curRowNum1][k].offset << ","
|
|
<< enclosedFieldFlags[k] << ") ";
|
|
|
|
if (fTokens[curRowNum1][k].offset != COLPOSPAIR_NULL_TOKEN_OFFSET)
|
|
{
|
|
std::string outField(fData + fTokens[curRowNum1][k].start, fTokens[curRowNum1][k].offset);
|
|
std::cout << " " << outField << std::endl;
|
|
}
|
|
else
|
|
{
|
|
std::cout << " <NULL>" << std::endl;
|
|
}
|
|
}
|
|
|
|
#endif
|
|
|
|
curRowNum++; // increment total number of rows read
|
|
int rowLength = p - lastRowHead + 1;
|
|
|
|
// @bug 3146: Allow optional trailing value at end of input file
|
|
// Don't count last column if no data after last delimiter,
|
|
// and we don't need that last column.
|
|
if ((offset == 0) && (curFld == (fNumFieldsInFile + 1)))
|
|
{
|
|
curFld--;
|
|
}
|
|
|
|
if ((curFld != fNumFieldsInFile) && (bValidRow))
|
|
{
|
|
bValidRow = false;
|
|
|
|
ostringstream ossErrMsg;
|
|
ossErrMsg << INPUT_ERROR_WRONG_NO_COLUMNS << "; num fields expected-" << fNumFieldsInFile
|
|
<< "; num fields found-" << curFld;
|
|
validationErrMsg = ossErrMsg.str();
|
|
}
|
|
|
|
if (bValidRow)
|
|
{
|
|
// Initialize fTokens for <DefaultColumn> tags not in input file
|
|
if (fNumColsInFile < fNumberOfColumns)
|
|
{
|
|
for (unsigned int n = fNumColsInFile; n < fNumberOfColumns; n++)
|
|
{
|
|
fTokens[curRowNum1][n].start = 0;
|
|
fTokens[curRowNum1][n].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
|
|
if (columnsInfo[n].column.autoIncFlag)
|
|
bRowGenAutoInc = true;
|
|
}
|
|
}
|
|
|
|
curRowNum1++; // increment valid row count
|
|
|
|
if (bRowGenAutoInc)
|
|
fAutoIncGenCount++; // update number of generated auto-incs
|
|
}
|
|
else
|
|
{
|
|
// Store validation error message to be logged
|
|
if (rawDataRowLength == 0)
|
|
{
|
|
string tmp(lastRowHead, rowLength);
|
|
fErrRows.push_back(tmp);
|
|
}
|
|
else
|
|
{
|
|
string tmp(pRawDataRow, rawDataRowLength);
|
|
fErrRows.push_back(tmp);
|
|
}
|
|
|
|
fRowStatus.push_back(std::pair<RID, std::string>(fStartRowForLogging + curRowNum, validationErrMsg));
|
|
|
|
errorCount++;
|
|
|
|
// Quit if we exceed max allowable errors for this call.
|
|
// We set lastRowHead = p, so that the code that follows this
|
|
// loop won't try to save any data in fOverflowBuf.
|
|
if (errorCount > allowedErrCntThisCall)
|
|
{
|
|
lastRowHead = p + 1;
|
|
p++;
|
|
break;
|
|
}
|
|
}
|
|
|
|
curCol = 0;
|
|
curFld = 0;
|
|
lastRowHead = p + 1;
|
|
rawDataRowLength = 0;
|
|
|
|
// Resize fTokens array if we are about to fill it up
|
|
if (curRowNum1 >= fTotalRows)
|
|
{
|
|
resizeTokenArray();
|
|
}
|
|
|
|
bNewLine = false;
|
|
bValidRow = true;
|
|
bRowGenAutoInc = false;
|
|
#ifdef DEBUG_TOKEN_PARSING
|
|
|
|
if (initialState == FLD_PARSE_LEADING_CHAR_STATE)
|
|
memset(enclosedFieldFlags, 0, sizeof(unsigned) * fNumberOfColumns);
|
|
|
|
#endif
|
|
} // end of (bNewLine)
|
|
|
|
offset = 0;
|
|
fieldState = initialState;
|
|
enclosedFieldFlag = 0;
|
|
|
|
p++;
|
|
} // end of (p < pEndOfData) loop to step thru the read buffer
|
|
|
|
// Save any leftover data that we did not yet parse, into fOverflowBuf
|
|
if (p > lastRowHead)
|
|
{
|
|
fOverflowSize = p - lastRowHead;
|
|
fOverflowBuf = new char[fOverflowSize];
|
|
|
|
// If we stripped out any chars, be sure to preserve the original data
|
|
if (rawDataRowLength == 0)
|
|
memcpy(fOverflowBuf, lastRowHead, fOverflowSize);
|
|
else
|
|
memcpy(fOverflowBuf, pRawDataRow, fOverflowSize);
|
|
}
|
|
else
|
|
{
|
|
fOverflowSize = 0;
|
|
fOverflowBuf = NULL;
|
|
}
|
|
|
|
fTotalReadRows = curRowNum1; // number of valid rows read
|
|
fTotalReadRowsForLog = curRowNum; // total number of rows read
|
|
|
|
if (pRawDataRow)
|
|
delete[] pRawDataRow;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Resize the fTokens array used to store meta data about the input read buffer.
|
|
// Used for initial allocation as well.
|
|
//------------------------------------------------------------------------------
|
|
void BulkLoadBuffer::resizeTokenArray()
|
|
{
|
|
unsigned tmpTotalRows = 0;
|
|
|
|
if (!fTokens)
|
|
{
|
|
tmpTotalRows = fBufferSize / 100;
|
|
|
|
// Estimate the number of rows we can store in
|
|
// one buffer by getting length of first record
|
|
for (unsigned int k = 0; k < (fBufferSize - fOverflowSize); k++)
|
|
{
|
|
if (fData[k] == NEWLINE_CHAR)
|
|
{
|
|
tmpTotalRows = fBufferSize / (k + 1);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
tmpTotalRows = (unsigned int)(fTotalRows * 1.25);
|
|
|
|
// @bug 3478: Make sure token array is expanded.
|
|
// If rows are loooong, then fTotalRows may be small (< 4), in which
|
|
// a 1.25 factor won't increase the row count. So this check is here
|
|
// to make sure we increase the row count in this case.
|
|
if (tmpTotalRows <= fTotalRows)
|
|
tmpTotalRows = fTotalRows * 2;
|
|
}
|
|
|
|
if (fLog->isDebug(DEBUG_1))
|
|
{
|
|
std::string allocLabel("Re-Allocating");
|
|
|
|
if (!fTokens)
|
|
allocLabel = "Allocating";
|
|
|
|
ostringstream oss;
|
|
oss << "Table: " << fTableName << "; ReadBuffer: " << fBufferId << "; " << allocLabel
|
|
<< " ColValue metadata of size " << sizeof(ColPosPair) << " for " << tmpTotalRows << " rows and "
|
|
<< fNumberOfColumns << " columns ";
|
|
fLog->logMsg(oss.str(), MSGLVL_INFO2);
|
|
}
|
|
|
|
ColPosPair** tmp;
|
|
tmp = new ColPosPair*[tmpTotalRows];
|
|
|
|
if (fTokens)
|
|
{
|
|
memcpy(tmp, fTokens, sizeof(ColPosPair*) * fTotalRows);
|
|
delete[] fTokens;
|
|
}
|
|
|
|
fTokens = tmp;
|
|
|
|
// Allocate a ColPosPair array for each new row
|
|
for (unsigned i = fTotalRows; i < tmpTotalRows; ++i)
|
|
fTokens[i] = new ColPosPair[fNumberOfColumns];
|
|
|
|
fTotalRows = tmpTotalRows;
|
|
}
|
|
|
|
//@bug 5027: Add tokenizeBinary() and isBinaryFieldNull() for binary imports
|
|
//------------------------------------------------------------------------------
|
|
// Alternatve version of tokenize() uesd to import fixed length records in
|
|
// binary mode.
|
|
// Parse the rows of data in "fData", saving the meta information that describes
|
|
// the parsed data, in fTokens. If the number of read parsing errors for a
|
|
// given call to tokenize() should exceed the value of "allowedErrCntThisCall",
|
|
// then tokenize() will stop reading data and exit.
|
|
//------------------------------------------------------------------------------
|
|
int BulkLoadBuffer::tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsInfo,
|
|
unsigned int allowedErrCntThisCall, bool bEndOfData)
|
|
{
|
|
unsigned curCol = 0; // dest db column counter within a row
|
|
unsigned curRowNum = 0; // "total" number of rows read during this call
|
|
unsigned curRowNum1 = 0; // number of "valid" rows inserted into fTokens
|
|
char* p; // iterates thru each field in the input buffer
|
|
char* lastRowHead = 0; // start of latest row being processed
|
|
bool bValidRow = true; // track whether current row is valid
|
|
bool bRowGenAutoInc = false; // track whether row uses generated auto-inc
|
|
std::string validationErrMsg; // validation error msg (if any) for current row
|
|
unsigned errorCount = 0;
|
|
int rc = NO_ERROR;
|
|
|
|
p = lastRowHead = fData;
|
|
|
|
ldiv_t rowcnt = ldiv(fReadSize, fFixedBinaryRecLen);
|
|
|
|
//--------------------------------------------------------------------------
|
|
// Loop through all the bytes in the read buffer in order to construct
|
|
// the meta data stored in fTokens.
|
|
//--------------------------------------------------------------------------
|
|
for (long kRow = 0; kRow < rowcnt.quot; kRow++)
|
|
{
|
|
//----------------------------------------------------------------------
|
|
// Manage all the fields in a row
|
|
//----------------------------------------------------------------------
|
|
for (unsigned int curFld = 0; curFld < fNumFieldsInFile; curFld++)
|
|
{
|
|
if (fFieldList[curFld].fFldColType == BULK_FLDCOL_COLUMN_FIELD)
|
|
{
|
|
const JobColumn& jobCol = columnsInfo[curCol].column;
|
|
|
|
if (curCol < fNumColsInFile)
|
|
{
|
|
fTokens[curRowNum1][curCol].start = p - fData;
|
|
fTokens[curRowNum1][curCol].offset = jobCol.definedWidth;
|
|
|
|
// Special auto-increment case; treat 0 as null value
|
|
if (jobCol.autoIncFlag)
|
|
{
|
|
if (memcmp(p, &NULL_AUTO_INC_0_BINARY, jobCol.definedWidth) == 0)
|
|
{
|
|
fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
bRowGenAutoInc = true;
|
|
}
|
|
}
|
|
|
|
switch (jobCol.weType)
|
|
{
|
|
case WR_CHAR:
|
|
{
|
|
// Detect empty string for CHAR and VARCHAR
|
|
if (*p == '\0')
|
|
fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
|
|
break;
|
|
}
|
|
|
|
case WR_VARBINARY:
|
|
{
|
|
// Detect empty VARBINARY field
|
|
int kk;
|
|
|
|
for (kk = 0; kk < jobCol.definedWidth; kk++)
|
|
{
|
|
if (p[kk] != '\0')
|
|
break;
|
|
}
|
|
|
|
if (kk >= jobCol.definedWidth)
|
|
fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
// In BinaryAcceptNULL mode, check for NULL value
|
|
if ((fTokens[curRowNum1][curCol].offset != COLPOSPAIR_NULL_TOKEN_OFFSET) &&
|
|
(fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL))
|
|
{
|
|
if (isBinaryFieldNull(p, jobCol.weType, jobCol.dataType))
|
|
{
|
|
fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
|
|
if (jobCol.autoIncFlag)
|
|
bRowGenAutoInc = true;
|
|
}
|
|
}
|
|
|
|
break;
|
|
}
|
|
} // end of switch (jobCol.weType)
|
|
|
|
// Validate NotNull column is supplied a value or a default
|
|
if ((jobCol.fNotNull) && (fTokens[curRowNum1][curCol].offset == COLPOSPAIR_NULL_TOKEN_OFFSET) &&
|
|
(!jobCol.fWithDefault) && (bValidRow))
|
|
{
|
|
bValidRow = false;
|
|
|
|
ostringstream ossErrMsg;
|
|
ossErrMsg << INPUT_ERROR_NULL_CONSTRAINT << "; field " << (curFld + 1);
|
|
validationErrMsg = ossErrMsg.str();
|
|
}
|
|
} // end "if (curCol < fNumColsInFile)"
|
|
|
|
p += jobCol.definedWidth;
|
|
curCol++;
|
|
}
|
|
else
|
|
{
|
|
// This is where we would handle <IgnoreField> fields
|
|
// if they were supported in Binary Import mode
|
|
// p += ?
|
|
}
|
|
} // end of loop through fields in a row
|
|
|
|
//----------------------------------------------------------------------
|
|
// End-of-row processing
|
|
//----------------------------------------------------------------------
|
|
|
|
curRowNum++; // increment total number of rows read
|
|
|
|
if (bValidRow)
|
|
{
|
|
// Initialize fTokens for <DefaultColumn> tags not in input file
|
|
if (fNumColsInFile < fNumberOfColumns)
|
|
{
|
|
for (unsigned int n = fNumColsInFile; n < fNumberOfColumns; n++)
|
|
{
|
|
fTokens[curRowNum1][n].start = 0;
|
|
fTokens[curRowNum1][n].offset = COLPOSPAIR_NULL_TOKEN_OFFSET;
|
|
|
|
if (columnsInfo[n].column.autoIncFlag)
|
|
bRowGenAutoInc = true;
|
|
}
|
|
}
|
|
|
|
curRowNum1++; // increment valid row count
|
|
|
|
if (bRowGenAutoInc)
|
|
fAutoIncGenCount++; // update number of generated auto-incs
|
|
}
|
|
else
|
|
{
|
|
// Store validation error message to be logged
|
|
string tmp(lastRowHead, fFixedBinaryRecLen);
|
|
fErrRows.push_back(tmp);
|
|
|
|
fRowStatus.push_back(std::pair<RID, std::string>(fStartRowForLogging + curRowNum, validationErrMsg));
|
|
|
|
errorCount++;
|
|
|
|
// Quit if we exceed max allowable errors for this call
|
|
if (errorCount > allowedErrCntThisCall)
|
|
break;
|
|
}
|
|
|
|
curCol = 0;
|
|
lastRowHead += fFixedBinaryRecLen;
|
|
|
|
// Resize fTokens array if we are about to fill it up
|
|
if (curRowNum1 >= fTotalRows)
|
|
{
|
|
resizeTokenArray();
|
|
}
|
|
|
|
bValidRow = true;
|
|
bRowGenAutoInc = false;
|
|
} // end of loop through the rows in the read buffer
|
|
|
|
// Save any leftover data that we did not yet parse, into fOverflowBuf
|
|
if (rowcnt.rem > 0)
|
|
{
|
|
if (bEndOfData)
|
|
{
|
|
rc = ERR_BULK_BINARY_PARTIAL_REC;
|
|
ostringstream oss;
|
|
oss << "Incomplete record (" << rowcnt.rem
|
|
<< " bytes) at end "
|
|
"of import data; expected fixed length records of length "
|
|
<< fFixedBinaryRecLen << " bytes";
|
|
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
|
|
}
|
|
else
|
|
{
|
|
fOverflowSize = rowcnt.rem;
|
|
fOverflowBuf = new char[fOverflowSize];
|
|
|
|
memcpy(fOverflowBuf, (fData + fReadSize - rowcnt.rem), fOverflowSize);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
fOverflowSize = 0;
|
|
fOverflowBuf = NULL;
|
|
}
|
|
|
|
fTotalReadRows = curRowNum1; // number of valid rows read
|
|
fTotalReadRowsForLog = curRowNum; // total number of rows read
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Compare the numeric value (val) against the relevant NULL value, based on
|
|
// column type (ct and dt), to see whether the specified value is NULL.
|
|
//------------------------------------------------------------------------------
|
|
bool BulkLoadBuffer::isBinaryFieldNull(void* val, WriteEngine::ColType ct,
|
|
execplan::CalpontSystemCatalog::ColDataType dt)
|
|
{
|
|
bool isNullFlag = false;
|
|
|
|
switch (ct)
|
|
{
|
|
case WriteEngine::WR_BYTE:
|
|
{
|
|
if ((*(uint8_t*)val) == joblist::TINYINTNULL)
|
|
isNullFlag = true;
|
|
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_SHORT:
|
|
{
|
|
if ((*(uint16_t*)val) == joblist::SMALLINTNULL)
|
|
isNullFlag = true;
|
|
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_INT:
|
|
{
|
|
if (dt == execplan::CalpontSystemCatalog::DATE)
|
|
{
|
|
if ((*(uint32_t*)val) == joblist::DATENULL)
|
|
isNullFlag = true;
|
|
}
|
|
else
|
|
{
|
|
if ((*(uint32_t*)val) == joblist::INTNULL)
|
|
isNullFlag = true;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_LONGLONG:
|
|
{
|
|
if (dt == execplan::CalpontSystemCatalog::DATETIME)
|
|
{
|
|
if ((*(uint64_t*)val) == joblist::DATETIMENULL)
|
|
isNullFlag = true;
|
|
}
|
|
else if (dt == execplan::CalpontSystemCatalog::TIMESTAMP)
|
|
{
|
|
if ((*(uint64_t*)val) == joblist::TIMESTAMPNULL)
|
|
isNullFlag = true;
|
|
}
|
|
else if (dt == execplan::CalpontSystemCatalog::TIME)
|
|
{
|
|
if ((*(uint64_t*)val) == joblist::TIMENULL)
|
|
isNullFlag = true;
|
|
}
|
|
else
|
|
{
|
|
if ((*(uint64_t*)val) == joblist::BIGINTNULL)
|
|
isNullFlag = true;
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_FLOAT:
|
|
{
|
|
if ((*(uint32_t*)val) == joblist::FLOATNULL)
|
|
isNullFlag = true;
|
|
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_DOUBLE:
|
|
{
|
|
if ((*(uint64_t*)val) == joblist::DOUBLENULL)
|
|
isNullFlag = true;
|
|
|
|
break;
|
|
}
|
|
|
|
// Detect empty string for CHAR and VARCHAR
|
|
case WriteEngine::WR_CHAR:
|
|
{
|
|
// not applicable
|
|
break;
|
|
}
|
|
|
|
// Detect empty VARBINARY field
|
|
case WriteEngine::WR_VARBINARY:
|
|
{
|
|
// not applicable
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_UBYTE:
|
|
{
|
|
if ((*(uint8_t*)val) == joblist::UTINYINTNULL)
|
|
isNullFlag = true;
|
|
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_USHORT:
|
|
{
|
|
if ((*(uint16_t*)val) == joblist::USMALLINTNULL)
|
|
isNullFlag = true;
|
|
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_UINT:
|
|
{
|
|
if ((*(uint32_t*)val) == joblist::UINTNULL)
|
|
isNullFlag = true;
|
|
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_ULONGLONG:
|
|
{
|
|
if ((*(uint64_t*)val) == joblist::UBIGINTNULL)
|
|
isNullFlag = true;
|
|
|
|
break;
|
|
}
|
|
|
|
case WriteEngine::WR_BINARY:
|
|
{
|
|
if ((*((int128_t*)val)) == datatypes::Decimal128Null)
|
|
isNullFlag = true;
|
|
|
|
break;
|
|
}
|
|
|
|
default:
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
|
|
return isNullFlag;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Sets the column status.
|
|
// returns TRUE if all columns in the buffer are complete.
|
|
//
|
|
// Note that fSyncUpdatesTI mutex is used to synchronize usage of fColumnLocks
|
|
// and fParseComplete from both read and parse threads.
|
|
//
|
|
// setColumnStatus() and tryAndLockColumn() formerly used fSyncUpdatesBLB mutex.
|
|
// But this seemed inconsistent because resetColumnLocks(), getColumnStatus(),
|
|
// and getColumnLocker() were not using this mutex. In researching the idea of
|
|
// adding fSyncUpdatesBLB locks to these functions, I determined that all the
|
|
// calls to the following functions were protected by a fSyncUpdatesTI mutex:
|
|
// setColumnStatus()
|
|
// tryAndLockColumn()
|
|
// resetColumnLocks()
|
|
// getColumnStatus()
|
|
// getColumnLocker()
|
|
// So I added this note and removed the extraneous fSyncUpdatesBLB lock from
|
|
// setColumnStatus() and tryAndLockColumn(). (dmc-07/19/2009)
|
|
//------------------------------------------------------------------------------
|
|
bool BulkLoadBuffer::setColumnStatus(const int& columnId, const Status& status)
|
|
{
|
|
fColumnLocks[columnId].status = status;
|
|
|
|
if (status == WriteEngine::PARSE_COMPLETE)
|
|
fParseComplete++;
|
|
|
|
if (fParseComplete == fNumberOfColumns)
|
|
return true;
|
|
|
|
return false;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Note that fSyncUpdatesTI mutex is used to synchronize usage of fColumnLocks
|
|
// and fParseComplete from both read and parse threads.
|
|
//------------------------------------------------------------------------------
|
|
bool BulkLoadBuffer::tryAndLockColumn(const int& columnId, const int& id)
|
|
{
|
|
if ((fColumnLocks[columnId].status != WriteEngine::PARSE_COMPLETE) && (fColumnLocks[columnId].locker == -1))
|
|
{
|
|
fColumnLocks[columnId].locker = id;
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
} // namespace WriteEngine
|