1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-20 09:07:44 +03:00
Gagan Goel 931f2b36a1
MCOL-4931 Make cpimport charset-aware. (#2938)
1. Extend the following CalpontSystemCatalog member functions to
   set CalpontSystemCatalog::ColType::charsetNumber, after the
   system catalog update to add charset number to calpontsys.syscolumn
   in MCOL-5005:
     CalpontSystemCatalog::lookupOID
     CalpontSystemCatalog::colType
     CalpontSystemCatalog::columnRIDs
     CalpontSystemCatalog::getSchemaInfo

2. Update cpimport to use the CHARSET_INFO object associated with the
   charset number retrieved from the system catalog, for a
   dictionary/non-dictionary CHAR/VARCHAR/TEXT column, to truncate
   long strings that exceed the target column character length.

3. Add MTR test cases.
2023-09-05 17:17:20 +03:00

1315 lines
43 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/*******************************************************************************
* $Id: we_xmljob.cpp 4579 2013-03-19 23:16:54Z dhall $
*
*******************************************************************************/
/** @file */
#include "mcs_basic_types.h"
#define WRITEENGINEXMLJOB_DLLEXPORT
#include "we_xmljob.h"
#undef WRITEENGINEXMLJOB_DLLEXPORT
#include <limits>
#include <sstream>
#include <unistd.h>
#include <stdexcept>
#include <cstdlib>
#include <set>
#include "we_config.h"
#include "we_log.h"
#include "we_convertor.h"
#include "dataconvert.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/filesystem/convenience.hpp>
#include <sys/time.h>
using namespace std;
using namespace execplan;
namespace WriteEngine
{
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
XMLJob::XMLJob()
: fDebugLevel(DEBUG_0)  // start at debug level DEBUG_0
, fDeleteTempFile(false)  // overwritten by loadJobXmlFile(); consulted by the destructor
, fValidateColList(true)  // overwritten by loadJobXmlFile(); consulted by postProcessTableNode()
, fTimeZone(dataconvert::systemTimeZoneOffset())  // passed along when converting TIMESTAMP default values
{
  // Nothing else to do: no job file is parsed until loadJobXmlFile() is called.
}
//------------------------------------------------------------------------------
// Default Destructor
// Delete temporary Job XML file if applicable.
//------------------------------------------------------------------------------
XMLJob::~XMLJob()
{
  // Only a job file flagged as temporary (and actually named) is removed.
  if (!fDeleteTempFile || fJobFileName.empty())
    return;

  unlink(fJobFileName.c_str());
}
//------------------------------------------------------------------------------
// Load a job xml file
// fileName - name of file to load
// bTempFile - are we loading a temporary file (that destructor should delete)
// bValidateColumnList - validate that all db columns have an XML tag
// returns NO_ERROR if success; other if fail
//------------------------------------------------------------------------------
int XMLJob::loadJobXmlFile(const string& fileName, bool bTempFile, bool bValidateColumnList, string& errMsg)
{
  // Remember the caller's choices before parsing; the parse callbacks and
  // the destructor read these members.
  fDeleteTempFile = bTempFile;
  fJobFileName = fileName;
  fValidateColList = bValidateColumnList;

  // Single exit point: the status is either whatever parseDoc() returned,
  // or ERR_XML_PARSE if an exception escaped the parse (in which case the
  // exception text is handed back through errMsg).
  int status = NO_ERROR;

  try
  {
    status = parseDoc(fileName.c_str());
  }
  catch (exception& ex)
  {
    errMsg = ex.what();
    status = ERR_XML_PARSE;
  }

  return status;
}
//------------------------------------------------------------------------------
// Print contents of fJob to the specified logger object.
// logger - Log object to use in logging
//------------------------------------------------------------------------------
void XMLJob::printJobInfo(Log& logger) const
{
  // ---- Job-wide settings: emitted as one log message ----
  ostringstream jobMsg;
  jobMsg << "Job " << fJob.id << " input\n";
  jobMsg << "===============================================" << '\n';
  jobMsg << "Name : " << fJob.name << '\n';
  jobMsg << "Desc : " << fJob.desc << '\n';
  jobMsg << "User : " << fJob.userName << '\n';
  jobMsg << "Delim: " << fJob.fDelimiter << '\n';

  // A zero enclosed-by / escape character is reported as "n/a".
  jobMsg << "Enclosed By : ";
  if (fJob.fEnclosedByChar)
    jobMsg << fJob.fEnclosedByChar;
  else
    jobMsg << "n/a";
  jobMsg << '\n';

  jobMsg << "Escape Char : ";
  if (fJob.fEscapeChar)
    jobMsg << fJob.fEscapeChar;
  else
    jobMsg << "n/a";
  jobMsg << '\n';

  jobMsg << "Read Buffers: " << fJob.numberOfReadBuffers << '\n';
  jobMsg << "Read Buffer Size: " << fJob.readBufferSize << '\n';
  jobMsg << "setvbuf Size: " << fJob.writeBufferSize << '\n';
  jobMsg << "Create Date : " << fJob.createDate << '\n';
  jobMsg << "Create Time : " << fJob.createTime << '\n';
  jobMsg << "Schema Name : " << fJob.schema << '\n';
  jobMsg << "Num Tables : " << fJob.jobTableList.size() << '\n';
  logger.logMsg(jobMsg.str(), MSGLVL_INFO2);

  // ---- One message per table, then one message per field of that table ----
  for (unsigned int tblIdx = 0; tblIdx < fJob.jobTableList.size(); ++tblIdx)
  {
    const JobTable& tbl = fJob.jobTableList[tblIdx];
    const JobColList& tblCols = tbl.colList;

    ostringstream tblMsg;
    tblMsg << "\n-------------------------------------------------" << '\n';
    tblMsg << "\tTable Name : " << tbl.tblName << '\n';
    tblMsg << "\tTable OID : " << tbl.mapOid << '\n';
    tblMsg << "\tTable Load Name : " << tbl.loadFileName << '\n';
    tblMsg << "\tMax Err Num : " << tbl.maxErrNum << '\n';
    tblMsg << "\tNum of Columns : " << tblCols.size() << '\n';
    logger.logMsg(tblMsg.str(), MSGLVL_INFO2);

    // JobColumn.dataType is intentionally not printed: it is not carried in
    // the XML file; it is assigned/used internally by bulkload.
    for (unsigned int fldIdx = 0; fldIdx < tbl.fFldRefs.size(); ++fldIdx)
    {
      const BulkFldColRel relation = tbl.fFldRefs[fldIdx].fFldColType;
      const unsigned arrayIdx = tbl.fFldRefs[fldIdx].fArrayIndex;

      // The field reference indexes either the ignored-field list or the
      // column list, depending on its relation type.
      const JobColumn& column =
          ((relation != BULK_FLDCOL_IGNORE_FIELD) ? tbl.colList[arrayIdx] : tbl.fIgnoredFields[arrayIdx]);

      ostringstream colMsg;
      colMsg << "\n\t****************************************" << '\n';

      if (relation == BULK_FLDCOL_COLUMN_DEFAULT)
        colMsg << "\t\tDefaultColumn Name: ";
      else
        colMsg << "\t\tColumn Name : ";
      colMsg << column.colName << '\n';

      colMsg << "\t\tColumn OID : " << column.mapOid << '\n';
      colMsg << "\t\tColumn type name : " << column.typeName << '\n';
      colMsg << "\t\tColumn width : " << column.width << '\n';
      colMsg << "\t\tColumn Not Null : " << column.fNotNull << '\n';
      colMsg << "\t\tColumn WithDefault: " << column.fWithDefault << '\n';
      colMsg << "\t\tColumn type : " << column.colType << '\n';
      colMsg << "\t\tColumn comp type : " << column.compressionType << '\n';
      colMsg << "\t\tColumn autoInc : " << column.autoIncFlag << '\n';

      // Precision/scale are reported for the signed and unsigned decimal types.
      if (column.typeName == ColDataTypeStr[CalpontSystemCatalog::DECIMAL])
      {
        colMsg << "\t\tColumn Precision : " << column.precision << '\n';
        colMsg << "\t\tColumn Scale : " << column.scale << '\n';
      }

      if (column.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL])
      {
        colMsg << "\t\tColumn Precision : " << column.precision << '\n';
        colMsg << "\t\tColumn Scale : " << column.scale << '\n';
      }

      // 'D' marks a dictionary column; only those carry a dictionary OID.
      if (column.colType == 'D')
        colMsg << "\t\tDictionary Oid : " << column.dctnry.dctnryOid << '\n';

      logger.logMsg(colMsg.str(), MSGLVL_INFO2);
    }
  }
}
//------------------------------------------------------------------------------
// Print brief contents of specified Job to specified logger object.
// logger - Log object to use in logging
//------------------------------------------------------------------------------
void XMLJob::printJobInfoBrief(Log& logger) const
{
  // ---- One-line summary of the job-wide settings ----
  ostringstream jobMsg;
  jobMsg << "XMLJobFile: Delim(" << fJob.fDelimiter;

  // A zero enclosed-by / escape character is reported as "n/a".
  jobMsg << "); EnclosedBy(";
  if (fJob.fEnclosedByChar)
    jobMsg << fJob.fEnclosedByChar;
  else
    jobMsg << "n/a";

  jobMsg << "); EscapeChar(";
  if (fJob.fEscapeChar)
    jobMsg << fJob.fEscapeChar;
  else
    jobMsg << "n/a";

  jobMsg << "); ReadBufs(" << fJob.numberOfReadBuffers;
  jobMsg << "); ReadBufSize(" << fJob.readBufferSize;
  jobMsg << "); setvbufSize(" << fJob.writeBufferSize << ')';
  logger.logMsg(jobMsg.str(), MSGLVL_INFO2);

  // ---- One line per table, followed by one line per field ----
  for (unsigned int tblIdx = 0; tblIdx < fJob.jobTableList.size(); ++tblIdx)
  {
    const JobTable& tbl = fJob.jobTableList[tblIdx];

    ostringstream tblMsg;
    tblMsg << " Table(" << tbl.tblName;
    tblMsg << "); OID(" << tbl.mapOid << ')';
    tblMsg << "; MaxErrNum(" << tbl.maxErrNum << ')';
    logger.logMsg(tblMsg.str(), MSGLVL_INFO2);

    for (unsigned int fldIdx = 0; fldIdx < tbl.fFldRefs.size(); ++fldIdx)
    {
      const BulkFldColRel relation = tbl.fFldRefs[fldIdx].fFldColType;
      const unsigned arrayIdx = tbl.fFldRefs[fldIdx].fArrayIndex;

      // The field reference indexes either the ignored-field list or the
      // column list, depending on its relation type.
      const JobColumn& column =
          ((relation != BULK_FLDCOL_IGNORE_FIELD) ? tbl.colList[arrayIdx] : tbl.fIgnoredFields[arrayIdx]);

      ostringstream colMsg;
      if (relation == BULK_FLDCOL_COLUMN_DEFAULT)
        colMsg << " DefaultColumn(";
      else
        colMsg << " Column(";
      colMsg << column.colName;

      colMsg << "); OID(" << column.mapOid;
      colMsg << "); Type(" << column.typeName;
      colMsg << "); Width(" << column.width;
      colMsg << "); Comp(" << column.compressionType;

      // Dictionary columns ('D') additionally report their dictionary OID.
      if (column.colType == 'D')
        colMsg << "); DctnryOid(" << column.dctnry.dctnryOid;

      colMsg << ')';

      // Optional trailing flags, each only printed when set.
      if (column.autoIncFlag)
        colMsg << "; autoInc";

      if (column.fNotNull)
        colMsg << "; NotNull";

      if (column.fWithDefault)
        colMsg << "; WithDefault";

      logger.logMsg(colMsg.str(), MSGLVL_INFO2);
    }
  }
}
//------------------------------------------------------------------------------
// Process a node
// pNode - current node
// returns TRUE if success, FALSE otherwise
//------------------------------------------------------------------------------
bool XMLJob::processNode(xmlNode* pNode)
{
  // Dispatch table: for each recognized tag, the arguments handed to
  // setJobData(). Rows are probed in order, first match wins.
  struct TagRule
  {
    xmlTag tag;
    bool expectContent;
    XML_DTYPE contentType;
  };

  static const TagRule tagRules[] = {
      {TAG_CREATE_DATE, true, TYPE_CHAR},
      {TAG_CREATE_TIME, true, TYPE_CHAR},
      {TAG_COLUMN, false, TYPE_EMPTY},
      {TAG_DEFAULT_COLUMN, false, TYPE_EMPTY},
      {TAG_DESC, true, TYPE_CHAR},
      {TAG_ID, true, TYPE_INT},
      {TAG_IGNORE_FIELD, false, TYPE_EMPTY},
      {TAG_NAME, true, TYPE_CHAR},
      {TAG_PATH, true, TYPE_CHAR},
      {TAG_TABLE, false, TYPE_EMPTY},
      {TAG_TYPE, true, TYPE_CHAR},
      {TAG_USER, true, TYPE_CHAR},
      {TAG_SCHEMA, false, TYPE_EMPTY},
      {TAG_READ_BUFFERS, false, TYPE_EMPTY},
      {TAG_WRITE_BUFFER_SIZE, true, TYPE_INT},
      {TAG_DELIMITER, true, TYPE_CHAR},
      {TAG_ENCLOSED_BY_CHAR, true, TYPE_CHAR},
      {TAG_ESCAPE_CHAR, true, TYPE_CHAR},
  };
  static const unsigned int tagRuleCount = sizeof(tagRules) / sizeof(tagRules[0]);

  // The BulkJob tag itself needs no work; every other tag must be in the table.
  if (!isTag(pNode, TAG_BULK_JOB))
  {
    bool bRecognized = false;

    for (unsigned int r = 0; (r < tagRuleCount) && !bRecognized; ++r)
    {
      if (isTag(pNode, tagRules[r].tag))
      {
        setJobData(pNode, tagRules[r].tag, tagRules[r].expectContent, tagRules[r].contentType);
        bRecognized = true;
      }
    }

    if (!bRecognized)
    {
      ostringstream oss;
      oss << "Unrecognized TAG in Job XML file: <" << pNode->name << ">";
      throw runtime_error(oss.str());
    }
  }

  // Let the base class handle the node; a failure there is passed straight back.
  if (!XMLOp::processNode(pNode))
    return false;

  // After the base class has handled a <Table> node, finish populating
  // that table.
  if (isTag(pNode, TAG_TABLE))
    postProcessTableNode();

  return true;
}
//------------------------------------------------------------------------------
// Generic setter
// pNode - current node
// tag - xml tag
// bExpectContent - should node content be present to process
// tagType - data type
//------------------------------------------------------------------------------
void XMLJob::setJobData(xmlNode* pNode, const xmlTag tag, bool bExpectContent, XML_DTYPE tagType)
{
  int intContent = 0;
  long long llContent = 0;
  std::string strContent;

  // Step 1: when the tag is expected to carry content, read it according
  // to tagType. If nothing could be read (or the type is not one of the
  // three handled here), the tag is silently skipped.
  if (bExpectContent)
  {
    bool bRead = false;

    switch (tagType)
    {
      case TYPE_INT: bRead = getNodeContent(pNode, &intContent, TYPE_INT); break;

      case TYPE_LONGLONG: bRead = getNodeContent(pNode, &llContent, TYPE_LONGLONG); break;

      case TYPE_CHAR: bRead = getNodeContentStr(pNode, strContent); break;

      default: break;
    }

    if (!bRead)
      return;
  }

  // Step 2: store the content and/or process the tag's attributes.
  switch (tag)
  {
    // --- tags whose attributes are handled by a dedicated helper ---
    case TAG_TABLE: setJobDataTable(pNode); break;

    case TAG_COLUMN: setJobDataColumn(pNode, false); break;

    case TAG_DEFAULT_COLUMN: setJobDataColumn(pNode, true); break;

    case TAG_IGNORE_FIELD: setJobDataIgnoreField(); break;

    case TAG_READ_BUFFERS: setReadBuffers(pNode); break;

    case TAG_SCHEMA: setSchema(pNode); break;

    // --- tags whose content is copied straight into the job ---
    case TAG_ID: fJob.id = intContent; break;

    case TAG_NAME: fJob.name = strContent; break;

    case TAG_DESC: fJob.desc = strContent; break;

    case TAG_USER: fJob.userName = strContent; break;

    case TAG_CREATE_DATE: fJob.createDate = strContent; break;

    case TAG_CREATE_TIME: fJob.createTime = strContent; break;

    case TAG_WRITE_BUFFER_SIZE: fJob.writeBufferSize = intContent; break;

    // --- single-character settings: first character of the content ---
    case TAG_DELIMITER:
    {
      // The two-character text \t (optionally wrapped in single quotes)
      // stands for a real tab character.
      const char* text = strContent.c_str();
      const bool bIsTab = (strcmp(text, "\\t") == 0) || (strcmp(text, "'\\t'") == 0);
      fJob.fDelimiter = bIsTab ? '\t' : strContent[0];
      break;
    }

    case TAG_ENCLOSED_BY_CHAR: fJob.fEnclosedByChar = strContent[0]; break;

    case TAG_ESCAPE_CHAR: fJob.fEscapeChar = strContent[0]; break;

    // --- accepted for backwards compatibility; nothing to store ---
    case TAG_PATH:
    case TAG_TYPE: break;

    default: break;
  }
}
//------------------------------------------------------------------------------
// Set table information parms.
// pNode - current node
//------------------------------------------------------------------------------
void XMLJob::setJobDataTable(xmlNode* pNode)
{
  JobTable newTable;
  std::string attrText;
  int attrNum;

  // The table name may come from either attribute; when both are present
  // the TAG_TBL_NAME attribute, read last, wins.
  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_ORIG_NAME], attrText))
    newTable.tblName = attrText;

  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_TBL_NAME], attrText))
    newTable.tblName = attrText;

  // A table without a name cannot be processed any further.
  if (newTable.tblName.empty())
    throw runtime_error("Required table name attribute (tblName) missing from Table tag");

  // The remaining attributes are optional; each is copied only if present.
  if (getNodeAttribute(pNode, xmlTagTable[TAG_TBL_OID], &attrNum, TYPE_INT))
    newTable.mapOid = attrNum;

  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_LOAD_NAME], attrText))
    newTable.loadFileName = attrText;

  if (getNodeAttribute(pNode, xmlTagTable[TAG_MAX_ERR_ROW], &attrNum, TYPE_INT))
    newTable.maxErrNum = attrNum;

  // The new table becomes the "current" (last) table for later column tags.
  fJob.jobTableList.push_back(newTable);
}
//------------------------------------------------------------------------------
// Set column information parms.
// pNode - current node
// bDefaultCol - is this a <DefaultColumn> tag
//
// Note on Supported Tags: (Bug 2828)
// Note that the "notnull" and "defaultValue" attribute tags are not recognized
// by this function because by the time we added support for these tags, we had
// changed to only store the table and column names in the XML file. Much of
// the functionality in setJobDataColumn() is only present to provide backwards
// compatibility for an old Job XML file that a user might still be using.
//
// Any other new tags probably don't need adding to setJobDataColumn() either,
// for the same reason.
//------------------------------------------------------------------------------
void XMLJob::setJobDataColumn(xmlNode* pNode, bool bDefaultCol)
{
  // A column tag seen before any <Table> has nothing to attach to; ignore it.
  if (fJob.jobTableList.size() == 0)
    return;

  // Columns always belong to the most recently added table.
  JobTable& ownerTbl = fJob.jobTableList[fJob.jobTableList.size() - 1];

  JobColumn column;
  std::string attrText;
  int attrNum;

  // Column name: either attribute is accepted; TAG_COL_NAME, read last, wins.
  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_ORIG_NAME], attrText))
    column.colName = attrText;

  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_COL_NAME], attrText))
    column.colName = attrText;

  if (column.colName.empty())
  {
    ostringstream oss;
    oss << "Required column name attribute (colName) missing from "
           "Column tag for table "
        << ownerTbl.tblName;
    throw runtime_error(oss.str());
  }

  // Optional attributes; each one is applied only if present in the tag.
  if (getNodeAttribute(pNode, xmlTagTable[TAG_COL_OID], &attrNum, TYPE_INT))
    column.mapOid = attrNum;

  if (getNodeAttribute(pNode, xmlTagTable[TAG_WIDTH], &attrNum, TYPE_INT))
  {
    column.width = attrNum;
    column.definedWidth = attrNum;  //@Bug 3040
  }

  if (getNodeAttribute(pNode, xmlTagTable[TAG_PRECISION], &attrNum, TYPE_INT))
    column.precision = attrNum;

  if (getNodeAttribute(pNode, xmlTagTable[TAG_SCALE], &attrNum, TYPE_INT))
    column.scale = attrNum;

  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_DATA_TYPE], attrText))
    column.typeName = attrText;

  if (getNodeAttribute(pNode, xmlTagTable[TAG_COMPRESS_TYPE], &attrNum, TYPE_INT))
  {
    // The same compression type applies to the column and its dictionary.
    column.compressionType = attrNum;
    column.dctnry.fCompressionType = attrNum;
  }

  if (getNodeAttribute(pNode, xmlTagTable[TAG_AUTOINCREMENT_FLAG], &attrNum, TYPE_INT))
    column.autoIncFlag = (attrNum != 0);

  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_COL_TYPE], attrText))
  {
    if (strcmp(attrText.c_str(), "D") == 0)
    {
      column.colType = 'D';

      // @Bug 2565: Retain dictionary width to use in truncating strings,
      // since BulkLoad eventually stores column token width in 'width'.
      column.dctnryWidth = column.width;

      if (getNodeAttribute(pNode, xmlTagTable[TAG_DVAL_OID], &attrNum, TYPE_INT))
        column.dctnry.dctnryOid = attrNum;
    }
  }

  // Workaround: DBBuilder cannot pass a decimal type to the XML file, so a
  // signed integer type with a positive scale is re-labelled as DECIMAL.
  if ((column.scale > 0) && (column.typeName == ColDataTypeStr[CalpontSystemCatalog::INT] ||
                             column.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT] ||
                             column.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT] ||
                             column.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT]))
  {
    column.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];
  }

  // Saturation limits depend on the (possibly re-labelled) type name.
  initSatLimits(column);

  if (bDefaultCol)
  {
    // Default columns are parked in a separate list so they can be
    // appended to the "end" of colList later, after all other columns.
    column.fFldColRelation = BULK_FLDCOL_COLUMN_DEFAULT;
    fDefaultColumns.push_back(column);
    return;
  }

  // Regular column: add to the list of db columns to be loaded ...
  column.fFldColRelation = BULK_FLDCOL_COLUMN_FIELD;
  ownerTbl.colList.push_back(column);

  // ... and to the combined field list of columns and ignored fields.
  JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_FIELD, ownerTbl.colList.size() - 1);
  ownerTbl.fFldRefs.push_back(fieldRef);
}
//------------------------------------------------------------------------------
// Set column information parms for an input field that is to be ignored
//------------------------------------------------------------------------------
void XMLJob::setJobDataIgnoreField()
{
JobColumn curColumn;
int tableNo = fJob.jobTableList.size() - 1;
ostringstream oss;
oss << "IgnoreField" << fJob.jobTableList[tableNo].fFldRefs.size() + 1;
curColumn.colName = oss.str();
// Add to list of ignored fields
curColumn.fFldColRelation = BULK_FLDCOL_IGNORE_FIELD;
fJob.jobTableList[tableNo].fIgnoredFields.push_back(curColumn);
// Add to combined field list of columns and ignored fields
JobFieldRef fieldRef(BULK_FLDCOL_IGNORE_FIELD, fJob.jobTableList[tableNo].fIgnoredFields.size() - 1);
fJob.jobTableList[tableNo].fFldRefs.push_back(fieldRef);
}
//------------------------------------------------------------------------------
// Initialize the saturation limits for the specified column.
//------------------------------------------------------------------------------
void XMLJob::initSatLimits(JobColumn& curColumn) const
{
  // The column's type name selects which pair of saturation limits is set.
  // Each recognized type sets its limits and returns; a type name that
  // matches none of the cases leaves the column untouched.

  // ---- Integer types: fixed min/max per type ----
  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::INT])
  {
    curColumn.fMinIntSat = MIN_INT;
    curColumn.fMaxIntSat = MAX_INT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UINT])
  {
    curColumn.fMinIntSat = MIN_UINT;
    curColumn.fMaxIntSat = MAX_UINT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT])
  {
    curColumn.fMinIntSat = MIN_BIGINT;
    curColumn.fMaxIntSat = MAX_BIGINT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UBIGINT])
  {
    curColumn.fMinIntSat = MIN_UBIGINT;
    curColumn.fMaxIntSat = MAX_UBIGINT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::MEDINT])
  {
    curColumn.fMinIntSat = MIN_MEDINT;
    curColumn.fMaxIntSat = MAX_MEDINT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UMEDINT])
  {
    curColumn.fMinIntSat = MIN_UMEDINT;
    curColumn.fMaxIntSat = MAX_UMEDINT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT])
  {
    curColumn.fMinIntSat = MIN_SMALLINT;
    curColumn.fMaxIntSat = MAX_SMALLINT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::USMALLINT])
  {
    curColumn.fMinIntSat = MIN_USMALLINT;
    curColumn.fMaxIntSat = MAX_USMALLINT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT])
  {
    curColumn.fMinIntSat = MIN_TINYINT;
    curColumn.fMaxIntSat = MAX_TINYINT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UTINYINT])
  {
    curColumn.fMinIntSat = MIN_UTINYINT;
    curColumn.fMaxIntSat = MAX_UTINYINT;
    return;
  }

  // ---- Decimal types: the limit varies with the column's precision ----
  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::DECIMAL])
  {
    // Signed decimal: symmetric range around zero.
    curColumn.fMaxIntSat = dataconvert::decimalRangeUp<int128_t>(curColumn.precision);
    curColumn.fMinIntSat = -curColumn.fMaxIntSat;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL])
  {
    curColumn.fMinIntSat = 0;
    curColumn.fMaxIntSat = dataconvert::decimalRangeUp<int128_t>(curColumn.precision);
    return;
  }

  // ---- Floating point types: unsigned variants have a lower bound of 0.0 ----
  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::FLOAT])
  {
    curColumn.fMinDblSat = MIN_FLOAT;
    curColumn.fMaxDblSat = MAX_FLOAT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UFLOAT])
  {
    curColumn.fMinDblSat = 0.0;
    curColumn.fMaxDblSat = MAX_FLOAT;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::DOUBLE])
  {
    curColumn.fMinDblSat = MIN_DOUBLE;
    curColumn.fMaxDblSat = MAX_DOUBLE;
    return;
  }

  if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UDOUBLE])
  {
    curColumn.fMinDblSat = 0.0;
    curColumn.fMaxDblSat = MAX_DOUBLE;
    return;
  }
}
//------------------------------------------------------------------------------
// Set Read Buffers attributes
// pNode - current node
//------------------------------------------------------------------------------
void XMLJob::setReadBuffers(xmlNode* pNode)
{
  // Both attributes are optional; a job setting is only overwritten when
  // its attribute is present on the node.
  int attrValue = 0;

  const bool bHaveCount = getNodeAttribute(pNode, xmlTagTable[TAG_NO_OF_READ_BUFFERS], &attrValue, TYPE_INT);
  if (bHaveCount)
  {
    fJob.numberOfReadBuffers = attrValue;
  }

  const bool bHaveSize = getNodeAttribute(pNode, xmlTagTable[TAG_READ_BUFFER_SIZE], &attrValue, TYPE_INT);
  if (bHaveSize)
  {
    fJob.readBufferSize = attrValue;
  }
}
//------------------------------------------------------------------------------
// Set Schema attributes
// pNode - current node
//------------------------------------------------------------------------------
void XMLJob::setSchema(xmlNode* pNode)
{
  // The schema name is left untouched when the attribute is absent.
  std::string schemaName;

  if (!getNodeAttributeStr(pNode, xmlTagTable[TAG_SCHEMA_NAME], schemaName))
    return;

  fJob.schema = schemaName;
}
//------------------------------------------------------------------------------
// Transfer any/all <DefaultColumn> columns from temporary fDefaultColumns, to
// the end of the column/field lists.
// It is assumed that we are working with the last table in jobTableList.
// Then get additional information from system catalog to finish populating
// our Job structs with all the table and column attributes we need.
//------------------------------------------------------------------------------
void XMLJob::postProcessTableNode()
{
bool bValidateNoDefColWithoutDefValue = false;
if (fDefaultColumns.size() > 0)
{
bValidateNoDefColWithoutDefValue = true;
int tableNo = fJob.jobTableList.size() - 1;
for (unsigned k = 0; k < fDefaultColumns.size(); k++)
{
// Add to list of db columns to be loaded
fJob.jobTableList[tableNo].colList.push_back(fDefaultColumns[k]);
// Add to combined list of columns and ignored fields
JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_DEFAULT, fJob.jobTableList[tableNo].colList.size() - 1);
fJob.jobTableList[tableNo].fFldRefs.push_back(fieldRef);
}
fDefaultColumns.clear();
}
// Supplement xml file contents with information from syscat
execplan::CalpontSystemCatalog::RIDList colRidList;
fillInXMLDataAsLoaded(colRidList);
// After getting all the system catalog information...
// Validate that if there are any <DefaultColumn> tags for a NotNull
// column, that the column is defined as NotNull With Default.
if (bValidateNoDefColWithoutDefValue)
{
int tableNo = fJob.jobTableList.size() - 1;
for (unsigned int iCol = 0; iCol < fJob.jobTableList[tableNo].colList.size(); iCol++)
{
JobColumn& col = fJob.jobTableList[tableNo].colList[iCol];
if (col.fFldColRelation == BULK_FLDCOL_COLUMN_DEFAULT)
{
if ((col.fNotNull) && (!col.fWithDefault))
{
std::ostringstream oss;
oss << "Column " << col.colName << " in table " << fJob.jobTableList[tableNo].tblName
<< " is NotNull "
"w/o default; cannot be used with <DefaultColumn>";
throw std::runtime_error(oss.str());
}
}
}
}
// Make sure all Columns in the DB are counted for with <Column> or
// <DefaultColumn> tags (unless validate is disabled)
if (fValidateColList)
validateAllColumnsHaveTags(colRidList);
}
//------------------------------------------------------------------------------
// Use the table and column names from the last <Table> just loaded, to
// collect the remaining information from the system catalog, in order to
// populate the JobColumn structure.
//------------------------------------------------------------------------------
void XMLJob::fillInXMLDataAsLoaded(execplan::CalpontSystemCatalog::RIDList& colRidList)
{
  typedef execplan::CalpontSystemCatalog SysCat;

  boost::shared_ptr<SysCat> sysCat = SysCat::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID);
  sysCat->identity(SysCat::EC);

  // Work on the last <Table> processed.
  const unsigned int lastIdx = fJob.jobTableList.size() - 1;
  JobTable& curTbl = fJob.jobTableList[lastIdx];

  // The catalog lookup uses the bare table name: everything after the last
  // '.' in tblName, or the whole name when there is no '.'.
  std::string bareTblName;
  const string::size_type dotPos = curTbl.tblName.rfind('.');

  if (dotPos != string::npos)
    bareTblName.assign(curTbl.tblName.substr(dotPos + 1));
  else
    bareTblName.assign(curTbl.tblName);

  SysCat::TableName tableKey(fJob.schema, bareTblName);

  // Table OID: look it up only when the XML did not provide one.
  if (curTbl.mapOid == 0)
  {
    SysCat::OID tblOid = sysCat->tableRID(tableKey).objnum;
    curTbl.mapOid = tblOid;
  }

  // Fetch all column RIDs for the table up front. Per the original note,
  // this is done for performance: it forces the column information for the
  // table to be cached at once rather than piece-meal via lookupOID().
  colRidList = sysCat->columnRIDs(tableKey, true);

  // Fill in every column whose OID was not supplied in the XML file.
  for (unsigned int c = 0; c < curTbl.colList.size(); ++c)
  {
    JobColumn& col = curTbl.colList[c];

    if (col.mapOid != 0)
      continue;  // attributes for this column came from the XML file

    SysCat::TableColName colKey;
    colKey.schema = fJob.schema;
    colKey.table = bareTblName;
    colKey.column = col.colName;

    SysCat::OID colOid = sysCat->lookupOID(colKey);

    if (colOid < 0)
    {
      ostringstream oss;
      oss << "Column OID lookup failed for: " << colKey;
      throw runtime_error(oss.str());
    }

    col.mapOid = colOid;

    SysCat::ColType colType = sysCat->colType(col.mapOid);

    col.width = colType.colWidth;
    col.definedWidth = colType.colWidth;

    // Precision/scale are copied for decimal types and for anything with
    // a positive scale.
    const bool bIsDecimal =
        (colType.colDataType == SysCat::DECIMAL) || (colType.colDataType == SysCat::UDECIMAL);

    if ((colType.scale > 0) || bIsDecimal)
    {
      col.precision = colType.precision;
      col.scale = colType.scale;
    }

    col.typeName = ColDataTypeStr[colType.colDataType];
    col.compressionType = colType.compressionType;
    col.dctnry.fCompressionType = colType.compressionType;

    // Charset: a charset number of 0 falls back to latin1.
    if (colType.charsetNumber == 0)
    {
      col.cs = &my_charset_latin1;
    }
    else
    {
      col.cs = &datatypes::Charset(colType.charsetNumber).getCharset();
    }

    col.autoIncFlag = colType.autoincrement ? true : false;

    // Initialize NotNull and Default Value (based on data type)
    fillInXMLDataNotNullDefault(curTbl.tblName, colType, col);

    // A positive dictionary OID marks the column as a dictionary column.
    if (colType.ddn.dictOID > 0)
    {
      col.colType = 'D';
      col.dctnryWidth = colType.colWidth;
      col.dctnry.dctnryOid = colType.ddn.dictOID;
    }

    // @bug3801: For backwards compatibility, a signed integer type with a
    // positive scale is treated as decimal.
    if ((col.scale > 0) && ((col.typeName == ColDataTypeStr[CalpontSystemCatalog::INT]) ||
                            (col.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT]) ||
                            (col.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT]) ||
                            (col.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT])))
    {
      col.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];
    }

    // Saturation limits depend on the final type name.
    initSatLimits(col);
  }
}
//------------------------------------------------------------------------------
// Using information from the system catalog (in colType), fill in the
// applicable NotNull Default values into the specified JobColumn.
//------------------------------------------------------------------------------
void XMLJob::fillInXMLDataNotNullDefault(const std::string& fullTblName,
execplan::CalpontSystemCatalog::ColType& colType, JobColumn& col)
{
const NullString col_defaultValue(colType.defaultValue);
if (colType.constraintType == execplan::CalpontSystemCatalog::NOTNULL_CONSTRAINT)
{
col.fNotNull = true;
if (!col_defaultValue.isNull())
col.fWithDefault = true;
}
else if (colType.constraintType == execplan::CalpontSystemCatalog::DEFAULT_CONSTRAINT)
{
col.fWithDefault = true;
}
if (col.fWithDefault)
{
bool bDefaultConvertError = false;
// Convert Default Value.
// We go ahead and report basic format conversion error;
// but we don't do complete validation (like checking to see
// if the default is too large for the given integer type),
// because we assume DDL is fully validating the default value.
switch (colType.colDataType)
{
case execplan::CalpontSystemCatalog::BIT:
case execplan::CalpontSystemCatalog::TINYINT:
case execplan::CalpontSystemCatalog::SMALLINT:
case execplan::CalpontSystemCatalog::MEDINT:
case execplan::CalpontSystemCatalog::INT:
case execplan::CalpontSystemCatalog::BIGINT:
{
errno = 0;
col.fDefaultInt = strtoll(col_defaultValue.str(), 0, 10);
if (errno == ERANGE)
bDefaultConvertError = true;
break;
}
case execplan::CalpontSystemCatalog::UTINYINT:
case execplan::CalpontSystemCatalog::USMALLINT:
case execplan::CalpontSystemCatalog::UMEDINT:
case execplan::CalpontSystemCatalog::UINT:
case execplan::CalpontSystemCatalog::UBIGINT:
{
errno = 0;
col.fDefaultUInt = strtoull(col_defaultValue.str(), 0, 10);
if (errno == ERANGE)
bDefaultConvertError = true;
break;
}
case execplan::CalpontSystemCatalog::DECIMAL:
case execplan::CalpontSystemCatalog::UDECIMAL:
{
if (LIKELY(colType.colWidth == datatypes::MAXDECIMALWIDTH))
{
col.fDefaultWideDecimal = colType.decimal128FromString(col_defaultValue.safeString(), &bDefaultConvertError);
}
else
{
col.fDefaultInt = Convertor::convertDecimalString(col_defaultValue.str(),
col_defaultValue.length(), colType.scale);
if (errno == ERANGE)
bDefaultConvertError = true;
}
break;
}
case execplan::CalpontSystemCatalog::DATE:
{
int convertStatus;
int32_t dt = dataconvert::DataConvert::convertColumnDate(col_defaultValue.str(),
dataconvert::CALPONTDATE_ENUM, convertStatus,
col_defaultValue.length());
if (convertStatus != 0)
bDefaultConvertError = true;
col.fDefaultInt = dt;
break;
}
case execplan::CalpontSystemCatalog::DATETIME:
{
int convertStatus;
int64_t dt = dataconvert::DataConvert::convertColumnDatetime(
col_defaultValue.str(), dataconvert::CALPONTDATETIME_ENUM, convertStatus,
col_defaultValue.length());
if (convertStatus != 0)
bDefaultConvertError = true;
col.fDefaultInt = dt;
break;
}
case execplan::CalpontSystemCatalog::TIMESTAMP:
{
int convertStatus;
int64_t dt = dataconvert::DataConvert::convertColumnTimestamp(
col_defaultValue.str(), dataconvert::CALPONTDATETIME_ENUM, convertStatus,
col_defaultValue.length(), fTimeZone);
if (convertStatus != 0)
bDefaultConvertError = true;
col.fDefaultInt = dt;
break;
}
case execplan::CalpontSystemCatalog::TIME:
{
int convertStatus;
int64_t dt = dataconvert::DataConvert::convertColumnTime(col_defaultValue.str(),
dataconvert::CALPONTTIME_ENUM, convertStatus,
col_defaultValue.length());
if (convertStatus != 0)
bDefaultConvertError = true;
col.fDefaultInt = dt;
break;
}
case execplan::CalpontSystemCatalog::FLOAT:
case execplan::CalpontSystemCatalog::DOUBLE:
case execplan::CalpontSystemCatalog::UFLOAT:
case execplan::CalpontSystemCatalog::UDOUBLE:
{
errno = 0;
col.fDefaultDbl = strtod(col_defaultValue.str(), 0);
if (errno == ERANGE)
bDefaultConvertError = true;
break;
}
default:
{
col.fDefaultChr = col_defaultValue;
break;
}
}
if (bDefaultConvertError)
{
std::ostringstream oss;
oss << "Column " << col.colName << " in table " << fullTblName
<< " has an invalid default value in system catalog.";
throw std::runtime_error(oss.str());
}
}
}
//------------------------------------------------------------------------------
// Use the table and column names from the last <Table> just loaded, to
// validate that all the columns have a <Column> or <DefaultColumn> tag
// present in the job XML file.
//   colRidList  Columns to be checked (the "cols in system catalog" for the
//               table); each entry's objnum must be matched by a column tag.
// Throws std::runtime_error if no table has been loaded yet, if the same
// column OID is referenced by more than one tag, or if an entry in colRidList
// has no corresponding tag.
//------------------------------------------------------------------------------
void XMLJob::validateAllColumnsHaveTags(const execplan::CalpontSystemCatalog::RIDList& colRidList) const
{
  // Without this guard, "size() - 1" would wrap around on an empty table list
  // and the subsequent element access would be out of bounds.
  if (fJob.jobTableList.empty())
  {
    throw std::runtime_error("Error validating column list; no table has been loaded from Job XML file.");
  }

  // Validate column list for the last <Table> processed
  const JobTable& tbl = fJob.jobTableList.back();

  // Strip any leading "schema." qualifier; the schema is reported separately
  // (from fJob.schema) in the error messages below.
  std::string tblName;
  const std::string::size_type startName = tbl.tblName.rfind('.');

  if (startName == std::string::npos)
    tblName.assign(tbl.tblName);
  else
    tblName.assign(tbl.tblName.substr(startName + 1));

  // Resolve a column OID to its column name through the system catalog.
  // Only needed on the error paths, so the catalog is fetched lazily here.
  const auto lookupColumnName = [](execplan::CalpontSystemCatalog::OID oid)
  {
    boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
        execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID);
    cat->identity(execplan::CalpontSystemCatalog::EC);
    execplan::CalpontSystemCatalog::TableColName dbColName = cat->colName(oid);
    return dbColName.column;
  };

  try
  {
    // Loop through column tags, saving col OIDs to a std::set for lookups.
    // A failed insert means the OID was already seen, i.e. a duplicate tag.
    std::set<execplan::CalpontSystemCatalog::OID> colOIDList;

    for (const JobColumn& col : tbl.colList)
    {
      if (!colOIDList.insert(col.mapOid).second)
      {
        std::ostringstream oss;
        oss << "Column " << lookupColumnName(col.mapOid) << " referenced in Job XML file more than once.";
        throw std::runtime_error(oss.str());
      }
    }

    // Loop thru cols in system catalog and verify that each one has a tag
    for (const auto& rid : colRidList)
    {
      // erase() returns the number of elements removed; 0 means no tag
      // referenced this column. A matched OID is removed because we are
      // through with that column.
      if (colOIDList.erase(rid.objnum) == 0)
      {
        std::ostringstream oss;
        oss << "No tag present in Job XML file for DB column: " << lookupColumnName(rid.objnum);
        throw std::runtime_error(oss.str());
      }
    }
  }
  catch (const std::exception& ex)
  {
    // Re-throw with the table identity prepended to the original message
    std::ostringstream oss;
    oss << "Error validating column list for table " << fJob.schema << '.' << tblName << "; " << ex.what();
    throw std::runtime_error(oss.str());
  }
  catch (...)
  {
    std::ostringstream oss;
    oss << "Unknown Error validating column list for table " << fJob.schema << '.' << tblName;
    throw std::runtime_error(oss.str());
  }
}
//------------------------------------------------------------------------------
// Generate a permanent or temporary Job XML file name path.
// sXMLJobDir Command line override for complete Job directory path
// jobDIr Job subdirectory under default <BulkRoot> path
// jobId Job ID
// bTempFile Are we creating a temporary Job Xml File
// schmaName If temp file, this is schema name to use
// tableName If temp file, this is the table name to use
// xmlDirPath The complete Job XML file path that is constructed
// errMsg Relevant error message if return value is not NO_ERROR.
//------------------------------------------------------------------------------
/* static */
int XMLJob::genJobXMLFileName(const string& sXMLJobDir, const string& jobDir, const string& jobId,
bool bTempFile, const string& schemaName, const string& tableName,
boost::filesystem::path& xmlFilePath, string& errMsg,
const std::string& tableOIDStr)
{
// get full file directory path for XML job description file
if (sXMLJobDir.empty())
{
xmlFilePath = Config::getBulkRoot();
xmlFilePath /= jobDir;
}
else
{
xmlFilePath = sXMLJobDir;
// If filespec doesn't begin with a '/' (i.e. it's not an absolute path),
// attempt to make it absolute so that we can log the full pathname.
if (!xmlFilePath.has_root_path())
{
char cwdPath[4096];
char* err;
err = getcwd(cwdPath, sizeof(cwdPath));
if (err == NULL)
{
errMsg = "Failed to get the current working directory.";
return -1;
}
string trailingPath(xmlFilePath.string());
xmlFilePath = cwdPath;
xmlFilePath /= trailingPath;
}
}
// Append the file name to the directory path
string jobFileName;
if (bTempFile)
{
// Create tmp directory if does not exist
RETURN_ON_ERROR(createTempJobDir(xmlFilePath.string(), errMsg));
jobFileName += tableOIDStr;
// jobFileName += schemaName;
// jobFileName += '_';
// jobFileName += tableName;
jobFileName += "_D";
string now(boost::posix_time::to_iso_string(boost::posix_time::second_clock::local_time()));
// microseconds
struct timeval tp;
gettimeofday(&tp, 0);
ostringstream usec;
usec << setfill('0') << setw(6) << tp.tv_usec;
jobFileName += now.substr(0, 8);
jobFileName += "_T";
jobFileName += now.substr(9, 6);
jobFileName += "_S";
jobFileName += usec.str();
jobFileName += '_';
}
jobFileName += "Job_";
jobFileName += jobId;
jobFileName += ".xml";
xmlFilePath /= jobFileName;
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Create directory for temporary XML job description files.
// OAM restart should delete any/all files in this directory.
//------------------------------------------------------------------------------
/* static */
int XMLJob::createTempJobDir(const string& xmlFilePath, string& errMsg)
{
boost::filesystem::path pathDir(xmlFilePath);
// create temp directory for XML job file if it does not exist
try
{
if (!boost::filesystem::exists(xmlFilePath))
{
string boostErrString;
try
{
boost::filesystem::create_directories(pathDir);
}
catch (exception& ex)
{
// ignore exception for now; we may have just had a
// race condition where 2 jobs were creating dirs.
boostErrString = ex.what();
}
if (!boost::filesystem::exists(xmlFilePath))
{
ostringstream oss;
oss << "Error creating XML temp job file directory(1) " << xmlFilePath << "; " << boostErrString;
errMsg = oss.str();
return ERR_DIR_CREATE;
}
}
}
catch (exception& ex)
{
ostringstream oss;
oss << "Error creating XML temp job file directory(2) " << xmlFilePath << "; " << ex.what();
errMsg = oss.str();
return ERR_DIR_CREATE;
}
if (!boost::filesystem::is_directory(pathDir))
{
ostringstream oss;
oss << "Error creating XML temp job file directory " << xmlFilePath
<< "; path already exists as non-directory" << endl;
errMsg = oss.str();
return ERR_DIR_CREATE;
}
return NO_ERROR;
}
} // namespace WriteEngine