/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*********************************************************************
 * $Id: we_bulkloadbuffer.cpp 4661 2013-06-04 12:59:50Z dcathey $
 *
 ********************************************************************/

// NOTE: the bracketed header names were stripped during extraction; the
// standard headers listed here are inferred from what this file uses
// (memcpy/strcpy, strto*, isnan/signbit, errno, isspace, ostringstream).
#include <cstring>
#include <cstdlib>
#include <cmath>
#include <cerrno>
#include <cctype>
#include <string>
#include <sstream>
#include <iostream>

// includes on linux
// (platform-specific header names were also stripped and are not reconstructed)

#include "we_bulkload.h"
#include "we_bulkloadbuffer.h"
#include "we_brm.h"
#include "we_convertor.h"
#include "we_log.h"
#include "brmtypes.h"
#include "dataconvert.h"
#include "exceptclasses.h"
#include "mcs_decimal.h"
#include "mcs_datatype.h"
#include "joblisttypes.h"
#include "utils_utf8.h"  // utf8_truncate_point()

// NOTE: these third-party headers are likewise inferred from usage
// (boost::mutex, the arrow:: array classes, and PARQUET_THROW_NOT_OK).
#include <boost/thread/mutex.hpp>
#include <arrow/api.h>
#include <parquet/exception.h>

using namespace std;
using namespace boost;
using namespace execplan;

namespace
{
const std::string INPUT_ERROR_WRONG_NO_COLUMNS = "Data contains wrong number of columns";
const std::string INPUT_ERROR_TOO_LONG = "Data in wrong format; exceeds max field length; ";
const std::string INPUT_ERROR_NULL_CONSTRAINT = "Data violates NOT NULL constraint with no default";
const std::string INPUT_ERROR_ODD_VARBINARY_LENGTH = "VarBinary column is incomplete; odd number of bytes; ";
const std::string INPUT_ERROR_STRING_TOO_LONG = "Character data exceeds max field length; ";

const char NULL_CHAR = 'N';
const char* NULL_VALUE_STRING = "NULL";
const char NULL_AUTO_INC_0 = '0';
const unsigned long long NULL_AUTO_INC_0_BINARY = 0;
const char NEWLINE_CHAR = '\n';

// Enumeration states related to parsing a column value
enum FieldParsingState
{
  FLD_PARSE_LEADING_CHAR_STATE = 1,   // parsing leading character
  FLD_PARSE_ENCLOSED_STATE = 2,       // parsing an enclosed column value
  FLD_PARSE_TRAILING_CHAR_STATE = 3,  // parsing bytes after an
                                      //   enclosed column value
  FLD_PARSE_NORMAL_STATE = 4          // parsing non-enclosed column value
};

//------------------------------------------------------------------------------
// Expand pRowData to size "newArrayCapacity", preserving contents, and
// deleting old pointer
//------------------------------------------------------------------------------
inline void resizeRowDataArray(char** pRowData, unsigned int dataLength, unsigned int newArrayCapacity)
{
  char* tmpRaw = new char[newArrayCapacity];
  memcpy(tmpRaw, *pRowData, dataLength);
  delete[] *pRowData;
  *pRowData = tmpRaw;
}
}  // namespace

// #define DEBUG_TOKEN_PARSING 1

namespace WriteEngine
{
//------------------------------------------------------------------------------
// BulkLoadBuffer constructor
//------------------------------------------------------------------------------
BulkLoadBuffer::BulkLoadBuffer(unsigned numberOfCols, unsigned bufferSize, Log* logger, int bufferId,
                               const std::string& tableName, const JobFieldRefList& jobFieldRefList)
  : fOverflowSize(0)
  , fParseComplete(0)
  , fTotalRows(0)
  , fStartRow(0)
  , fStartRowForLogging(0)
  , fAutoIncGenCount(0)
  , fAutoIncNextValue(0)
  , fReadSize(0)
  , fLog(logger)
  , fNullStringMode(false)
  , fEnclosedByChar('\0')
  , fEscapeChar('\\')
  , fBufferId(bufferId)
  , fTableName(tableName)
  , fbTruncationAsError(false)
  , fImportDataMode(IMPORT_DATA_TEXT)
  , fTimeZone(dataconvert::systemTimeZoneOffset())
  , fFixedBinaryRecLen(0)
{
  // if it's non-parquet case, initialize the fData
  if (fImportDataMode != IMPORT_DATA_PARQUET)
    fData = new char[bufferSize];

  fOverflowBuf = NULL;
  fStatusBLB = WriteEngine::NEW;
  fNumberOfColumns = numberOfCols;
  fBufferSize = bufferSize;
  fColumnLocks.clear();
  fTokens = 0;
  fRowStatus.clear();
  fErrRows.clear();

  struct LockInfo info;
  info.locker = -1;
  info.status = WriteEngine::NEW;
  fColumnLocks.resize(numberOfCols);
  fColumnLocks.assign(fNumberOfColumns, info);

  fTotalReadRowsParser = 0;
  fStartRowParser = 0;
  fDataParser = 0;
  fTokensParser = 0;
  fStartRowForLoggingParser = 0;
  fAutoIncGenCountParser = 0;

  fNumFieldsInFile = 0;
  fNumColsInFile = 0;

  // Count the total number of fields in the input file (fNumFieldsInFile)
  // and the number of db columns that will be loaded from those fields
  // (fNumColsInFile).  Keep in mind that fNumColsInFile may be less than
  // fNumFieldsInFile, because there may be fields we are to ignore, and/or
  // some db columns may get default loaded without a corresponding field
  // in the input file.
  fFieldList.resize(jobFieldRefList.size());

  for (unsigned k = 0; k < jobFieldRefList.size(); k++)
  {
    fFieldList[k] = jobFieldRefList[k];

    switch (jobFieldRefList[k].fFldColType)
    {
      case BULK_FLDCOL_COLUMN_FIELD:
      {
        fNumColsInFile++;
        fNumFieldsInFile++;
        break;
      }

      case BULK_FLDCOL_IGNORE_FIELD:
      {
        fNumFieldsInFile++;
        break;
      }

      case BULK_FLDCOL_COLUMN_DEFAULT:
      default:
      {
        break;
      }
    }
  }
}

//------------------------------------------------------------------------------
// BulkLoadBuffer destructor
//------------------------------------------------------------------------------
BulkLoadBuffer::~BulkLoadBuffer()
{
  if (fData != NULL)
    delete[] fData;

  if (fOverflowBuf != NULL)
    delete[] fOverflowBuf;

  fColumnLocks.clear();

  if (fTokens != NULL)
  {
    for (unsigned int i = 0; i < fTotalRows; ++i)
    {
      delete[] fTokens[i];
    }

    delete[] fTokens;
  }

  fRowStatus.clear();
  fErrRows.clear();
}

//------------------------------------------------------------------------------
// Resets state of buffer.
//------------------------------------------------------------------------------
void BulkLoadBuffer::reset()
{
  fStartRow = fTotalReadRows = fTotalReadRowsForLog = 0;
  fAutoIncGenCount = 0;
}

//------------------------------------------------------------------------------
// Resets state of buffer's column locks.
//------------------------------------------------------------------------------
void BulkLoadBuffer::resetColumnLocks()
{
  fParseComplete = 0;

  struct LockInfo info;
  fColumnLocks.assign(fNumberOfColumns, info);
}

//------------------------------------------------------------------------------
// Copy overflow leftover from previous buffer into the start of "this" buffer
//------------------------------------------------------------------------------
void BulkLoadBuffer::copyOverflow(const BulkLoadBuffer& buffer)
{
  if (fOverflowBuf != NULL)
  {
    delete[] fOverflowBuf;
    fOverflowBuf = NULL;
  }

  fOverflowSize = buffer.fOverflowSize;

  if (fOverflowSize != 0)
  {
    fOverflowBuf = new char[buffer.fOverflowSize];
    memcpy(fOverflowBuf, buffer.fOverflowBuf, buffer.fOverflowSize);
  }
}

//------------------------------------------------------------------------------
// Parse/convert the given "field" value based on the specified length and type.
// field (in) - the input field value to be parsed // fieldLength (in) - number of bytes of data in "field" // nullFlag (in) - indicates if NULL value is to be assigned to "output" // rather than parsing the data in "field" // output (out) - the parsed value taken from "field" // column (in) - column information for the column we are parsing // bufStats: // minBufferVal (in/out) - ongoing min value for the Read buffer we are parsing // maxBufferVal (in/out) - ongoing max value for the Read buffer we are parsing // satCount (in/out) - ongoing saturation row count for buffer being parsed //------------------------------------------------------------------------------ void BulkLoadBuffer::convert(char* field, int fieldLength, bool nullFlag, unsigned char* output, const JobColumn& column, BLBufferStats& bufStats) { char biVal; int iVal; float fVal; double dVal; short siVal; void* pVal; int32_t iDate; char charTmpBuf[MAX_COLUMN_BOUNDARY + 1] = {0}; long long llVal = 0, llDate = 0; int128_t bigllVal = 0; uint64_t tmp64; uint32_t tmp32; uint8_t ubiVal; uint16_t usiVal; uint32_t uiVal; uint64_t ullVal; int width = column.width; //-------------------------------------------------------------------------- // Parse based on column data type //-------------------------------------------------------------------------- switch (column.weType) { //---------------------------------------------------------------------- // FLOAT //---------------------------------------------------------------------- case WriteEngine::WR_FLOAT: { if (nullFlag) { if (column.fWithDefault) { fVal = column.fDefaultDbl; pVal = &fVal; } else { tmp32 = joblist::FLOATNULL; pVal = &tmp32; } } else { float minFltSat = column.fMinDblSat; float maxFltSat = column.fMaxDblSat; if (fImportDataMode != IMPORT_DATA_TEXT) { memcpy(&fVal, field, sizeof(fVal)); if (isnan(fVal)) { if (signbit(fVal)) fVal = minFltSat; else fVal = maxFltSat; bufStats.satCount++; } else { if (fVal > maxFltSat) { fVal = maxFltSat; bufStats.satCount++; } else if (fVal < minFltSat) { fVal = minFltSat; bufStats.satCount++; } } } else { errno = 0; fVal = strtof(field, 0); if (errno == ERANGE) { if (abs(fVal) == HUGE_VALF) { if (fVal > 0) fVal = maxFltSat; else fVal = minFltSat; bufStats.satCount++; } else fVal = 0; } else { if (fVal > maxFltSat) { fVal = maxFltSat; bufStats.satCount++; } else if (fVal < minFltSat) { fVal = minFltSat; bufStats.satCount++; } if (fVal == 0 && isTrueWord(const_cast(field), fieldLength)) { fVal = 1; } } } pVal = &fVal; } break; } //---------------------------------------------------------------------- // DOUBLE //---------------------------------------------------------------------- case WriteEngine::WR_DOUBLE: { if (nullFlag) { if (column.fWithDefault) { dVal = column.fDefaultDbl; pVal = &dVal; } else { tmp64 = joblist::DOUBLENULL; pVal = &tmp64; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { memcpy(&dVal, field, sizeof(dVal)); if (std::isnan(dVal)) { if (signbit(dVal)) dVal = column.fMinDblSat; else dVal = column.fMaxDblSat; bufStats.satCount++; } else { if (dVal > column.fMaxDblSat) { dVal = column.fMaxDblSat; bufStats.satCount++; } else if (dVal < column.fMinDblSat) { dVal = column.fMinDblSat; bufStats.satCount++; } } } else { errno = 0; dVal = strtod(field, 0); if (errno == ERANGE) { if (abs(dVal) == HUGE_VALL) { if (dVal > 0) dVal = column.fMaxDblSat; else dVal = column.fMinDblSat; bufStats.satCount++; } else dVal = 0; } else { if (dVal > column.fMaxDblSat) { dVal = column.fMaxDblSat; bufStats.satCount++; } else if (dVal < 
column.fMinDblSat) { dVal = column.fMinDblSat; bufStats.satCount++; } else if (dVal == 0 && isTrueWord(const_cast(field), fieldLength)) { dVal = 1; } } } pVal = &dVal; } break; } //---------------------------------------------------------------------- // CHARACTER //---------------------------------------------------------------------- case WriteEngine::WR_CHAR: { if (nullFlag) { if (column.fWithDefault) { int defLen = column.fDefaultChr.length(); const char* defData = column.fDefaultChr.str(); if (defLen > column.definedWidth) memcpy(charTmpBuf, defData, column.definedWidth); else memcpy(charTmpBuf, defData, defLen); // fall through to update saturation and min/max } else { idbassert(width <= 8); for (int i = 0; i < width - 1; i++) { charTmpBuf[i] = '\377'; } charTmpBuf[width - 1] = '\376'; pVal = charTmpBuf; break; } } else { // truncate string if it is too long // @Bug 3040. Use definedWidth for the data truncation to keep // from storing characters beyond the column's defined width. // It contains the column definition width rather than the bytes // on disk (e.g. 5 for a varchar(5) instead of 8). if (column.cs->mbmaxlen > 1) { const CHARSET_INFO* cs = column.cs; const char* start = (const char*)field; const char* end = (const char*)(field + fieldLength); size_t numChars = cs->numchars(start, end); size_t maxCharLength = column.definedWidth / cs->mbmaxlen; if (numChars > maxCharLength) { MY_STRCOPY_STATUS status; cs->well_formed_char_length(start, end, maxCharLength, &status); fieldLength = status.m_source_end_pos - start; bufStats.satCount++; } } else // cs->mbmaxlen == 1 { if (fieldLength > column.definedWidth) { fieldLength = column.definedWidth; bufStats.satCount++; } } memcpy(charTmpBuf, field, fieldLength); } // Swap byte order before comparing character string // Compare must be unsigned uint64_t compChar = uint64ToStr(*(reinterpret_cast(charTmpBuf))); int64_t binChar = static_cast(compChar); // Update min/max range uint64_t minVal = static_cast(bufStats.minBufferVal); uint64_t maxVal = static_cast(bufStats.maxBufferVal); if (compChar < minVal) bufStats.minBufferVal = binChar; if (compChar > maxVal) bufStats.maxBufferVal = binChar; pVal = charTmpBuf; // cout << "In convert: fieldLength = " << fieldLength < static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } else if (origVal == 0 && isTrueWord(const_cast(field), fieldLength)) { origVal = 1; } if (bSatVal) bufStats.satCount++; // Update min/max range if (origVal < bufStats.minBufferVal) bufStats.minBufferVal = origVal; if (origVal > bufStats.maxBufferVal) bufStats.maxBufferVal = origVal; siVal = origVal; pVal = &siVal; break; } //---------------------------------------------------------------------- // UNSIGNED SHORT INT //---------------------------------------------------------------------- case WriteEngine::WR_USHORT: { int64_t origVal = 0; bool bSatVal = false; if (nullFlag) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = static_cast(column.fDefaultUInt); // fall through to update saturation and min/max } else { usiVal = joblist::USMALLINTNULL; pVal = &usiVal; break; } } else { origVal = fAutoIncNextValue++; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { unsigned short int siVal2; memcpy(&siVal2, field, sizeof(siVal2)); origVal = siVal2; } else { errno = 0; origVal = strtoll(field, 0, 10); if (errno == ERANGE) bSatVal = true; } } // Saturate the value (saturates any negative value to 0) if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = 
true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } else if (origVal == 0 && isTrueWord(const_cast(field), fieldLength)) { origVal = 1; } if (bSatVal) bufStats.satCount++; // Update min/max range uint64_t uVal = origVal; if (uVal < static_cast(bufStats.minBufferVal)) bufStats.minBufferVal = origVal; if (uVal > static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = origVal; usiVal = origVal; pVal = &usiVal; break; } //---------------------------------------------------------------------- // TINY INT //---------------------------------------------------------------------- case WriteEngine::WR_BYTE: { long long origVal; bool bSatVal = false; if (nullFlag) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = column.fDefaultInt; // fall through to update saturation and min/max } else { biVal = joblist::TINYINTNULL; pVal = &biVal; break; } } else { origVal = fAutoIncNextValue++; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { char biVal2; memcpy(&biVal2, field, sizeof(biVal2)); origVal = biVal2; } else { if (isTrueWord(const_cast(field), fieldLength)) { strcpy(field, "1"); fieldLength = 1; } if ((column.dataType == CalpontSystemCatalog::DECIMAL) || (column.dataType == CalpontSystemCatalog::UDECIMAL)) { // errno is initialized and set in convertDecimalString origVal = Convertor::convertDecimalString(field, fieldLength, column.scale); } else { errno = 0; origVal = strtol(field, 0, 10); } if (errno == ERANGE) bSatVal = true; } } // Saturate the value if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; // Update min/max range if (origVal < bufStats.minBufferVal) bufStats.minBufferVal = origVal; if (origVal > bufStats.maxBufferVal) bufStats.maxBufferVal = origVal; biVal = origVal; pVal = &biVal; break; } //---------------------------------------------------------------------- // UNSIGNED TINY INT //---------------------------------------------------------------------- case WriteEngine::WR_UBYTE: { int64_t origVal = 0; bool bSatVal = false; if (nullFlag) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = static_cast(column.fDefaultUInt); // fall through to update saturation and min/max } else { ubiVal = joblist::UTINYINTNULL; pVal = &ubiVal; break; } } else { origVal = fAutoIncNextValue++; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { uint8_t biVal2; memcpy(&biVal2, field, sizeof(biVal2)); origVal = biVal2; } else { errno = 0; origVal = strtoll(field, 0, 10); if (errno == ERANGE) bSatVal = true; } } // Saturate the value (saturates any negative value to 0) if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } else if (origVal == 0 && isTrueWord(const_cast(field), fieldLength)) { origVal = 1; } if (bSatVal) bufStats.satCount++; // Update min/max range uint64_t uVal = origVal; if (uVal < static_cast(bufStats.minBufferVal)) bufStats.minBufferVal = origVal; if (uVal > static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = origVal; ubiVal = origVal; pVal = &ubiVal; break; } //---------------------------------------------------------------------- // BIG INT //---------------------------------------------------------------------- case WriteEngine::WR_LONGLONG: { bool bSatVal = 
false; if (column.dataType != CalpontSystemCatalog::DATETIME && column.dataType != CalpontSystemCatalog::TIMESTAMP && column.dataType != CalpontSystemCatalog::TIME) { if (nullFlag) { if (!column.autoIncFlag) { if (column.fWithDefault) { llVal = column.fDefaultInt; // fall through to update saturation and min/max } else { llVal = joblist::BIGINTNULL; pVal = &llVal; break; } } else { llVal = fAutoIncNextValue++; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { memcpy(&llVal, field, sizeof(llVal)); } else { if (isTrueWord(const_cast(field), fieldLength)) { strcpy(field, "1"); fieldLength = 1; } if ((column.dataType == CalpontSystemCatalog::DECIMAL) || (column.dataType == CalpontSystemCatalog::UDECIMAL)) { // errno is initialized and set in convertDecimalString llVal = Convertor::convertDecimalString(field, fieldLength, column.scale); } else { errno = 0; llVal = strtoll(field, 0, 10); } } if (errno == ERANGE) bSatVal = true; } // Saturate the value if (llVal < column.fMinIntSat) { llVal = column.fMinIntSat; bSatVal = true; } else if (llVal > static_cast(column.fMaxIntSat)) { // llVal can be > fMaxIntSat if this is a decimal column llVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; // Update min/max range if (llVal < bufStats.minBufferVal) bufStats.minBufferVal = llVal; if (llVal > bufStats.maxBufferVal) bufStats.maxBufferVal = llVal; pVal = &llVal; } else if (column.dataType == CalpontSystemCatalog::TIME) { // time conversion int rc = 0; if (nullFlag) { if (column.fWithDefault) { llDate = column.fDefaultInt; // fall through to update saturation and min/max } else { llDate = joblist::TIMENULL; pVal = &llDate; break; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { memcpy(&llDate, field, sizeof(llDate)); if (!dataconvert::DataConvert::isColumnTimeValid(llDate)) rc = -1; } else { llDate = dataconvert::DataConvert::convertColumnTime(field, dataconvert::CALPONTTIME_ENUM, rc, fieldLength); } } if (rc == 0) { if (llDate < bufStats.minBufferVal) bufStats.minBufferVal = llDate; if (llDate > bufStats.maxBufferVal) bufStats.maxBufferVal = llDate; } else { bufStats.satCount++; } pVal = &llDate; } else if (column.dataType == CalpontSystemCatalog::TIMESTAMP) { // timestamp conversion int rc = 0; if (nullFlag) { if (column.fWithDefault) { llDate = column.fDefaultInt; // fall through to update saturation and min/max } else { llDate = joblist::TIMESTAMPNULL; pVal = &llDate; break; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { memcpy(&llDate, field, sizeof(llDate)); if (!dataconvert::DataConvert::isColumnTimeStampValid(llDate)) rc = -1; } else { llDate = dataconvert::DataConvert::convertColumnTimestamp( field, dataconvert::CALPONTDATETIME_ENUM, rc, fieldLength, fTimeZone); } } if (rc == 0) { if (llDate < bufStats.minBufferVal) bufStats.minBufferVal = llDate; if (llDate > bufStats.maxBufferVal) bufStats.maxBufferVal = llDate; } else { llDate = 0; bufStats.satCount++; } pVal = &llDate; } else { // datetime conversion int rc = 0; if (nullFlag) { if (column.fWithDefault) { llDate = column.fDefaultInt; // fall through to update saturation and min/max } else { llDate = joblist::DATETIMENULL; pVal = &llDate; break; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { memcpy(&llDate, field, sizeof(llDate)); if (!dataconvert::DataConvert::isColumnDateTimeValid(llDate)) rc = -1; } else { llDate = dataconvert::DataConvert::convertColumnDatetime(field, dataconvert::CALPONTDATETIME_ENUM, rc, fieldLength); } } if (rc == 0) { if (llDate < bufStats.minBufferVal) 
bufStats.minBufferVal = llDate; if (llDate > bufStats.maxBufferVal) bufStats.maxBufferVal = llDate; } else { llDate = 0; bufStats.satCount++; } pVal = &llDate; } break; } //---------------------------------------------------------------------- // WIDE DECIMAL //---------------------------------------------------------------------- case WriteEngine::WR_BINARY: { bool bSatVal = false; if (nullFlag) { if (!column.autoIncFlag) { if (column.fWithDefault) { bigllVal = column.fDefaultWideDecimal; // fall through to update saturation and min/max } else { bigllVal = datatypes::Decimal128Null; pVal = &bigllVal; break; } } else { // TODO MCOL-641 Add support for int128_t version of // fAutoIncNextValue bigllVal = fAutoIncNextValue++; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { memcpy(&bigllVal, field, sizeof(bigllVal)); } else { if (isTrueWord(const_cast(field), fieldLength)) { strcpy(field, "1"); fieldLength = 1; } bool dummy = false; // Value saturation to 9999... or -9999... is handled by // number_int_value(), and the bSatVal flag is set to true dataconvert::number_int_value( string(field), column.dataType, datatypes::TypeAttributesStd(column.width, column.scale, column.precision), dummy, false, bigllVal, &bSatVal); } } if (bSatVal) bufStats.satCount++; // Update min/max range if (bigllVal < bufStats.bigMinBufferVal) bufStats.bigMinBufferVal = bigllVal; if (bigllVal > bufStats.bigMaxBufferVal) bufStats.bigMaxBufferVal = bigllVal; pVal = &bigllVal; break; } //---------------------------------------------------------------------- // UNSIGNED BIG INT //---------------------------------------------------------------------- case WriteEngine::WR_ULONGLONG: { bool bSatVal = false; if (nullFlag) { if (!column.autoIncFlag) { if (column.fWithDefault) { ullVal = column.fDefaultUInt; // fall through to update saturation and min/max } else { ullVal = joblist::UBIGINTNULL; pVal = &ullVal; break; } } else { ullVal = fAutoIncNextValue++; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { memcpy(&ullVal, field, sizeof(ullVal)); } else { // Check for negative. strtoull doesn't do this for us. // I considered using boost::trim_left here, but part of the // exercise is to minimize cpu cycles, so I do it the old // fashioned way. isspace() uses more cycles than direct // compare to ' ', '\t', etc. but the payoff is that it // works with Locale, so it ought to work well with utf-8 // input. 
int idx1; for (idx1 = 0; idx1 < fieldLength; idx1++) { if (!isspace(field[idx1])) break; } if ((idx1 < fieldLength) && (field[idx1] == '-')) { ullVal = static_cast(column.fMinIntSat); bSatVal = true; } else { errno = 0; ullVal = strtoull(field, 0, 10); if (errno == ERANGE) bSatVal = true; } } } // Saturate the value if (ullVal > column.fMaxIntSat) { ullVal = column.fMaxIntSat; bSatVal = true; } else if (ullVal == 0 && isTrueWord(const_cast(field), fieldLength)) { ullVal = 1; } if (bSatVal) bufStats.satCount++; // Update min/max range if (ullVal < static_cast(bufStats.minBufferVal)) bufStats.minBufferVal = static_cast(ullVal); if (ullVal > static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = static_cast(ullVal); pVal = &ullVal; break; } //---------------------------------------------------------------------- // UNSIGNED MEDIUM INTEGER AND UNSIGNED INTEGER //---------------------------------------------------------------------- case WriteEngine::WR_UMEDINT: case WriteEngine::WR_UINT: { int64_t origVal; bool bSatVal = false; if (nullFlag) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = static_cast(column.fDefaultUInt); // fall through to update saturation and min/max } else { uiVal = joblist::UINTNULL; pVal = &uiVal; break; } } else { origVal = fAutoIncNextValue++; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { unsigned int iVal2; memcpy(&iVal2, field, sizeof(iVal2)); origVal = iVal2; } else { errno = 0; origVal = strtoll(field, 0, 10); if (errno == ERANGE) bSatVal = true; } } // Saturate the value (saturates any negative value to 0) if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } else if (origVal == 0 && isTrueWord(const_cast(field), fieldLength)) { origVal = 1; } if (bSatVal) bufStats.satCount++; // Update min/max range uint64_t uVal = origVal; if (uVal < static_cast(bufStats.minBufferVal)) bufStats.minBufferVal = origVal; if (uVal > static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = origVal; uiVal = origVal; pVal = &uiVal; break; } //---------------------------------------------------------------------- // MEDIUM INTEGER AND INTEGER //---------------------------------------------------------------------- case WriteEngine::WR_MEDINT: case WriteEngine::WR_INT: default: { if (column.dataType != CalpontSystemCatalog::DATE) { long long origVal; bool bSatVal = false; if (nullFlag) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = column.fDefaultInt; // fall through to update saturation and min/max } else { iVal = joblist::INTNULL; pVal = &iVal; break; } } else { origVal = fAutoIncNextValue++; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { int iVal2; memcpy(&iVal2, field, sizeof(iVal2)); origVal = iVal2; } else { if (isTrueWord(const_cast(field), fieldLength)) { strcpy(field, "1"); fieldLength = 1; } if ((column.dataType == CalpontSystemCatalog::DECIMAL) || (column.dataType == CalpontSystemCatalog::UDECIMAL)) { // errno is initialized and set in convertDecimalString origVal = Convertor::convertDecimalString(field, fieldLength, column.scale); } else { errno = 0; origVal = strtol(field, 0, 10); } if (errno == ERANGE) bSatVal = true; } } // Saturate the value if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; // 
Update min/max range if (origVal < bufStats.minBufferVal) bufStats.minBufferVal = origVal; if (origVal > bufStats.maxBufferVal) bufStats.maxBufferVal = origVal; iVal = (int)origVal; pVal = &iVal; } else { // date conversion int rc = 0; if (nullFlag) { if (column.fWithDefault) { iDate = column.fDefaultInt; // fall through to update saturation and min/max } else { iDate = joblist::DATENULL; pVal = &iDate; break; } } else { if (fImportDataMode != IMPORT_DATA_TEXT) { memcpy(&iDate, field, sizeof(iDate)); if (!dataconvert::DataConvert::isColumnDateValid(iDate)) rc = -1; } else { iDate = dataconvert::DataConvert::convertColumnDate(field, dataconvert::CALPONTDATE_ENUM, rc, fieldLength); } } if (rc == 0) { if (iDate < bufStats.minBufferVal) bufStats.minBufferVal = iDate; if (iDate > bufStats.maxBufferVal) bufStats.maxBufferVal = iDate; } else { iDate = 0; bufStats.satCount++; } pVal = &iDate; } break; } } memcpy(output, pVal, width); } //------------------------------------------------------------------------------ // Parse the contents of the Read buffer based on whether it is a dictionary // column or not. //------------------------------------------------------------------------------ int BulkLoadBuffer::parse(ColumnInfo& columnInfo) { int rc = NO_ERROR; // Rather than locking fSyncUpdatesBLB for the entire life of parse(), // we only briefly lock, and force a synchronization with the relevant // class variables from reader threads (by copying to Parser specific // variables). It should be okay to reference a copy of these variables // as no other thread should be changing them while we are in parse(). { boost::mutex::scoped_lock lock(fSyncUpdatesBLB); fTotalReadRowsParser = fTotalReadRows; fStartRowParser = fStartRow; if (fImportDataMode != IMPORT_DATA_PARQUET) { fDataParser = fData; fTokensParser = fTokens; } else { fParquetBatchParser = fParquetBatch; } fStartRowForLoggingParser = fStartRowForLogging; fAutoIncGenCountParser = fAutoIncGenCount; } // Bug806 - If buffer is empty then return early. if (fTotalReadRowsParser == 0) return rc; // If this is the first batch of rows, create the starting DB file // if this PM did not have a DB file (delayed file creation). 
RETURN_ON_ERROR(columnInfo.createDelayedFileIfNeeded(fTableName)); if (columnInfo.column.colType == COL_TYPE_DICT) { rc = parseDict(columnInfo); } else { if (fImportDataMode != IMPORT_DATA_PARQUET) rc = parseCol(columnInfo); else rc = parseColParquet(columnInfo); } return rc; } int BulkLoadBuffer::parseColParquet(ColumnInfo& columnInfo) { int rc = NO_ERROR; // Parse the data and fill up a buffer; which is written to output file uint32_t nRowsParsed; if (fLog->isDebug(DEBUG_2)) { ostringstream oss; oss << "ColResSecIn: OID-" << columnInfo.column.mapOid << "; StartRID/Rows: " << fStartRowParser << " " << fTotalReadRowsParser; fLog->logMsg(oss.str(), MSGLVL_INFO2); } ColumnBufferSection* section = 0; RID lastInputRowInExtent; RETURN_ON_ERROR(columnInfo.fColBufferMgr->reserveSection(fStartRowParser, fTotalReadRowsParser, nRowsParsed, §ion, lastInputRowInExtent)); unsigned int columnId = columnInfo.id; int64_t nullCount = 0; bool isNonAuxColumn = columnId < fNumberOfColumns - 1; if (isNonAuxColumn) { nullCount = fParquetBatchParser->column(columnId)->null_count(); } if (nRowsParsed > 0) { #ifdef PROFILE Stats::startParseEvent(WE_STATS_PARSE_COL); #endif // Reserve auto-increment numbers we need to generate if ((columnInfo.column.autoIncFlag) && (nullCount > 0)) { rc = columnInfo.reserveAutoIncNums(nullCount, fAutoIncNextValue); if (rc != NO_ERROR) { WErrorCodes ec; ostringstream oss; oss << "parseCol: error generating auto-increment values " "for table-" << fTableName << ", column-" << columnInfo.column.colName << "; OID-" << columnInfo.column.mapOid << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); BulkLoad::addErrorMsg2BrmUpdater(fTableName, oss); return rc; } } // create a buffer for the size of the rows being written. unsigned char* buf = new unsigned char[fTotalReadRowsParser * columnInfo.column.width]; // Initialize min/max buffer values. We initialize to a sufficient // range to force the first value to automatically update the range. // If we are managing char data, minBufferVal and maxBufferVal are // maintained in reverse byte order to facilitate string comparisons BLBufferStats bufStats(columnInfo.column.dataType); bool updateCPInfoPendingFlag = false; // Point column data in one batch std::shared_ptr columnData; // not aux column if (isNonAuxColumn) { columnData = fParquetBatchParser->column(columnId); } else // aux column { try { arrow::NullBuilder nullBuilder; PARQUET_THROW_NOT_OK(nullBuilder.Reserve(fTotalReadRowsParser)); PARQUET_THROW_NOT_OK(nullBuilder.AppendNulls(fTotalReadRowsParser)); PARQUET_THROW_NOT_OK(nullBuilder.Finish(&columnData)); } catch (std::exception& ex) { ostringstream oss; oss << "Error in creating aux column when importing."; fLog->logMsg(oss.str(), ERR_PARQUET_AUX, MSGLVL_ERROR); return ERR_PARQUET_AUX; } } convertParquet(columnData, buf, columnInfo.column, bufStats, lastInputRowInExtent, columnInfo, updateCPInfoPendingFlag, section); if (updateCPInfoPendingFlag) { if (columnInfo.column.width <= 8) { columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal, columnInfo.column.dataType, columnInfo.column.width); } else { columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal, columnInfo.column.dataType, columnInfo.column.width); } } if (bufStats.satCount) // @bug 3504: increment row saturation count { // If we don't want to allow saturated values for auto inc columns. // then this is where we handle it. 
Too late to reject a single // row from the parsing thread, so we abort the job. // if (columnInfo.column.autoIncFlag) //{ // rc = ERR_AUTOINC_USER_OUT_OF_RANGE; // WErrorCodes ec; // ostringstream oss; // oss << "parseCol: error with auto-increment values " // "for table-" << fTableName << // ", column-" << columnInfo.column.colName << // "; OID-" << columnInfo.column.mapOid << // "; " << ec.errorString(rc); // fLog->logMsg( oss.str(), rc, MSGLVL_ERROR ); // return rc; //} columnInfo.incSaturatedCnt(bufStats.satCount); } section->write(buf, fTotalReadRowsParser); delete[] buf; #ifdef PROFILE Stats::stopParseEvent(WE_STATS_PARSE_COL); #endif // TODO MCOL-641 Add support here. if (fLog->isDebug(DEBUG_2)) { ostringstream oss; RID rid1 = section->startRowId(); RID rid2 = section->endRowId(); oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows2: " << rid1 << " " << (rid2 - rid1) + 1 << "; startOffset: " << section->getStartOffset() << "; lastExtentRow: " << lastInputRowInExtent; parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal); fLog->logMsg(oss.str(), MSGLVL_INFO2); } RETURN_ON_ERROR(columnInfo.fColBufferMgr->releaseSection(section)); } return rc; } //----------------------------------------------------------------------------------- // Convert arrow/parquet column data // columnData (in) - the input column data of one batch // column (in) - the column information // bufStats: // minBufferVal (in/out) - ongoing min value for the Read buffer we are parsing // maxBufferVal (in/out) - ongoing max value for the Read buffer we are parsing // satCount (in/out) - ongoing saturation row count for buffer being parsed // buf (out) - the parsed values take from columnData // fTotalReadRowsParser (in) - current batch size(row number) // fAutoIncNextValue (in) - first auto increment number of this batch //----------------------------------------------------------------------------------- void BulkLoadBuffer::convertParquet(std::shared_ptr columnData, unsigned char* buf, const JobColumn& column, BLBufferStats& bufStats, RID& lastInputRowInExtent, ColumnInfo& columnInfo, bool& updateCPInfoPendingFlag, ColumnBufferSection* section) { char biVal; int iVal; float fVal; double dVal; short siVal; void* pVal; int32_t iDate; long long llVal = 0, llDate = 0; int128_t bigllVal = 0; uint64_t tmp64; uint32_t tmp32; uint8_t ubiVal; uint16_t usiVal; uint32_t uiVal; uint64_t ullVal; int width = column.width; int bufIndex = 1; if (columnData->data()->buffers.size() < 2) { bufIndex = 0; } //-------------------------------------------------------------------------- // Parse based on column data type //-------------------------------------------------------------------------- switch (column.weType) { //---------------------------------------------------------------------- // FLOAT //---------------------------------------------------------------------- case WriteEngine::WR_FLOAT: { const float* dataPtr = columnData->data()->GetValues(bufIndex); for (uint32_t i = 0; i < fTotalReadRowsParser; i++) { void* p = buf + i * width; updateCPInfoPendingFlag = true; if (columnData->IsNull(i)) { if (column.fWithDefault) { fVal = column.fDefaultDbl; pVal = &fVal; } else { tmp32 = joblist::FLOATNULL; pVal = &tmp32; } } else { float minFltSat = column.fMinDblSat; float maxFltSat = column.fMaxDblSat; memcpy(&fVal, dataPtr + i, width); if (fVal > maxFltSat) { fVal = maxFltSat; bufStats.satCount++; } else if (fVal < minFltSat) { fVal = minFltSat; bufStats.satCount++; } 
pVal = &fVal; } memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; } //---------------------------------------------------------------------- // DOUBLE //---------------------------------------------------------------------- case WriteEngine::WR_DOUBLE: { const double* dataPtr = columnData->data()->GetValues(bufIndex); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { void* p = buf + i * width; if (columnData->IsNull(i)) { if (column.fWithDefault) { dVal = column.fDefaultDbl; pVal = &dVal; } else { tmp64 = joblist::DOUBLENULL; pVal = &tmp64; } } else { memcpy(&dVal, dataPtr + i, width); if (dVal > column.fMaxDblSat) { dVal = column.fMaxDblSat; bufStats.satCount++; } else if (dVal < column.fMinDblSat) { dVal = column.fMinDblSat; bufStats.satCount++; } pVal = &dVal; } memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; } //---------------------------------------------------------------------- // CHARACTER //---------------------------------------------------------------------- case WriteEngine::WR_CHAR: { std::shared_ptr binaryArray; std::shared_ptr fixedSizeBinaryArray; int tokenLen; if (columnData->type_id() != arrow::Type::type::FIXED_SIZE_BINARY) binaryArray = std::static_pointer_cast(columnData); else fixedSizeBinaryArray = std::static_pointer_cast(columnData); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { char charTmpBuf[MAX_COLUMN_BOUNDARY + 1] = {0}; void* p = buf + width * i; if (columnData->IsNull(i)) { if (column.fWithDefault) { int defLen = column.fDefaultChr.length(); const char* defData = column.fDefaultChr.str(); if (defLen > column.definedWidth) memcpy(charTmpBuf, defData, column.definedWidth); else memcpy(charTmpBuf, defData, defLen); } else { idbassert(width <= 8); for (int j = 0; j < width - 1; j++) { charTmpBuf[j] = '\377'; } charTmpBuf[width - 1] = '\376'; pVal = charTmpBuf; memcpy(p, pVal, width); continue; } } else { const uint8_t* data; if (binaryArray != nullptr) { data = binaryArray->GetValue(i, &tokenLen); } else { data = fixedSizeBinaryArray->GetValue(i); std::shared_ptr tType = fixedSizeBinaryArray->type(); tokenLen = tType->byte_width(); } const char* dataPtr = reinterpret_cast(data); if (tokenLen > column.definedWidth) { uint8_t truncate_point = utf8::utf8_truncate_point(dataPtr, column.definedWidth); memcpy(charTmpBuf, dataPtr, column.definedWidth - truncate_point); bufStats.satCount++; } else { memcpy(charTmpBuf, dataPtr, tokenLen); } } uint64_t compChar = uint64ToStr(*(reinterpret_cast(charTmpBuf))); int64_t binChar = static_cast(compChar); // Update min/max range uint64_t minVal = static_cast(bufStats.minBufferVal); uint64_t maxVal = static_cast(bufStats.maxBufferVal); if (compChar < minVal) bufStats.minBufferVal = binChar; if (compChar > maxVal) bufStats.maxBufferVal = binChar; pVal = charTmpBuf; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; } //---------------------------------------------------------------------- // SHORT INT //---------------------------------------------------------------------- case WriteEngine::WR_SHORT: { long long 
origVal; const short* dataPtr = columnData->data()->GetValues(bufIndex); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { bool bSatVal = false; void* p = buf + i * width; if (columnData->IsNull(i)) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = column.fDefaultInt; } else { siVal = joblist::SMALLINTNULL; pVal = &siVal; memcpy(p, pVal, width); continue; } } else { origVal = fAutoIncNextValue++; } } else { if ((column.dataType == CalpontSystemCatalog::DECIMAL) || (column.dataType == CalpontSystemCatalog::UDECIMAL)) { const int128_t* dataPtr1 = reinterpret_cast(dataPtr); origVal = *(dataPtr1 + i); } else { origVal = *(dataPtr + i); } } if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; if (origVal < bufStats.minBufferVal) bufStats.minBufferVal = origVal; if (origVal > bufStats.maxBufferVal) bufStats.maxBufferVal = origVal; siVal = origVal; pVal = &siVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; } //---------------------------------------------------------------------- // UNSIGNED SHORT INT //---------------------------------------------------------------------- case WriteEngine::WR_USHORT: { int64_t origVal = 0; const uint16_t* dataPtr = columnData->data()->GetValues(bufIndex); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { bool bSatVal = false; void* p = buf + i * width; if (columnData->IsNull(i)) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = static_cast(column.fDefaultUInt); } else { usiVal = joblist::USMALLINTNULL; pVal = &usiVal; memcpy(p, pVal, width); continue; } } else { origVal = fAutoIncNextValue++; } } else { origVal = *(dataPtr + i); } if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; uint64_t uVal = origVal; if (uVal < static_cast(bufStats.minBufferVal)) bufStats.minBufferVal = origVal; if (uVal > static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = origVal; usiVal = origVal; pVal = &usiVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; } //---------------------------------------------------------------------- // TINY INT //---------------------------------------------------------------------- case WriteEngine::WR_BYTE: { long long origVal; // if use int8_t here, it will take 8 bool value of parquet array std::shared_ptr boolArray = std::static_pointer_cast(columnData); const int8_t* dataPtr = columnData->data()->GetValues(bufIndex); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { bool bSatVal = false; void* p = buf + i * width; if (columnData->IsNull(i)) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = column.fDefaultInt; } else { biVal = joblist::TINYINTNULL; pVal = &biVal; memcpy(p, pVal, width); continue; } } else { origVal = fAutoIncNextValue++; } } else { if ((column.dataType == CalpontSystemCatalog::DECIMAL) || (column.dataType == CalpontSystemCatalog::UDECIMAL)) { const int128_t* dataPtr1 = 
reinterpret_cast(dataPtr); origVal = *(dataPtr1 + i); } else if (columnData->type_id() == arrow::Type::type::BOOL) { origVal = boolArray->Value(i); } else { origVal = *(dataPtr + i); } } if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; if (origVal < bufStats.minBufferVal) bufStats.minBufferVal = origVal; if (origVal > bufStats.maxBufferVal) bufStats.maxBufferVal = origVal; biVal = origVal; pVal = &biVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; } //---------------------------------------------------------------------- // UNSIGNED TINY INT //---------------------------------------------------------------------- case WriteEngine::WR_UBYTE: { int64_t origVal = 0; // special handling for aux column to fix segmentation error if (columnData->type_id() != arrow::Type::type::NA) { const uint8_t* dataPtr = columnData->data()->GetValues(bufIndex); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { bool bSatVal = false; void* p = buf + i * width; if (columnData->IsNull(i)) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = static_cast(column.fDefaultUInt); } else { ubiVal = joblist::UTINYINTNULL; pVal = &ubiVal; memcpy(p, pVal, width); continue; } } else { origVal = fAutoIncNextValue++; } } else { origVal = *(dataPtr + i); } if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; uint64_t uVal = origVal; if (uVal < static_cast(bufStats.minBufferVal)) bufStats.minBufferVal = origVal; if (uVal > static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = origVal; ubiVal = origVal; pVal = &ubiVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } else { for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { bool bSatVal = false; void* p = buf + i * width; // this condition is for aux column which is default null // no auto increment here if (column.fWithDefault) { origVal = static_cast(column.fDefaultUInt); } else { ubiVal = joblist::UTINYINTNULL; pVal = &ubiVal; memcpy(p, pVal, width); continue; } if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; uint64_t uVal = origVal; if (uVal < static_cast(bufStats.minBufferVal)) bufStats.minBufferVal = origVal; if (uVal > static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = origVal; ubiVal = origVal; pVal = &ubiVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } break; } //---------------------------------------------------------------------- // BIG INT //---------------------------------------------------------------------- case WriteEngine::WR_LONGLONG: { if (column.dataType != CalpontSystemCatalog::DATETIME && 
column.dataType != CalpontSystemCatalog::TIMESTAMP && column.dataType != CalpontSystemCatalog::TIME) { const long long* dataPtr = columnData->data()->GetValues(bufIndex); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { void* p = buf + i * width; bool bSatVal = false; if (columnData->IsNull(i)) { if (!column.autoIncFlag) { if (column.fWithDefault) { llVal = column.fDefaultInt; } else { llVal = joblist::BIGINTNULL; pVal = &llVal; memcpy(p, pVal, width); continue; } } else { llVal = fAutoIncNextValue++; } } else { if ((column.dataType == CalpontSystemCatalog::DECIMAL) || (column.dataType == CalpontSystemCatalog::UDECIMAL)) { const int128_t* dataPtr1 = reinterpret_cast(dataPtr); llVal = *(dataPtr1 + i); } else { llVal = *(dataPtr + i); } } if (llVal < column.fMinIntSat) { llVal = column.fMinIntSat; bSatVal = true; } else if (llVal > static_cast(column.fMaxIntSat)) { llVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; // Update min/max range if (llVal < bufStats.minBufferVal) bufStats.minBufferVal = llVal; if (llVal > bufStats.maxBufferVal) bufStats.maxBufferVal = llVal; pVal = &llVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } else { datatypes::TypeAttributesStd dummyTypeAttribute; const auto* typeHandler = datatypes::TypeHandler::find(column.dataType, dummyTypeAttribute); if (column.dataType == CalpontSystemCatalog::TIME) { // time conversion here int rc = 0; // for parquet, there are two time type, time32 and time64 // if it's time32, unit is millisecond, int32 if (columnData->type_id() == arrow::Type::type::TIME32 || columnData->type_id() == arrow::Type::type::NA) { std::shared_ptr timeArray = std::static_pointer_cast(columnData); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { void* p = buf + i * width; if (columnData->IsNull(i)) { if (column.fWithDefault) { llDate = column.fDefaultInt; } else { llDate = joblist::TIMENULL; pVal = &llDate; memcpy(p, pVal, width); continue; } } else { // timeVal is millisecond since midnight int32_t timeVal = timeArray->Value(i); const datatypes::TypeHandlerTime* typeHandlerTime = dynamic_cast(typeHandler); idbassert(typeHandlerTime); llDate = typeHandlerTime->convertArrowColumnTime32(timeVal, rc); } if (rc == 0) { if (llDate < bufStats.minBufferVal) bufStats.minBufferVal = llDate; if (llDate > bufStats.maxBufferVal) bufStats.maxBufferVal = llDate; } else { bufStats.satCount++; } pVal = &llDate; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } // if it's time64, unit is microsecond, int64 else if (columnData->type_id() == arrow::Type::type::TIME64) { std::shared_ptr timeArray = std::static_pointer_cast(columnData); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { void* p = buf + i * width; if (columnData->IsNull(i)) { if (column.fWithDefault) { llDate = column.fDefaultInt; } else { llDate = joblist::TIMENULL; pVal = &llDate; memcpy(p, pVal, width); continue; } } else { // timeVal is macrosecond since midnight int64_t timeVal = timeArray->Value(i); const datatypes::TypeHandlerTime* typeHandlerTime = dynamic_cast(typeHandler); idbassert(typeHandlerTime); llDate = typeHandlerTime->convertArrowColumnTime64(timeVal, rc); } if (rc == 0) { if (llDate < 
bufStats.minBufferVal) bufStats.minBufferVal = llDate; if (llDate > bufStats.maxBufferVal) bufStats.maxBufferVal = llDate; } else { bufStats.satCount++; } pVal = &llDate; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } } else if (column.dataType == CalpontSystemCatalog::TIMESTAMP) { // timestamp conversion here // default column type is TIMESTAMP // default unit is millisecond std::shared_ptr timeStampArray = std::static_pointer_cast(columnData); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { int rc = 0; void* p = buf + i * width; if (columnData->IsNull(i)) { if (column.fWithDefault) { llDate = column.fDefaultInt; } else { llDate = joblist::TIMESTAMPNULL; pVal = &llDate; memcpy(p, pVal, width); continue; } } else { int64_t timeVal = timeStampArray->Value(i); std::shared_ptr fType = std::static_pointer_cast(columnData->type()); const datatypes::TypeHandlerTimestamp* typeHandlerTimestamp = dynamic_cast(typeHandler); idbassert(typeHandlerTimestamp); if (fType->unit() == arrow::TimeUnit::MILLI) { llDate = typeHandlerTimestamp->convertArrowColumnTimestamp(timeVal, rc); } else { llDate = typeHandlerTimestamp->convertArrowColumnTimestampUs(timeVal, rc); } } if (rc == 0) { if (llDate < bufStats.minBufferVal) bufStats.minBufferVal = llDate; if (llDate > bufStats.maxBufferVal) bufStats.maxBufferVal = llDate; } else { llDate = 0; bufStats.satCount++; } pVal = &llDate; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } else { // datetime conversion here // default column type is TIMESTAMP std::shared_ptr dateTimeArray = std::static_pointer_cast(columnData); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { int rc = 0; void* p = buf + i * width; if (columnData->IsNull(i)) { if (column.fWithDefault) { llDate = column.fDefaultInt; } else { llDate = joblist::DATETIMENULL; pVal = &llDate; memcpy(p, pVal, width); continue; } } else { int64_t timeVal = dateTimeArray->Value(i); std::shared_ptr fType = std::static_pointer_cast(columnData->type()); const datatypes::TypeHandlerDatetime* typeHandlerDateTime = dynamic_cast(typeHandler); idbassert(typeHandlerDateTime); if (fType->unit() == arrow::TimeUnit::MILLI) { llDate = typeHandlerDateTime->convertArrowColumnDatetime(timeVal, rc); } else { llDate = typeHandlerDateTime->convertArrowColumnDatetimeUs(timeVal, rc); } } if (rc == 0) { if (llDate < bufStats.minBufferVal) bufStats.minBufferVal = llDate; if (llDate > bufStats.maxBufferVal) bufStats.maxBufferVal = llDate; } else { llDate = 0; bufStats.satCount++; } pVal = &llDate; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } } break; } //---------------------------------------------------------------------- // WIDE DECIMAL //---------------------------------------------------------------------- case WriteEngine::WR_BINARY: { std::shared_ptr decimalArray = std::static_pointer_cast(columnData); std::shared_ptr fType = std::static_pointer_cast(decimalArray->type()); const int128_t* dataPtr = decimalArray->data()->GetValues(bufIndex); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { void* p = buf + i * width; bool 
bSatVal = false; if (columnData->IsNull(i)) { if (!column.autoIncFlag) { if (column.fWithDefault) { bigllVal = column.fDefaultWideDecimal; } else { bigllVal = datatypes::Decimal128Null; pVal = &bigllVal; memcpy(p, pVal, width); continue; } } else { bigllVal = fAutoIncNextValue++; } } else { // data imported should match the column data type, so directly // copy data here memcpy(&bigllVal, dataPtr + i, sizeof(int128_t)); } // Parquet data imported should fit its precision and // scale, so the data won't saturate if (bSatVal) bufStats.satCount++; if (bigllVal < bufStats.bigMinBufferVal) bufStats.bigMinBufferVal = bigllVal; if (bigllVal > bufStats.bigMaxBufferVal) bufStats.bigMaxBufferVal = bigllVal; pVal = &bigllVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; } //---------------------------------------------------------------------- // UNSIGNED BIG INT //---------------------------------------------------------------------- case WriteEngine::WR_ULONGLONG: { const uint64_t* dataPtr = columnData->data()->GetValues(bufIndex); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { bool bSatVal = false; void* p = buf + i * width; if (columnData->IsNull(i)) { if (!column.autoIncFlag) { if (column.fWithDefault) { ullVal = column.fDefaultUInt; } else { ullVal = joblist::UBIGINTNULL; pVal = &ullVal; memcpy(p, pVal, width); continue; } } else { ullVal = fAutoIncNextValue++; } } else { memcpy(&ullVal, dataPtr + i, width); } if (ullVal > column.fMaxIntSat) { ullVal = column.fMaxIntSat; bSatVal = true; } if (bSatVal) bufStats.satCount++; if (ullVal < static_cast(bufStats.minBufferVal)) bufStats.minBufferVal = static_cast(ullVal); if (ullVal > static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = static_cast(ullVal); pVal = &ullVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } break; } //---------------------------------------------------------------------- // UNSIGNED MEDIUM INTEGER AND UNSIGNED INTEGER //---------------------------------------------------------------------- case WriteEngine::WR_UMEDINT: case WriteEngine::WR_UINT: { int64_t origVal; const uint32_t* dataPtr = columnData->data()->GetValues(bufIndex); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { bool bSatVal = false; void* p = buf + i * width; if (columnData->IsNull(i)) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = static_cast(column.fDefaultUInt); } else { uiVal = joblist::UINTNULL; pVal = &uiVal; memcpy(p, pVal, width); continue; } } else { origVal = fAutoIncNextValue++; } } else { origVal = *(dataPtr + i); } if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; // Update min/max range uint64_t uVal = origVal; if (uVal < static_cast(bufStats.minBufferVal)) bufStats.minBufferVal = origVal; if (uVal > static_cast(bufStats.maxBufferVal)) bufStats.maxBufferVal = origVal; uiVal = origVal; pVal = &uiVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, 
section, i); } } break; } //---------------------------------------------------------------------- // MEDIUM INTEGER AND INTEGER //---------------------------------------------------------------------- case WriteEngine::WR_MEDINT: case WriteEngine::WR_INT: default: { if (column.dataType != CalpontSystemCatalog::DATE) { const int* dataPtr = columnData->data()->GetValues(bufIndex); long long origVal; for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { bool bSatVal = false; void* p = buf + i * width; if (columnData->IsNull(i)) { if (!column.autoIncFlag) { if (column.fWithDefault) { origVal = column.fDefaultInt; } else { iVal = joblist::INTNULL; pVal = &iVal; memcpy(p, pVal, width); continue; } } else { origVal = fAutoIncNextValue++; } } else { if ((column.dataType == CalpontSystemCatalog::DECIMAL) || (column.dataType == CalpontSystemCatalog::UDECIMAL)) { const int128_t* dataPtr1 = reinterpret_cast(dataPtr); origVal = *(dataPtr1 + i); } else { origVal = *(dataPtr + i); } } if (origVal < column.fMinIntSat) { origVal = column.fMinIntSat; bSatVal = true; } else if (origVal > static_cast(column.fMaxIntSat)) { origVal = static_cast(column.fMaxIntSat); bSatVal = true; } if (bSatVal) bufStats.satCount++; if (origVal < bufStats.minBufferVal) bufStats.minBufferVal = origVal; if (origVal > bufStats.maxBufferVal) bufStats.maxBufferVal = origVal; iVal = (int)origVal; pVal = &iVal; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } else { // Parquet support. std::shared_ptr dateArray = std::static_pointer_cast(columnData); datatypes::TypeAttributesStd dummyTypeAttribute; const datatypes::TypeHandlerDate* typeHandlerDate = dynamic_cast( datatypes::TypeHandler::find(column.dataType, dummyTypeAttribute)); idbassert(typeHandlerDate); for (unsigned int i = 0; i < fTotalReadRowsParser; i++) { int rc = 0; void* p = buf + i * width; if (columnData->IsNull(i)) { if (column.fWithDefault) { iDate = column.fDefaultInt; } else { iDate = joblist::DATENULL; pVal = &iDate; memcpy(p, pVal, width); continue; } } else { int32_t dayVal = dateArray->Value(i); iDate = typeHandlerDate->convertArrowColumnDate(dayVal, rc); } if (rc == 0) { if (iDate < bufStats.minBufferVal) bufStats.minBufferVal = iDate; if (iDate > bufStats.maxBufferVal) bufStats.maxBufferVal = iDate; } else { iDate = 0; bufStats.satCount++; } pVal = &iDate; memcpy(p, pVal, width); updateCPInfoPendingFlag = true; if ((fStartRowParser + i) == lastInputRowInExtent) { updateCPMinMax(columnInfo, lastInputRowInExtent, bufStats, updateCPInfoPendingFlag, section, i); } } } } } } inline void BulkLoadBuffer::updateCPMinMax(ColumnInfo& columnInfo, RID& lastInputRowInExtent, BLBufferStats& bufStats, bool& updateCPInfoPendingFlag, ColumnBufferSection* section, uint32_t curRow) { if (columnInfo.column.width <= 8) { columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal, columnInfo.column.dataType, columnInfo.column.width); } else { columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal, columnInfo.column.dataType, columnInfo.column.width); } // TODO MCOL-641 Add support here. 
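  // After recording the casual-partition (CP) min/max for the extent just
  // completed, log it (debug only) and reset the running min/max range to its
  // sentinel extremes so the next extent starts a fresh range.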
  if (fLog->isDebug(DEBUG_2))
  {
    ostringstream oss;
    oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid
        << "; StartRID/Rows1: " << section->startRowId() << " " << curRow + 1
        << "; lastExtentRow: " << lastInputRowInExtent;
    parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal);
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  lastInputRowInExtent += columnInfo.rowsPerExtent();

  if (isUnsigned(columnInfo.column.dataType))
  {
    if (columnInfo.column.width <= 8)
    {
      bufStats.minBufferVal = static_cast<int64_t>(MAX_UBIGINT);
      bufStats.maxBufferVal = static_cast<int64_t>(MIN_UBIGINT);
    }
    else
    {
      bufStats.bigMinBufferVal = -1;
      bufStats.bigMaxBufferVal = 0;
    }
    updateCPInfoPendingFlag = false;
  }
  else
  {
    if (columnInfo.column.width <= 8)
    {
      bufStats.minBufferVal = MAX_BIGINT;
      bufStats.maxBufferVal = MIN_BIGINT;
    }
    else
    {
      utils::int128Max(bufStats.bigMinBufferVal);
      utils::int128Min(bufStats.bigMaxBufferVal);
    }
    updateCPInfoPendingFlag = false;
  }
}

//------------------------------------------------------------------------------
// Parse nonDictionary column Read buffer. Parsed row values are added to
// fColBufferMgr, which stores them into an output buffer before writing them
// out to the applicable column segment file.
//------------------------------------------------------------------------------
int BulkLoadBuffer::parseCol(ColumnInfo& columnInfo)
{
  int rc = NO_ERROR;

  // Parse the data and fill up a buffer, which is written to the output file
  uint32_t nRowsParsed;

  if (fLog->isDebug(DEBUG_2))
  {
    ostringstream oss;
    oss << "ColResSecIn: OID-" << columnInfo.column.mapOid
        << "; StartRID/Rows: " << fStartRowParser << " " << fTotalReadRowsParser;
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  ColumnBufferSection* section = 0;
  RID lastInputRowInExtent;

  RETURN_ON_ERROR(columnInfo.fColBufferMgr->reserveSection(fStartRowParser, fTotalReadRowsParser,
                                                           nRowsParsed, &section, lastInputRowInExtent));

  if (nRowsParsed > 0)
  {
#ifdef PROFILE
    Stats::startParseEvent(WE_STATS_PARSE_COL);
#endif

    // Reserve auto-increment numbers we need to generate
    if ((columnInfo.column.autoIncFlag) && (fAutoIncGenCountParser > 0))
    {
      rc = columnInfo.reserveAutoIncNums(fAutoIncGenCountParser, fAutoIncNextValue);

      if (rc != NO_ERROR)
      {
        WErrorCodes ec;
        ostringstream oss;
        oss << "parseCol: error generating auto-increment values "
               "for table-"
            << fTableName << ", column-" << columnInfo.column.colName
            << "; OID-" << columnInfo.column.mapOid << "; " << ec.errorString(rc);
        fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
        BulkLoad::addErrorMsg2BrmUpdater(fTableName, oss);
        return rc;
      }
    }

    // create a buffer for the size of the rows being written.
    unsigned char* buf = new unsigned char[fTotalReadRowsParser * columnInfo.column.width];
    char* field = new char[MAX_FIELD_SIZE + 1];

    // Initialize min/max buffer values. We initialize to a sufficient
    // range to force the first value to automatically update the range.
// If we are managing char data, minBufferVal and maxBufferVal are // maintained in reverse byte order to facilitate string comparisons BLBufferStats bufStats(columnInfo.column.dataType); bool updateCPInfoPendingFlag = false; int tokenLength = 0; bool tokenNullFlag = false; for (uint32_t i = 0; i < fTotalReadRowsParser; ++i) { char* p = fDataParser + fTokensParser[i][columnInfo.id].start; if (fTokensParser[i][columnInfo.id].offset > 0) { memcpy(field, p, fTokensParser[i][columnInfo.id].offset); field[fTokensParser[i][columnInfo.id].offset] = '\0'; tokenLength = fTokensParser[i][columnInfo.id].offset; tokenNullFlag = false; } else { field[0] = '\0'; tokenLength = 0; tokenNullFlag = true; } // convert the data into appropriate format and update CP values convert(field, tokenLength, tokenNullFlag, buf + i * columnInfo.column.width, columnInfo.column, bufStats); updateCPInfoPendingFlag = true; // Update CP min/max if this is last row in this extent if ((fStartRowParser + i) == lastInputRowInExtent) { if (columnInfo.column.width <= 8) { columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal, columnInfo.column.dataType, columnInfo.column.width); } else { columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal, columnInfo.column.dataType, columnInfo.column.width); } // TODO MCOL-641 Add support here. if (fLog->isDebug(DEBUG_2)) { ostringstream oss; oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows1: " << section->startRowId() << " " << i + 1 << "; lastExtentRow: " << lastInputRowInExtent; parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal); fLog->logMsg(oss.str(), MSGLVL_INFO2); } lastInputRowInExtent += columnInfo.rowsPerExtent(); if (isUnsigned(columnInfo.column.dataType)) { if (columnInfo.column.width <= 8) { bufStats.minBufferVal = static_cast(MAX_UBIGINT); bufStats.maxBufferVal = static_cast(MIN_UBIGINT); } else { bufStats.bigMinBufferVal = -1; bufStats.bigMaxBufferVal = 0; } updateCPInfoPendingFlag = false; } else { if (columnInfo.column.width <= 8) { bufStats.minBufferVal = MAX_BIGINT; bufStats.maxBufferVal = MIN_BIGINT; } else { utils::int128Max(bufStats.bigMinBufferVal); utils::int128Min(bufStats.bigMaxBufferVal); } updateCPInfoPendingFlag = false; } } } if (updateCPInfoPendingFlag) { if (columnInfo.column.width <= 8) { columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.minBufferVal, bufStats.maxBufferVal, columnInfo.column.dataType, columnInfo.column.width); } else { columnInfo.updateCPInfo(lastInputRowInExtent, bufStats.bigMinBufferVal, bufStats.bigMaxBufferVal, columnInfo.column.dataType, columnInfo.column.width); } } if (bufStats.satCount) // @bug 3504: increment row saturation count { // If we don't want to allow saturated values for auto inc columns. // then this is where we handle it. Too late to reject a single // row from the parsing thread, so we abort the job. 
// if (columnInfo.column.autoIncFlag) //{ // rc = ERR_AUTOINC_USER_OUT_OF_RANGE; // WErrorCodes ec; // ostringstream oss; // oss << "parseCol: error with auto-increment values " // "for table-" << fTableName << // ", column-" << columnInfo.column.colName << // "; OID-" << columnInfo.column.mapOid << // "; " << ec.errorString(rc); // fLog->logMsg( oss.str(), rc, MSGLVL_ERROR ); // return rc; //} columnInfo.incSaturatedCnt(bufStats.satCount); } delete[] field; section->write(buf, fTotalReadRowsParser); delete[] buf; #ifdef PROFILE Stats::stopParseEvent(WE_STATS_PARSE_COL); #endif // TODO MCOL-641 Add support here. if (fLog->isDebug(DEBUG_2)) { ostringstream oss; RID rid1 = section->startRowId(); RID rid2 = section->endRowId(); oss << "ColRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows2: " << rid1 << " " << (rid2 - rid1) + 1 << "; startOffset: " << section->getStartOffset() << "; lastExtentRow: " << lastInputRowInExtent; parseColLogMinMax(oss, columnInfo.column.dataType, bufStats.minBufferVal, bufStats.maxBufferVal); fLog->logMsg(oss.str(), MSGLVL_INFO2); } RETURN_ON_ERROR(columnInfo.fColBufferMgr->releaseSection(section)); } return rc; } //------------------------------------------------------------------------------ // Log the specified min/max buffer values to the log file. This is straight // forward for numeric types, but for character data, we have to reverse the // order of min/max values, because they are maintained in reverse order to // facilitate the comparison of character strings in an int64_t variable. //------------------------------------------------------------------------------ void BulkLoadBuffer::parseColLogMinMax(ostringstream& oss, ColDataType colDataType, int64_t minBufferVal, int64_t maxBufferVal) const { if (isCharType(colDataType)) { // Swap/restore byte order before printing character string int64_t minVal = static_cast(uint64ToStr(static_cast(minBufferVal))); int64_t maxVal = static_cast(uint64ToStr(static_cast(maxBufferVal))); char minValStr[sizeof(int64_t) + 1]; char maxValStr[sizeof(int64_t) + 1]; memcpy(minValStr, &minVal, sizeof(int64_t)); memcpy(maxValStr, &maxVal, sizeof(int64_t)); minValStr[sizeof(int64_t)] = '\0'; maxValStr[sizeof(int64_t)] = '\0'; oss << "; minVal: " << minVal << "; (" << minValStr << ")" << "; maxVal: " << maxVal << "; (" << maxValStr << ")"; } else if (isUnsigned(colDataType)) { oss << "; minVal: " << static_cast(minBufferVal) << "; maxVal: " << static_cast(maxBufferVal); } else { oss << "; minVal: " << minBufferVal << "; maxVal: " << maxBufferVal; } } //------------------------------------------------------------------------------ // Parse Dictionary column Read buffer. Parsed row values are added to // fColBufferMgr, which stores them into an output buffer before writing them // out to the applicable column segment (token) file. This gets a little sticky // here if the amount of data (in the column file) crosses an extent boundary. // In this case, we have to split up the tokens into 2 column segment files // and of course split up the corresponding strings into 2 different dictionary // store files as well. 
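// In outline, when a Read buffer spans an extent boundary, parseDict():
//   1. parses the rows that fit in the current extent (parseDictSection),
//   2. flushes them (intermediateFlush) and adds an extent to the next
//      column segment file in the rotation (extendTokenColumn),
//   3. closes the current dictionary store file and opens the one matching
//      the newly opened column segment file,
//   4. parses the remaining rows into the new extent, and
//   5. truncates the completed dictionary store file if applicable.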
//------------------------------------------------------------------------------ int BulkLoadBuffer::parseDict(ColumnInfo& columnInfo) { int rc = NO_ERROR; uint32_t nRowsParsed1; rc = parseDictSection(columnInfo, 0, fStartRowParser, fTotalReadRowsParser, nRowsParsed1); if (rc != NO_ERROR) { WErrorCodes ec; ostringstream oss; oss << "parseDict: error parsing section1: " << " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } //..If fTotalReadRows != nRowsParsed1 then reserveInSection() had to // split up our input buffer tokens because they spanned 2 extents. // After exiting reserveSection() above, we no longer have a mutex // lock on the sections in the internal buffer, so you might think // this could cause a race condition with more rows being added to // the buffer by other parsing threads, while we are busy wrapping // up the first extent and creating the second. But since reserve- // Section() only took some of the rows from the Read buffer, any // other threads should be blocked waiting for us to add the remain- // ing rows from "this" Read buffer into a new ColumnBufferSection. // The following condition wait in reserveSection() should be keeping // things stable: // while((fMaxRowId + 1) != startRowId) { // //Making sure that allocation are made in order // fOutOfSequence.wait(lock); // } if (fTotalReadRowsParser != nRowsParsed1) { if (fLog->isDebug(DEBUG_1)) { ostringstream oss; oss << "parseDict breaking up bufsec for OID-" << columnInfo.curCol.dataFile.fid << "; file-" << columnInfo.curCol.dataFile.fSegFileName << "; totalInRows-" << fTotalReadRowsParser << "; rowsFlushedToEndExtent-" << nRowsParsed1; fLog->logMsg(oss.str(), MSGLVL_INFO2); } //..Flush the rows in the buffer that fill up the current extent rc = columnInfo.fColBufferMgr->intermediateFlush(); if (rc != NO_ERROR) { WErrorCodes ec; ostringstream oss; oss << "parseDict: error flushing column: " << " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } //..See if we just finished filling in the last extent for this seg- // ment token file, in which case we can truncate the corresponding // dictionary store segment file. (this only affects compressed data). uint16_t root = columnInfo.curCol.dataFile.fDbRoot; uint32_t pNum = columnInfo.curCol.dataFile.fPartition; uint16_t sNum = columnInfo.curCol.dataFile.fSegment; bool bFileComplete = columnInfo.isFileComplete(); //..Close the current segment file, and add an extent to the next // segment file in the rotation sequence. newSegmentFile is a // FILE* that points to the newly opened segment file. rc = columnInfo.fColBufferMgr->extendTokenColumn(); if (rc != NO_ERROR) { WErrorCodes ec; ostringstream oss; oss << "parseDict: error extending column: " << " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc); fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); return rc; } //..Close current dictionary store file and open the dictionary // store file that will match the newly opened column segment file. 
    rc = columnInfo.closeDctnryStore(false);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "parseDict: error closing store file: "
          << " OID-" << columnInfo.column.dctnry.dctnryOid << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }

    rc = columnInfo.openDctnryStore(false);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "parseDict: error opening store file: "
          << " OID-" << columnInfo.column.dctnry.dctnryOid << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);

      // Ignore return code from closing file; already in error state
      columnInfo.closeDctnryStore(true);  // clean up loose ends
      return rc;
    }

    //..Now we can add the remaining rows in the current Read buffer
    //  to the output buffer destined for the next extent we just added.
    uint32_t nRowsParsed2;
    rc = parseDictSection(columnInfo, nRowsParsed1, (fStartRowParser + nRowsParsed1),
                          (fTotalReadRowsParser - nRowsParsed1), nRowsParsed2);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      ostringstream oss;
      oss << "parseDict: error parsing section2: "
          << " OID-" << columnInfo.curCol.dataFile.fid << "; " << ec.errorString(rc);
      fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
      return rc;
    }

    //..We went ahead and completed all the necessary parsing to free up
    //  the buffer we were working on, so that any blocked threads can
    //  continue. In the mean time, this thread can now go back and
    //  truncate the dctnry store file we just completed, if applicable.
    if (bFileComplete)
    {
      rc = columnInfo.truncateDctnryStore(columnInfo.column.dctnry.dctnryOid, root, pNum, sNum);

      if (rc != NO_ERROR)
        return rc;
    }
  }

  return rc;
}

//------------------------------------------------------------------------------
// Parses all or part of a Dictionary Read buffer into a ColumnBufferSection,
// depending on whether the buffer crosses an extent boundary or not. If it
// crosses such a boundary, then parseDictSection() will only parse the buffer
// up to the end of the current extent. A second call to parseDictSection()
// should be made to parse the remainder of the buffer into the second extent.
//------------------------------------------------------------------------------
int BulkLoadBuffer::parseDictSection(ColumnInfo& columnInfo, int tokenPos, RID startRow,
                                     uint32_t totalReadRows, uint32_t& nRowsParsed)
{
  int rc = NO_ERROR;

  if (fLog->isDebug(DEBUG_2))
  {
    ostringstream oss;
    oss << "DctResSecIn: OID-" << columnInfo.column.mapOid
        << "; StartRID/Rows: " << startRow << " " << totalReadRows;
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  ColumnBufferSection* section = 0;
  RID lastInputRowInExtent = 0;
  RETURN_ON_ERROR(columnInfo.fColBufferMgr->reserveSection(startRow, totalReadRows, nRowsParsed,
                                                           &section, lastInputRowInExtent));

  if (nRowsParsed > 0)
  {
    char* tokenBuf = new char[nRowsParsed * 8];

    if (fImportDataMode != IMPORT_DATA_PARQUET)
    {
      // Pass fDataParser data and fTokensParser meta data to dictionary
      // to be parsed and tokenized, with tokens returned in tokenBuf.
      rc = columnInfo.updateDctnryStore(fDataParser, &fTokensParser[tokenPos], nRowsParsed, tokenBuf);
    }
    else
    {
      // Pass columnData and tokenPos data to dictionary to be parsed and tokenized
      // with tokens returned in tokenBuf.
std::shared_ptr columnData = fParquetBatchParser->column(columnInfo.id); rc = columnInfo.updateDctnryStoreParquet(columnData, tokenPos, nRowsParsed, tokenBuf); } if (rc == NO_ERROR) { #if 0 int64_t* tokenVals = reinterpret_cast(tokenBuf); for (unsigned int j = 0; j < nRowsParsed; j++) { if (tokenVals[j] == 0) { ostringstream oss; oss << "Warning: 0 token being stored for OID-" << columnInfo.curCol.dataFile.fid << "; file-" << columnInfo.curCol.dataFile.fSegFileName << "; input row number-" << fStartRowForLoggingParser + j; fLog->logMsg( oss.str(), MSGLVL_INFO1 ); } } #endif section->write(tokenBuf, nRowsParsed); delete[] tokenBuf; if (fLog->isDebug(DEBUG_2)) { ostringstream oss; RID rid1 = section->startRowId(); RID rid2 = section->endRowId(); oss << "DctRelSecOut: OID-" << columnInfo.column.mapOid << "; StartRID/Rows: " << rid1 << " " << (rid2 - rid1) + 1 << "; startOffset: " << section->getStartOffset(); fLog->logMsg(oss.str(), MSGLVL_INFO2); } RETURN_ON_ERROR(columnInfo.fColBufferMgr->releaseSection(section)); } else { delete[] tokenBuf; } } return rc; } int BulkLoadBuffer::fillFromMemory(const BulkLoadBuffer& overFlowBufIn, const char* input, size_t length, size_t* parse_length, RID& totalReadRows, RID& correctTotalRows, const boost::ptr_vector& columnsInfo, unsigned int allowedErrCntThisCall) { boost::mutex::scoped_lock lock(fSyncUpdatesBLB); reset(); copyOverflow(overFlowBufIn); size_t readSize = 0; // Copy the overflow data from the last buffer, that did not get written if (fOverflowSize != 0) { memcpy(fData, fOverflowBuf, fOverflowSize); if (fOverflowBuf != NULL) { delete[] fOverflowBuf; fOverflowBuf = NULL; } } readSize = fBufferSize - fOverflowSize; if (readSize > (length - *parse_length)) { readSize = length - *parse_length; } memcpy(fData + fOverflowSize, input + *parse_length, readSize); *parse_length += readSize; bool bEndOfData = false; if (length == *parse_length) { bEndOfData = true; } if (bEndOfData && // @bug 3516: Add '\n' if missing from last record (fImportDataMode == IMPORT_DATA_TEXT)) // Only applies to ascii mode { if ((fOverflowSize > 0) | (readSize > 0)) { if (fData[fOverflowSize + readSize - 1] != '\n') { // Should be safe to add byte to fData w/o risk of overflowing, // since we hit EOF. That should mean fread() did not read all // the bytes we requested, meaning we have room to add a byte. fData[fOverflowSize + readSize] = '\n'; readSize++; } } } // Lazy allocation of fToken memory as needed if (fTokens == 0) { resizeTokenArray(); } if ((readSize > 0) || (fOverflowSize > 0)) { if (fOverflowBuf != NULL) { delete[] fOverflowBuf; fOverflowBuf = NULL; } fReadSize = readSize + fOverflowSize; fStartRow = correctTotalRows; fStartRowForLogging = totalReadRows; if (fImportDataMode == IMPORT_DATA_TEXT) { tokenize(columnsInfo, allowedErrCntThisCall); } else { int rc = tokenizeBinary(columnsInfo, allowedErrCntThisCall, bEndOfData); if (rc != NO_ERROR) return rc; } // If we read a full buffer without hitting any new lines, then // terminate import because row size is greater than read buffer size. 
if ((fTotalReadRowsForLog == 0) && (fReadSize == fBufferSize)) { return ERR_BULK_ROW_FILL_BUFFER; } totalReadRows += fTotalReadRowsForLog; correctTotalRows += fTotalReadRows; } return NO_ERROR; } int BulkLoadBuffer::fillFromFileParquet(RID& totalReadRows, RID& correctTotalRows) { boost::mutex::scoped_lock lock(fSyncUpdatesBLB); reset(); try { fParquetBatch.reset(); PARQUET_THROW_NOT_OK(fParquetReader->ReadNext(&fParquetBatch)); fStartRow = correctTotalRows; fStartRowForLogging = totalReadRows; fTotalReadRows = fParquetBatch->num_rows(); fTotalReadRowsForLog = fParquetBatch->num_rows(); totalReadRows += fTotalReadRowsForLog; correctTotalRows += fTotalReadRows; } catch (std::exception& ex) { return ERR_FILE_READ_IMPORT; } return NO_ERROR; } //------------------------------------------------------------------------------ // Read the next set of rows from the input import file (for the specified // table), into "this" BulkLoadBuffer. // totalReadRows (input/output) - total row count from tokenize() (per file) // correctTotalRows (input/output) - total valid row count from tokenize() // (cumulative) //------------------------------------------------------------------------------ int BulkLoadBuffer::fillFromFile(const BulkLoadBuffer& overFlowBufIn, FILE* handle, RID& totalReadRows, RID& correctTotalRows, const boost::ptr_vector& columnsInfo, unsigned int allowedErrCntThisCall) { boost::mutex::scoped_lock lock(fSyncUpdatesBLB); reset(); copyOverflow(overFlowBufIn); size_t readSize = 0; // Copy the overflow data from the last buffer, that did not get written if (fOverflowSize != 0) { memcpy(fData, fOverflowBuf, fOverflowSize); if (fOverflowBuf != NULL) { delete[] fOverflowBuf; fOverflowBuf = NULL; } } readSize = fread(fData + fOverflowSize, 1, fBufferSize - fOverflowSize, handle); if (ferror(handle)) { return ERR_FILE_READ_IMPORT; } bool bEndOfData = false; if (feof(handle)) bEndOfData = true; if (bEndOfData && // @bug 3516: Add '\n' if missing from last record (fImportDataMode == IMPORT_DATA_TEXT)) // Only applies to ascii mode { if ((fOverflowSize > 0) | (readSize > 0)) { if (fData[fOverflowSize + readSize - 1] != '\n') { // Should be safe to add byte to fData w/o risk of overflowing, // since we hit EOF. That should mean fread() did not read all // the bytes we requested, meaning we have room to add a byte. fData[fOverflowSize + readSize] = '\n'; readSize++; } } } // Lazy allocation of fToken memory as needed if (fTokens == 0) { resizeTokenArray(); } if ((readSize > 0) || (fOverflowSize > 0)) { if (fOverflowBuf != NULL) { delete[] fOverflowBuf; fOverflowBuf = NULL; } fReadSize = readSize + fOverflowSize; fStartRow = correctTotalRows; fStartRowForLogging = totalReadRows; if (fImportDataMode == IMPORT_DATA_TEXT) { tokenize(columnsInfo, allowedErrCntThisCall); } else { int rc = tokenizeBinary(columnsInfo, allowedErrCntThisCall, bEndOfData); if (rc != NO_ERROR) return rc; } // If we read a full buffer without hitting any new lines, then // terminate import because row size is greater than read buffer size. if ((fTotalReadRowsForLog == 0) && (fReadSize == fBufferSize)) { return ERR_BULK_ROW_FILL_BUFFER; } totalReadRows += fTotalReadRowsForLog; correctTotalRows += fTotalReadRows; } return NO_ERROR; } //------------------------------------------------------------------------------ // Parse the rows of data in "fData", saving the meta information that describes // the parsed data, in fTokens. 
// If the number of read parsing errors for a
// given call to tokenize() should exceed the value of "allowedErrCntThisCall",
// then tokenize() will stop reading data and exit.
//
// We parse the data using the following state machine-like table.
// Enclosed by character ("), escaped by character (\), and field delimiter
// (|) can all be overridden; but we show default values in the state table.
//
//                     Character(s) found and action taken
//
//  Current         \" or        "           |          \n        other
//  State            ""       character
//  -----------------------------------------------------------------------
//  LEADING_CHAR  |  n/a       ENCLOSED    endFld      endFld     NORMAL
//  TRAILING_CHAR |  n/a         n/a       endFld      endFld     ignore
//  ENCLOSED      |  convert    TRAIL       n/a         n/a        n/a
//  NORMAL        |  n/a         n/a       endFld      endFld      n/a
//  -----------------------------------------------------------------------
//
// n/a      - not applicable; no check is made for this specific character in
//            this state
// ENCLOSED - transition to ENCLOSED state
// TRAIL    - transition to TRAILING_CHAR state
// NORMAL   - transition to NORMAL state
// convert  - convert an escaped double quote (\" or "") to a single double
//            quote ("), and strip out the other character
//
// The initial parsing state for each column is LEADING_CHAR or NORMAL,
// depending on whether the user has enabled the "enclosed by" feature.
//------------------------------------------------------------------------------
void BulkLoadBuffer::tokenize(const boost::ptr_vector<ColumnInfo>& columnsInfo,
                              unsigned int allowedErrCntThisCall)
{
  unsigned offset = 0;      // length of field
  unsigned curCol = 0;      // dest db column counter within a row
  unsigned curFld = 0;      // src input field counter within a row
  unsigned curRowNum = 0;   // "total" number of rows read during this call
  unsigned curRowNum1 = 0;  // number of "valid" rows inserted into fTokens
  char* p;                  // iterates thru each byte in the input buffer
  char c;                   // value of byte at address "p".
char* lastRowHead = 0; // start of latest row being processed bool bValidRow = true; // track whether current row is valid bool bRowGenAutoInc = false; // track whether row uses generated auto-inc std::string validationErrMsg; // validation error msg (if any) for current row unsigned errorCount = 0; const char FIELD_DELIM_CHAR = fColDelim; const char STRING_ENCLOSED_CHAR = fEnclosedByChar; const char ESCAPE_CHAR = fEscapeChar; const char LINE_FEED = 0x0D; const char CARRIAGE_RETURN = 0x0A; // Variables used to store raw data read for a row; needed if we strip out // enclosed char(s) and later have to print original data in a *.bad file char* pRawDataRow = 0; unsigned rawDataRowCapacity = 0; unsigned rawDataRowLength = 0; const unsigned MIN_RAW_DATA_CAP = 1024; // Enable "enclosed by" checking if user specified an "enclosed by" char FieldParsingState initialState = FLD_PARSE_NORMAL_STATE; if (STRING_ENCLOSED_CHAR != '\0') { initialState = FLD_PARSE_LEADING_CHAR_STATE; } FieldParsingState fieldState = initialState; bool bNewLine = false; // Tracks new line unsigned start = 0; // Where next field starts in fData unsigned idxFrom = 0; // idxFrom and idxTo are used to strip out unsigned idxTo = 0; // escape characters in \" and "" // Initialize which field values are enclosed unsigned int enclosedFieldFlag = 0; #ifdef DEBUG_TOKEN_PARSING unsigned int enclosedFieldFlags[fNumberOfColumns]; memset(enclosedFieldFlags, 0, sizeof(unsigned) * fNumberOfColumns); #endif p = lastRowHead = fData; const char* pEndOfData = p + fReadSize; //@bug3810 set an end-of-data marker //-------------------------------------------------------------------------- // Loop through all the bytes in the read buffer in order to construct // the meta data stored in fTokens. //-------------------------------------------------------------------------- while (p < pEndOfData) { c = *p; // If we have stripped "enclosed" characters, then save raw data if (rawDataRowLength > 0) { if (rawDataRowLength == rawDataRowCapacity) // resize array if full { rawDataRowCapacity = rawDataRowCapacity * 2; resizeRowDataArray(&pRawDataRow, rawDataRowLength, rawDataRowCapacity); } pRawDataRow[rawDataRowLength] = c; rawDataRowLength++; } //---------------------------------------------------------------------- // Branch based on current parsing state for this field. // Note that we fall out of switch/case and do more processing if we // have hit end of column or line; else we "continue" directly to end // of loop to process the next byte. 
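    // Illustrative example (hypothetical input, default '|' delimiter and
    // '"' as the "enclosed by" character):
    //   123|"ab|c"|xyz\n
    // Field 1 starts in LEADING_CHAR, moves to NORMAL, and ends at the first
    // '|' with (start,offset) covering "123". Field 2 moves to ENCLOSED, so
    // the embedded '|' is kept as data, and the closing '"' moves to
    // TRAILING_CHAR before the delimiter ends the field. Field 3 is parsed in
    // NORMAL state, and the trailing '\n' ends both the field and the row.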
//---------------------------------------------------------------------- switch (fieldState) { //------------------------------------------------------------------ // FLD_PARSE_NORMAL_STATE // Field not enclosed in a string delimiter such as a double quote //------------------------------------------------------------------ case FLD_PARSE_NORMAL_STATE: { if ((c == FIELD_DELIM_CHAR) || (c == NEWLINE_CHAR)) { start = p - fData - offset; if (c == NEWLINE_CHAR) bNewLine = true; } else { offset++; p++; continue; // process next byte } break; } // If state is something other than FLD_PARSE_NORMAL_STATE, then // there is extra processing to allow for fields that may be en- // closed within a string delimiter (such as a double quote) //---------------------------------------------------------------- // FLD_PARSE_LEADING_CHAR_STATE //---------------------------------------------------------------- case FLD_PARSE_LEADING_CHAR_STATE: { bool bNewColumn = false; if (c == STRING_ENCLOSED_CHAR) { fieldState = FLD_PARSE_ENCLOSED_STATE; idxFrom = p - fData + 1; idxTo = idxFrom; start = idxTo; offset = 0; enclosedFieldFlag = 1; } else if ((c == FIELD_DELIM_CHAR) || (c == NEWLINE_CHAR)) { bNewColumn = true; start = p - fData; offset = 0; if (c == NEWLINE_CHAR) bNewLine = true; } else { fieldState = FLD_PARSE_NORMAL_STATE; start = p - fData; offset = 1; } if (!bNewColumn) { p++; continue; // process next byte } break; } //------------------------------------------------------------------ // FLD_PARSE_ENCLOSED_STATE //------------------------------------------------------------------ case FLD_PARSE_ENCLOSED_STATE: { if ((p + 1 < pEndOfData) && (((c == ESCAPE_CHAR) && ((*(p + 1) == STRING_ENCLOSED_CHAR) || (*(p + 1) == ESCAPE_CHAR) || (*(p + 1) == LINE_FEED) || (*(p + 1) == CARRIAGE_RETURN))) || ((c == STRING_ENCLOSED_CHAR) && (*(p + 1) == STRING_ENCLOSED_CHAR)))) { // Create/save original data before stripping out bytes if (rawDataRowLength == 0) { rawDataRowLength = (p + 1) - lastRowHead + 1; rawDataRowCapacity = rawDataRowLength * 2; if (rawDataRowCapacity < MIN_RAW_DATA_CAP) rawDataRowCapacity = MIN_RAW_DATA_CAP; pRawDataRow = new char[rawDataRowCapacity]; memcpy(pRawDataRow, lastRowHead, rawDataRowLength); } else { if (rawDataRowLength == rawDataRowCapacity) { // resize array if full rawDataRowCapacity = rawDataRowCapacity * 2; resizeRowDataArray(&pRawDataRow, rawDataRowLength, rawDataRowCapacity); } pRawDataRow[rawDataRowLength] = *(p + 1); rawDataRowLength++; } fData[idxTo] = *(p + 1); idxFrom += 2; idxTo++; offset++; p++; } else if (c == STRING_ENCLOSED_CHAR) { fieldState = FLD_PARSE_TRAILING_CHAR_STATE; } else { if (idxTo != idxFrom) fData[idxTo] = fData[idxFrom]; idxFrom++; idxTo++; offset++; } p++; continue; // process next byte } //------------------------------------------------------------------ // FLD_PARSE_TRAILING_CHAR_STATE // Ignore any trailing chars till we reach field or line delimiter. //------------------------------------------------------------------ case FLD_PARSE_TRAILING_CHAR_STATE: default: { if ((c == FIELD_DELIM_CHAR) || (c == NEWLINE_CHAR)) { if (c == NEWLINE_CHAR) bNewLine = true; } else { p++; continue; // process next byte } break; } } // end of switch on fieldState //---------------------------------------------------------------------- // Finished reading the bytes in the next source field. 
// See if source field is to be included (or ignored) //---------------------------------------------------------------------- if ((curFld < fNumFieldsInFile) && (fFieldList[curFld].fFldColType == BULK_FLDCOL_COLUMN_FIELD)) { //------------------------------------------------------------------ // Process destination column or end of row if source field is to // be included as part of output to database. //------------------------------------------------------------------ if (curCol < fNumColsInFile) { const JobColumn& jobCol = columnsInfo[curCol].column; // tmp code to test trailing space if (jobCol.dataType == CalpontSystemCatalog::CHAR) { // cout << "triming ... " << endl; char* tmp = p; while (tmp != lastRowHead && *(--tmp) == ' ') { // cout << "offset is " << offset <8000. Only reject numeric cols>1000 bytes else if ((fTokens[curRowNum1][curCol].offset > MAX_FIELD_SIZE) && (jobCol.colType != COL_TYPE_DICT) && (bValidRow)) { bValidRow = false; ostringstream ossErrMsg; ossErrMsg << INPUT_ERROR_TOO_LONG << "field " << (curFld + 1) << " longer than " << MAX_FIELD_SIZE << " bytes"; validationErrMsg = ossErrMsg.str(); } break; } } // end of switch on offset // @bug 4037: When cmd line option set, treat char // and varchar fields that are too long as errors if (getTruncationAsError() && bValidRow && (fTokens[curRowNum1][curCol].offset != COLPOSPAIR_NULL_TOKEN_OFFSET)) { if ((jobCol.dataType == CalpontSystemCatalog::VARCHAR || jobCol.dataType == CalpontSystemCatalog::CHAR) && (fTokens[curRowNum1][curCol].offset > jobCol.definedWidth)) { bValidRow = false; ostringstream ossErrMsg; ossErrMsg << INPUT_ERROR_STRING_TOO_LONG << "field " << (curFld + 1) << " longer than " << jobCol.definedWidth << " bytes"; validationErrMsg = ossErrMsg.str(); } } } // end of "if (offset)" else { fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET; if (jobCol.autoIncFlag) bRowGenAutoInc = true; } // For non-autoincrement column, // Validate a NotNull column is supplied a value or a default if (!bRowGenAutoInc) { if ((jobCol.fNotNull) && (fTokens[curRowNum1][curCol].offset == COLPOSPAIR_NULL_TOKEN_OFFSET) && (!jobCol.fWithDefault) && (bValidRow)) { bValidRow = false; ostringstream ossErrMsg; ossErrMsg << INPUT_ERROR_NULL_CONSTRAINT << "; field " << (curFld + 1); validationErrMsg = ossErrMsg.str(); } } } // end if curCol < fNumberOfColumns curCol++; } curFld++; //---------------------------------------------------------------------- // End-of-row processing //---------------------------------------------------------------------- if (bNewLine) { // Debug: Dump next row that may or may not be accepted as // valid. Not a typo, that we print "curRowNum" as the row // number, but we use curRowNum1 as the index into fTokens. #ifdef DEBUG_TOKEN_PARSING std::cout << "Row " << curRowNum + 1 << ". 
fTokens: " << "(start,offset,enclosed)" << std::endl; unsigned kColCount = fNumColsInFile; if (curCol < kColCount) kColCount = curCol; for (unsigned int k = 0; k < kColCount; k++) { std::cout << " (" << fTokens[curRowNum1][k].start << "," << fTokens[curRowNum1][k].offset << "," << enclosedFieldFlags[k] << ") "; if (fTokens[curRowNum1][k].offset != COLPOSPAIR_NULL_TOKEN_OFFSET) { std::string outField(fData + fTokens[curRowNum1][k].start, fTokens[curRowNum1][k].offset); std::cout << " " << outField << std::endl; } else { std::cout << " " << std::endl; } } #endif curRowNum++; // increment total number of rows read int rowLength = p - lastRowHead + 1; // @bug 3146: Allow optional trailing value at end of input file // Don't count last column if no data after last delimiter, // and we don't need that last column. if ((offset == 0) && (curFld == (fNumFieldsInFile + 1))) { curFld--; } if ((curFld != fNumFieldsInFile) && (bValidRow)) { bValidRow = false; ostringstream ossErrMsg; ossErrMsg << INPUT_ERROR_WRONG_NO_COLUMNS << "; num fields expected-" << fNumFieldsInFile << "; num fields found-" << curFld; validationErrMsg = ossErrMsg.str(); } if (bValidRow) { // Initialize fTokens for tags not in input file if (fNumColsInFile < fNumberOfColumns) { for (unsigned int n = fNumColsInFile; n < fNumberOfColumns; n++) { fTokens[curRowNum1][n].start = 0; fTokens[curRowNum1][n].offset = COLPOSPAIR_NULL_TOKEN_OFFSET; if (columnsInfo[n].column.autoIncFlag) bRowGenAutoInc = true; } } curRowNum1++; // increment valid row count if (bRowGenAutoInc) fAutoIncGenCount++; // update number of generated auto-incs } else { // Store validation error message to be logged if (rawDataRowLength == 0) { string tmp(lastRowHead, rowLength); fErrRows.push_back(tmp); } else { string tmp(pRawDataRow, rawDataRowLength); fErrRows.push_back(tmp); } fRowStatus.push_back(std::pair(fStartRowForLogging + curRowNum, validationErrMsg)); errorCount++; // Quit if we exceed max allowable errors for this call. // We set lastRowHead = p, so that the code that follows this // loop won't try to save any data in fOverflowBuf. if (errorCount > allowedErrCntThisCall) { lastRowHead = p + 1; p++; break; } } curCol = 0; curFld = 0; lastRowHead = p + 1; rawDataRowLength = 0; // Resize fTokens array if we are about to fill it up if (curRowNum1 >= fTotalRows) { resizeTokenArray(); } bNewLine = false; bValidRow = true; bRowGenAutoInc = false; #ifdef DEBUG_TOKEN_PARSING if (initialState == FLD_PARSE_LEADING_CHAR_STATE) memset(enclosedFieldFlags, 0, sizeof(unsigned) * fNumberOfColumns); #endif } // end of (bNewLine) offset = 0; fieldState = initialState; enclosedFieldFlag = 0; p++; } // end of (p < pEndOfData) loop to step thru the read buffer // Save any leftover data that we did not yet parse, into fOverflowBuf if (p > lastRowHead) { fOverflowSize = p - lastRowHead; fOverflowBuf = new char[fOverflowSize]; // If we stripped out any chars, be sure to preserve the original data if (rawDataRowLength == 0) memcpy(fOverflowBuf, lastRowHead, fOverflowSize); else memcpy(fOverflowBuf, pRawDataRow, fOverflowSize); } else { fOverflowSize = 0; fOverflowBuf = NULL; } fTotalReadRows = curRowNum1; // number of valid rows read fTotalReadRowsForLog = curRowNum; // total number of rows read if (pRawDataRow) delete[] pRawDataRow; } //------------------------------------------------------------------------------ // Resize the fTokens array used to store meta data about the input read buffer. // Used for initial allocation as well. 
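// On the first call the row count is estimated from the length of the first
// record in fData (falling back to fBufferSize/100); on later calls the array
// grows by roughly 1.25x, or 2x when that would not actually add any rows.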
//------------------------------------------------------------------------------
void BulkLoadBuffer::resizeTokenArray()
{
  unsigned tmpTotalRows = 0;

  if (!fTokens)
  {
    tmpTotalRows = fBufferSize / 100;

    // Estimate the number of rows we can store in
    // one buffer by getting length of first record
    for (unsigned int k = 0; k < (fBufferSize - fOverflowSize); k++)
    {
      if (fData[k] == NEWLINE_CHAR)
      {
        tmpTotalRows = fBufferSize / (k + 1);
        break;
      }
    }
  }
  else
  {
    tmpTotalRows = (unsigned int)(fTotalRows * 1.25);

    // @bug 3478: Make sure token array is expanded.
    // If rows are loooong, then fTotalRows may be small (< 4), in which case
    // a 1.25 factor won't increase the row count. So this check is here
    // to make sure we increase the row count in this case.
    if (tmpTotalRows <= fTotalRows)
      tmpTotalRows = fTotalRows * 2;
  }

  if (fLog->isDebug(DEBUG_1))
  {
    std::string allocLabel("Re-Allocating");

    if (!fTokens)
      allocLabel = "Allocating";

    ostringstream oss;
    oss << "Table: " << fTableName << "; ReadBuffer: " << fBufferId << "; " << allocLabel
        << " ColValue metadata of size " << sizeof(ColPosPair) << " for " << tmpTotalRows
        << " rows and " << fNumberOfColumns << " columns ";
    fLog->logMsg(oss.str(), MSGLVL_INFO2);
  }

  ColPosPair** tmp;
  tmp = new ColPosPair*[tmpTotalRows];

  if (fTokens)
  {
    memcpy(tmp, fTokens, sizeof(ColPosPair*) * fTotalRows);
    delete[] fTokens;
  }

  fTokens = tmp;

  // Allocate a ColPosPair array for each new row
  for (unsigned i = fTotalRows; i < tmpTotalRows; ++i)
    fTokens[i] = new ColPosPair[fNumberOfColumns];

  fTotalRows = tmpTotalRows;
}

//@bug 5027: Add tokenizeBinary() and isBinaryFieldNull() for binary imports
//------------------------------------------------------------------------------
// Alternative version of tokenize() used to import fixed length records in
// binary mode.
// Parse the rows of data in "fData", saving the meta information that describes
// the parsed data, in fTokens. If the number of read parsing errors for a
// given call to tokenize() should exceed the value of "allowedErrCntThisCall",
// then tokenize() will stop reading data and exit.
//------------------------------------------------------------------------------
int BulkLoadBuffer::tokenizeBinary(const boost::ptr_vector<ColumnInfo>& columnsInfo,
                                   unsigned int allowedErrCntThisCall, bool bEndOfData)
{
  unsigned curCol = 0;           // dest db column counter within a row
  unsigned curRowNum = 0;        // "total" number of rows read during this call
  unsigned curRowNum1 = 0;       // number of "valid" rows inserted into fTokens
  char* p;                       // iterates thru each field in the input buffer
  char* lastRowHead = 0;         // start of latest row being processed
  bool bValidRow = true;         // track whether current row is valid
  bool bRowGenAutoInc = false;   // track whether row uses generated auto-inc
  std::string validationErrMsg;  // validation error msg (if any) for current row
  unsigned errorCount = 0;
  int rc = NO_ERROR;

  p = lastRowHead = fData;

  ldiv_t rowcnt = ldiv(fReadSize, fFixedBinaryRecLen);

  //--------------------------------------------------------------------------
  // Loop through all the bytes in the read buffer in order to construct
  // the meta data stored in fTokens.
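  // Each record is a fixed fFixedBinaryRecLen bytes, so ldiv() gives the
  // number of whole records available in this buffer; any trailing partial
  // record is carried over in fOverflowBuf, unless we are at end of data, in
  // which case it is reported as an error below.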
//-------------------------------------------------------------------------- for (long kRow = 0; kRow < rowcnt.quot; kRow++) { //---------------------------------------------------------------------- // Manage all the fields in a row //---------------------------------------------------------------------- for (unsigned int curFld = 0; curFld < fNumFieldsInFile; curFld++) { if (fFieldList[curFld].fFldColType == BULK_FLDCOL_COLUMN_FIELD) { const JobColumn& jobCol = columnsInfo[curCol].column; if (curCol < fNumColsInFile) { fTokens[curRowNum1][curCol].start = p - fData; fTokens[curRowNum1][curCol].offset = jobCol.definedWidth; // Special auto-increment case; treat 0 as null value if (jobCol.autoIncFlag) { if (memcmp(p, &NULL_AUTO_INC_0_BINARY, jobCol.definedWidth) == 0) { fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET; bRowGenAutoInc = true; } } switch (jobCol.weType) { case WR_CHAR: { // Detect empty string for CHAR and VARCHAR if (*p == '\0') fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET; break; } case WR_VARBINARY: { // Detect empty VARBINARY field int kk; for (kk = 0; kk < jobCol.definedWidth; kk++) { if (p[kk] != '\0') break; } if (kk >= jobCol.definedWidth) fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET; break; } default: { // In BinaryAcceptNULL mode, check for NULL value if ((fTokens[curRowNum1][curCol].offset != COLPOSPAIR_NULL_TOKEN_OFFSET) && (fImportDataMode == IMPORT_DATA_BIN_ACCEPT_NULL)) { if (isBinaryFieldNull(p, jobCol.weType, jobCol.dataType)) { fTokens[curRowNum1][curCol].offset = COLPOSPAIR_NULL_TOKEN_OFFSET; if (jobCol.autoIncFlag) bRowGenAutoInc = true; } } break; } } // end of switch (jobCol.weType) // Validate NotNull column is supplied a value or a default if ((jobCol.fNotNull) && (fTokens[curRowNum1][curCol].offset == COLPOSPAIR_NULL_TOKEN_OFFSET) && (!jobCol.fWithDefault) && (bValidRow)) { bValidRow = false; ostringstream ossErrMsg; ossErrMsg << INPUT_ERROR_NULL_CONSTRAINT << "; field " << (curFld + 1); validationErrMsg = ossErrMsg.str(); } } // end "if (curCol < fNumColsInFile)" p += jobCol.definedWidth; curCol++; } else { // This is where we would handle fields // if they were supported in Binary Import mode // p += ? 
} } // end of loop through fields in a row //---------------------------------------------------------------------- // End-of-row processing //---------------------------------------------------------------------- curRowNum++; // increment total number of rows read if (bValidRow) { // Initialize fTokens for tags not in input file if (fNumColsInFile < fNumberOfColumns) { for (unsigned int n = fNumColsInFile; n < fNumberOfColumns; n++) { fTokens[curRowNum1][n].start = 0; fTokens[curRowNum1][n].offset = COLPOSPAIR_NULL_TOKEN_OFFSET; if (columnsInfo[n].column.autoIncFlag) bRowGenAutoInc = true; } } curRowNum1++; // increment valid row count if (bRowGenAutoInc) fAutoIncGenCount++; // update number of generated auto-incs } else { // Store validation error message to be logged string tmp(lastRowHead, fFixedBinaryRecLen); fErrRows.push_back(tmp); fRowStatus.push_back(std::pair(fStartRowForLogging + curRowNum, validationErrMsg)); errorCount++; // Quit if we exceed max allowable errors for this call if (errorCount > allowedErrCntThisCall) break; } curCol = 0; lastRowHead += fFixedBinaryRecLen; // Resize fTokens array if we are about to fill it up if (curRowNum1 >= fTotalRows) { resizeTokenArray(); } bValidRow = true; bRowGenAutoInc = false; } // end of loop through the rows in the read buffer // Save any leftover data that we did not yet parse, into fOverflowBuf if (rowcnt.rem > 0) { if (bEndOfData) { rc = ERR_BULK_BINARY_PARTIAL_REC; ostringstream oss; oss << "Incomplete record (" << rowcnt.rem << " bytes) at end " "of import data; expected fixed length records of length " << fFixedBinaryRecLen << " bytes"; fLog->logMsg(oss.str(), rc, MSGLVL_ERROR); } else { fOverflowSize = rowcnt.rem; fOverflowBuf = new char[fOverflowSize]; memcpy(fOverflowBuf, (fData + fReadSize - rowcnt.rem), fOverflowSize); } } else { fOverflowSize = 0; fOverflowBuf = NULL; } fTotalReadRows = curRowNum1; // number of valid rows read fTotalReadRowsForLog = curRowNum; // total number of rows read return rc; } //------------------------------------------------------------------------------ // Compare the numeric value (val) against the relevant NULL value, based on // column type (ct and dt), to see whether the specified value is NULL. 
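// Each WriteEngine column type is checked against its joblist NULL sentinel;
// for the 32-bit and 64-bit integral types the catalog data type (DATE,
// DATETIME, TIMESTAMP, TIME) selects which sentinel applies. CHAR and
// VARBINARY fields are not handled here; their empty-value checks are done by
// the caller in tokenizeBinary().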
//------------------------------------------------------------------------------ bool BulkLoadBuffer::isBinaryFieldNull(void* val, WriteEngine::ColType ct, execplan::CalpontSystemCatalog::ColDataType dt) { bool isNullFlag = false; switch (ct) { case WriteEngine::WR_BYTE: { if ((*(uint8_t*)val) == joblist::TINYINTNULL) isNullFlag = true; break; } case WriteEngine::WR_SHORT: { if ((*(uint16_t*)val) == joblist::SMALLINTNULL) isNullFlag = true; break; } case WriteEngine::WR_INT: { if (dt == execplan::CalpontSystemCatalog::DATE) { if ((*(uint32_t*)val) == joblist::DATENULL) isNullFlag = true; } else { if ((*(uint32_t*)val) == joblist::INTNULL) isNullFlag = true; } break; } case WriteEngine::WR_LONGLONG: { if (dt == execplan::CalpontSystemCatalog::DATETIME) { if ((*(uint64_t*)val) == joblist::DATETIMENULL) isNullFlag = true; } else if (dt == execplan::CalpontSystemCatalog::TIMESTAMP) { if ((*(uint64_t*)val) == joblist::TIMESTAMPNULL) isNullFlag = true; } else if (dt == execplan::CalpontSystemCatalog::TIME) { if ((*(uint64_t*)val) == joblist::TIMENULL) isNullFlag = true; } else { if ((*(uint64_t*)val) == joblist::BIGINTNULL) isNullFlag = true; } break; } case WriteEngine::WR_FLOAT: { if ((*(uint32_t*)val) == joblist::FLOATNULL) isNullFlag = true; break; } case WriteEngine::WR_DOUBLE: { if ((*(uint64_t*)val) == joblist::DOUBLENULL) isNullFlag = true; break; } // Detect empty string for CHAR and VARCHAR case WriteEngine::WR_CHAR: { // not applicable break; } // Detect empty VARBINARY field case WriteEngine::WR_VARBINARY: { // not applicable break; } case WriteEngine::WR_UBYTE: { if ((*(uint8_t*)val) == joblist::UTINYINTNULL) isNullFlag = true; break; } case WriteEngine::WR_USHORT: { if ((*(uint16_t*)val) == joblist::USMALLINTNULL) isNullFlag = true; break; } case WriteEngine::WR_UINT: { if ((*(uint32_t*)val) == joblist::UINTNULL) isNullFlag = true; break; } case WriteEngine::WR_ULONGLONG: { if ((*(uint64_t*)val) == joblist::UBIGINTNULL) isNullFlag = true; break; } case WriteEngine::WR_BINARY: { if ((*((int128_t*)val)) == datatypes::Decimal128Null) isNullFlag = true; break; } default: { break; } } return isNullFlag; } //------------------------------------------------------------------------------ // Sets the column status. // returns TRUE if all columns in the buffer are complete. // // Note that fSyncUpdatesTI mutex is used to synchronize usage of fColumnLocks // and fParseComplete from both read and parse threads. // // setColumnStatus() and tryAndLockColumn() formerly used fSyncUpdatesBLB mutex. // But this seemed inconsistent because resetColumnLocks(), getColumnStatus(), // and getColumnLocker() were not using this mutex. In researching the idea of // adding fSyncUpdatesBLB locks to these functions, I determined, that all the // calls to the following functions were protected by a fSyncUpdatesTI mutex: // setColumnStatus() // tryAndLockColumn() // resetColumnLocks() // getColumnStatus() // getColumnLocker() // So I added this note and removed the extraneous fSyncUpdatesBLB lock from // setColumnStatus() and tryAndLockColumn(). 
(dmc-07/19/2009) //------------------------------------------------------------------------------ bool BulkLoadBuffer::setColumnStatus(const int& columnId, const Status& status) { fColumnLocks[columnId].status = status; if (status == WriteEngine::PARSE_COMPLETE) fParseComplete++; if (fParseComplete == fNumberOfColumns) return true; return false; } //------------------------------------------------------------------------------ // Note that fSyncUpdatesTI mutex is used to synchronize usage of fColumnLocks // and fParseComplete from both read and parse threads. //------------------------------------------------------------------------------ bool BulkLoadBuffer::tryAndLockColumn(const int& columnId, const int& id) { if ((fColumnLocks[columnId].status != WriteEngine::PARSE_COMPLETE) && (fColumnLocks[columnId].locker == -1)) { fColumnLocks[columnId].locker = id; return true; } return false; } } // namespace WriteEngine