/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*******************************************************************************
 * $Id: we_xmljob.cpp 4579 2013-03-19 23:16:54Z dhall $
 *
 *******************************************************************************/
/** @file */

#include "mcs_basic_types.h"

// The export macro is defined only while the class header is being included.
#define WRITEENGINEXMLJOB_DLLEXPORT
#include "we_xmljob.h"
#undef WRITEENGINEXMLJOB_DLLEXPORT

// NOTE(review): the six directives below have no header name in this copy of
// the file (the <...> part appears to have been stripped); restore the names
// from version control before building.
#include
#include
#include
#include
#include
#include
#include "we_config.h"
#include "we_log.h"
#include "we_convertor.h"
#include "dataconvert.h"
// NOTE(review): the four directives below are likewise missing their header
// names.
#include
#include
#include
#include

using namespace std;
using namespace execplan;

namespace WriteEngine
{
//------------------------------------------------------------------------------
// Constructor
//
// Starts with debug level DEBUG_0, with temp-file deletion turned off, with
// column-list validation turned on, and with fTimeZone taken from
// dataconvert::systemTimeZoneOffset().
//------------------------------------------------------------------------------
XMLJob::XMLJob()
 : fDebugLevel(DEBUG_0)
 , fDeleteTempFile(false)
 , fValidateColList(true)
 , fTimeZone(dataconvert::systemTimeZoneOffset())
{
}

//------------------------------------------------------------------------------
// Default Destructor
// Delete temporary Job XML file if applicable.
//------------------------------------------------------------------------------ XMLJob::~XMLJob() { if ((fDeleteTempFile) && (!fJobFileName.empty())) { unlink(fJobFileName.c_str()); } } //------------------------------------------------------------------------------ // Load a job xml file // fileName - name of file to load // bTempFile - are we loading a temporary file (that destructor should delete) // bValidateColumnList - validate that all db columns have an XML tag // returns NO_ERROR if success; other if fail //------------------------------------------------------------------------------ int XMLJob::loadJobXmlFile(const string& fileName, bool bTempFile, bool bValidateColumnList, string& errMsg) { int rc; fDeleteTempFile = bTempFile; fJobFileName = fileName; fValidateColList = bValidateColumnList; try { rc = parseDoc(fileName.c_str()); if (rc != NO_ERROR) return rc; } catch (exception& ex) { errMsg = ex.what(); return ERR_XML_PARSE; } return rc; } //------------------------------------------------------------------------------ // Print contents of fJob to the specified logger object. 
// logger - Log object to use in logging //------------------------------------------------------------------------------ void XMLJob::printJobInfo(Log& logger) const { const Job& job = fJob; ostringstream oss1; oss1 << "Job " << job.id << " input\n"; oss1 << "===============================================" << endl; oss1 << "Name : " << job.name << endl; oss1 << "Desc : " << job.desc << endl; oss1 << "User : " << job.userName << endl; oss1 << "Delim: " << job.fDelimiter << endl; oss1 << "Enclosed By : "; if (job.fEnclosedByChar) oss1 << job.fEnclosedByChar << endl; else oss1 << "n/a" << endl; oss1 << "Escape Char : "; if (job.fEscapeChar) oss1 << job.fEscapeChar << endl; else oss1 << "n/a" << endl; oss1 << "Read Buffers: " << job.numberOfReadBuffers << endl; oss1 << "Read Buffer Size: " << job.readBufferSize << endl; oss1 << "setvbuf Size: " << job.writeBufferSize << endl; oss1 << "Create Date : " << job.createDate << endl; oss1 << "Create Time : " << job.createTime << endl; oss1 << "Schema Name : " << job.schema << endl; oss1 << "Num Tables : " << job.jobTableList.size() << endl; logger.logMsg(oss1.str(), MSGLVL_INFO2); for (unsigned int i = 0; i < job.jobTableList.size(); i++) { const JobTable& jobTable = job.jobTableList[i]; ostringstream oss2; oss2 << "\n-------------------------------------------------" << endl; oss2 << "\tTable Name : " << jobTable.tblName << endl; oss2 << "\tTable OID : " << jobTable.mapOid << endl; oss2 << "\tTable Load Name : " << jobTable.loadFileName << endl; oss2 << "\tMax Err Num : " << jobTable.maxErrNum << endl; const JobColList& colList = jobTable.colList; oss2 << "\tNum of Columns : " << colList.size() << endl; logger.logMsg(oss2.str(), MSGLVL_INFO2); // Note that we don't print JobColumn.dataType because it is not carried // in the XML file. dataType is assigned/used internally by bulkload. 
for (unsigned int j = 0; j < jobTable.fFldRefs.size(); j++) { unsigned idx = jobTable.fFldRefs[j].fArrayIndex; BulkFldColRel fldColType = jobTable.fFldRefs[j].fFldColType; const JobColumn& jobCol = ((fldColType == BULK_FLDCOL_IGNORE_FIELD) ? jobTable.fIgnoredFields[idx] : jobTable.colList[idx]); ostringstream oss3; oss3 << "\n\t****************************************" << endl; if (fldColType == BULK_FLDCOL_COLUMN_DEFAULT) oss3 << "\t\tDefaultColumn Name: " << jobCol.colName << endl; else oss3 << "\t\tColumn Name : " << jobCol.colName << endl; oss3 << "\t\tColumn OID : " << jobCol.mapOid << endl; oss3 << "\t\tColumn type name : " << jobCol.typeName << endl; oss3 << "\t\tColumn width : " << jobCol.width << endl; oss3 << "\t\tColumn Not Null : " << jobCol.fNotNull << endl; oss3 << "\t\tColumn WithDefault: " << jobCol.fWithDefault << endl; oss3 << "\t\tColumn type : " << jobCol.colType << endl; oss3 << "\t\tColumn comp type : " << jobCol.compressionType << endl; oss3 << "\t\tColumn autoInc : " << jobCol.autoIncFlag << endl; if (jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::DECIMAL]) { oss3 << "\t\tColumn Precision : " << jobCol.precision << endl; oss3 << "\t\tColumn Scale : " << jobCol.scale << endl; } if (jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL]) { oss3 << "\t\tColumn Precision : " << jobCol.precision << endl; oss3 << "\t\tColumn Scale : " << jobCol.scale << endl; } if (jobCol.colType == 'D') { oss3 << "\t\tDictionary Oid : " << jobCol.dctnry.dctnryOid << endl; } logger.logMsg(oss3.str(), MSGLVL_INFO2); } // end of loop through columns in a table } // end of loop through tables } //------------------------------------------------------------------------------ // Print brief contents of specified Job to specified logger object. 
// logger - Log object to use in logging //------------------------------------------------------------------------------ void XMLJob::printJobInfoBrief(Log& logger) const { const Job& job = fJob; ostringstream oss1; oss1 << "XMLJobFile: Delim(" << job.fDelimiter << "); EnclosedBy("; if (job.fEnclosedByChar) oss1 << job.fEnclosedByChar; else oss1 << "n/a"; oss1 << "); EscapeChar("; if (job.fEscapeChar) oss1 << job.fEscapeChar; else oss1 << "n/a"; oss1 << "); ReadBufs(" << job.numberOfReadBuffers << "); ReadBufSize(" << job.readBufferSize << "); setvbufSize(" << job.writeBufferSize << ')'; logger.logMsg(oss1.str(), MSGLVL_INFO2); for (unsigned int i = 0; i < job.jobTableList.size(); i++) { const JobTable& jobTable = job.jobTableList[i]; ostringstream oss2; oss2 << " Table(" << jobTable.tblName << "); OID(" << jobTable.mapOid << ')' << "; MaxErrNum(" << jobTable.maxErrNum << ')'; logger.logMsg(oss2.str(), MSGLVL_INFO2); for (unsigned int j = 0; j < jobTable.fFldRefs.size(); j++) { unsigned idx = jobTable.fFldRefs[j].fArrayIndex; BulkFldColRel fldColType = jobTable.fFldRefs[j].fFldColType; const JobColumn& jobCol = ((fldColType == BULK_FLDCOL_IGNORE_FIELD) ? 
jobTable.fIgnoredFields[idx] : jobTable.colList[idx]); ostringstream oss3; if (fldColType == BULK_FLDCOL_COLUMN_DEFAULT) oss3 << " DefaultColumn(" << jobCol.colName; else oss3 << " Column(" << jobCol.colName; oss3 << "); OID(" << jobCol.mapOid << "); Type(" << jobCol.typeName << "); Width(" << jobCol.width << "); Comp(" << jobCol.compressionType; if (jobCol.colType == 'D') oss3 << "); DctnryOid(" << jobCol.dctnry.dctnryOid; oss3 << ')'; if (jobCol.autoIncFlag) oss3 << "; autoInc"; if (jobCol.fNotNull) oss3 << "; NotNull"; if (jobCol.fWithDefault) oss3 << "; WithDefault"; logger.logMsg(oss3.str(), MSGLVL_INFO2); } } // end of for( int i } //------------------------------------------------------------------------------ // Process a node // pNode - current node // returns TRUE if success, FALSE otherwise //------------------------------------------------------------------------------ bool XMLJob::processNode(xmlNode* pNode) { if (isTag(pNode, TAG_BULK_JOB)) { // no work for the BulkJob tag } else if (isTag(pNode, TAG_CREATE_DATE)) setJobData(pNode, TAG_CREATE_DATE, true, TYPE_CHAR); else if (isTag(pNode, TAG_CREATE_TIME)) setJobData(pNode, TAG_CREATE_TIME, true, TYPE_CHAR); else if (isTag(pNode, TAG_COLUMN)) setJobData(pNode, TAG_COLUMN, false, TYPE_EMPTY); else if (isTag(pNode, TAG_DEFAULT_COLUMN)) setJobData(pNode, TAG_DEFAULT_COLUMN, false, TYPE_EMPTY); else if (isTag(pNode, TAG_DESC)) setJobData(pNode, TAG_DESC, true, TYPE_CHAR); else if (isTag(pNode, TAG_ID)) setJobData(pNode, TAG_ID, true, TYPE_INT); else if (isTag(pNode, TAG_IGNORE_FIELD)) setJobData(pNode, TAG_IGNORE_FIELD, false, TYPE_EMPTY); else if (isTag(pNode, TAG_NAME)) setJobData(pNode, TAG_NAME, true, TYPE_CHAR); else if (isTag(pNode, TAG_PATH)) setJobData(pNode, TAG_PATH, true, TYPE_CHAR); else if (isTag(pNode, TAG_TABLE)) setJobData(pNode, TAG_TABLE, false, TYPE_EMPTY); else if (isTag(pNode, TAG_TYPE)) setJobData(pNode, TAG_TYPE, true, TYPE_CHAR); else if (isTag(pNode, TAG_USER)) setJobData(pNode, 
TAG_USER, true, TYPE_CHAR); else if (isTag(pNode, TAG_SCHEMA)) setJobData(pNode, TAG_SCHEMA, false, TYPE_EMPTY); else if (isTag(pNode, TAG_READ_BUFFERS)) setJobData(pNode, TAG_READ_BUFFERS, false, TYPE_EMPTY); else if (isTag(pNode, TAG_WRITE_BUFFER_SIZE)) setJobData(pNode, TAG_WRITE_BUFFER_SIZE, true, TYPE_INT); else if (isTag(pNode, TAG_DELIMITER)) setJobData(pNode, TAG_DELIMITER, true, TYPE_CHAR); else if (isTag(pNode, TAG_ENCLOSED_BY_CHAR)) setJobData(pNode, TAG_ENCLOSED_BY_CHAR, true, TYPE_CHAR); else if (isTag(pNode, TAG_ESCAPE_CHAR)) setJobData(pNode, TAG_ESCAPE_CHAR, true, TYPE_CHAR); else { ostringstream oss; oss << "Unrecognized TAG in Job XML file: <" << pNode->name << ">"; throw runtime_error(oss.str()); } if (XMLOp::processNode(pNode)) { if (isTag(pNode, TAG_TABLE)) { postProcessTableNode(); } } else { return false; } return true; } //------------------------------------------------------------------------------ // Generic setter // pNode - current node // tag - xml tag // bExpectContent - should node content be present to process // tagType - data type //------------------------------------------------------------------------------ void XMLJob::setJobData(xmlNode* pNode, const xmlTag tag, bool bExpectContent, XML_DTYPE tagType) { int intVal = 0; long long llVal = 0; std::string bufString; bool bSuccess = false; if (bExpectContent) { if (tagType == TYPE_INT) bSuccess = getNodeContent(pNode, &intVal, TYPE_INT); else // longlong if (tagType == TYPE_LONGLONG) bSuccess = getNodeContent(pNode, &llVal, TYPE_LONGLONG); else // char if (tagType == TYPE_CHAR) bSuccess = getNodeContentStr(pNode, bufString); if (!bSuccess) return; } // process tag content and attributes switch (tag) { case TAG_READ_BUFFERS: setReadBuffers(pNode); break; case TAG_COLUMN: setJobDataColumn(pNode, false); break; case TAG_CREATE_DATE: fJob.createDate = bufString; break; case TAG_CREATE_TIME: fJob.createTime = bufString; break; case TAG_DEFAULT_COLUMN: setJobDataColumn(pNode, true); 
break; case TAG_DESC: fJob.desc = bufString; break; case TAG_ID: fJob.id = intVal; break; case TAG_IGNORE_FIELD: setJobDataIgnoreField(); break; case TAG_NAME: fJob.name = bufString; break; case TAG_PATH: // no action necessary, but keep for backwards compatability break; case TAG_TABLE: setJobDataTable(pNode); break; case TAG_TYPE: // no action necessary, but keep for backwards compatability break; case TAG_USER: fJob.userName = bufString; break; case TAG_SCHEMA: setSchema(pNode); break; case TAG_WRITE_BUFFER_SIZE: fJob.writeBufferSize = intVal; break; case TAG_DELIMITER: { const char* buf = bufString.c_str(); if ((!strcmp(buf, "\\t")) || (!strcmp(buf, "'\\t'"))) { fJob.fDelimiter = '\t'; } else { fJob.fDelimiter = bufString[0]; } break; } case TAG_ENCLOSED_BY_CHAR: { fJob.fEnclosedByChar = bufString[0]; break; } case TAG_ESCAPE_CHAR: { fJob.fEscapeChar = bufString[0]; break; } default: break; } } //------------------------------------------------------------------------------ // Set table information parms. 
// pNode - current node //------------------------------------------------------------------------------ void XMLJob::setJobDataTable(xmlNode* pNode) { int intVal; std::string bufString; JobTable curTable; if (getNodeAttributeStr(pNode, xmlTagTable[TAG_ORIG_NAME], bufString)) curTable.tblName = bufString; if (getNodeAttributeStr(pNode, xmlTagTable[TAG_TBL_NAME], bufString)) curTable.tblName = bufString; if (curTable.tblName.empty()) { throw runtime_error("Required table name attribute (tblName) missing from Table tag"); } if (getNodeAttribute(pNode, xmlTagTable[TAG_TBL_OID], &intVal, TYPE_INT)) curTable.mapOid = intVal; if (getNodeAttributeStr(pNode, xmlTagTable[TAG_LOAD_NAME], bufString)) curTable.loadFileName = bufString; if (getNodeAttribute(pNode, xmlTagTable[TAG_MAX_ERR_ROW], &intVal, TYPE_INT)) curTable.maxErrNum = intVal; fJob.jobTableList.push_back(curTable); } //------------------------------------------------------------------------------ // Set column information parms. // pNode - current node // bDefaultCol - is this a tag // // Note on Supported Tags: (Bug 2828) // Note that the "notnull" and "defaultValue" attribute tags are not recognized // by this function because by the time we added support for these tags, we had // changed to only store the table and column names in the XML file. Much of // the functionality in setJobDataColumn() is only present to provide backwards // compatability for an old Job XML file that a user might still be using. // // Any other new tags probably don't need adding to setJobDataColumn() either, // for the same reason. 
//------------------------------------------------------------------------------ void XMLJob::setJobDataColumn(xmlNode* pNode, bool bDefaultCol) { int intVal; std::string bufString; JobColumn curColumn; if (fJob.jobTableList.size() == 0) return; int tableNo = fJob.jobTableList.size() - 1; if (getNodeAttributeStr(pNode, xmlTagTable[TAG_ORIG_NAME], bufString)) curColumn.colName = bufString; if (getNodeAttributeStr(pNode, xmlTagTable[TAG_COL_NAME], bufString)) curColumn.colName = bufString; if (curColumn.colName.empty()) { ostringstream oss; oss << "Required column name attribute (colName) missing from " "Column tag for table " << fJob.jobTableList[tableNo].tblName; throw runtime_error(oss.str()); } if (getNodeAttribute(pNode, xmlTagTable[TAG_COL_OID], &intVal, TYPE_INT)) curColumn.mapOid = intVal; if (getNodeAttribute(pNode, xmlTagTable[TAG_WIDTH], &intVal, TYPE_INT)) { curColumn.width = intVal; curColumn.definedWidth = intVal; //@Bug 3040 } if (getNodeAttribute(pNode, xmlTagTable[TAG_PRECISION], &intVal, TYPE_INT)) curColumn.precision = intVal; if (getNodeAttribute(pNode, xmlTagTable[TAG_SCALE], &intVal, TYPE_INT)) curColumn.scale = intVal; if (getNodeAttributeStr(pNode, xmlTagTable[TAG_DATA_TYPE], bufString)) curColumn.typeName = bufString; if (getNodeAttribute(pNode, xmlTagTable[TAG_COMPRESS_TYPE], &intVal, TYPE_INT)) { curColumn.compressionType = intVal; curColumn.dctnry.fCompressionType = intVal; } if (getNodeAttribute(pNode, xmlTagTable[TAG_AUTOINCREMENT_FLAG], &intVal, TYPE_INT)) { if (intVal) curColumn.autoIncFlag = true; else curColumn.autoIncFlag = false; } if (getNodeAttributeStr(pNode, xmlTagTable[TAG_COL_TYPE], bufString)) { const char* buf = bufString.c_str(); if (!strcmp(buf, "D")) { curColumn.colType = 'D'; // @Bug 2565: Retain dictionary width to use in truncating strings, // since BulkLoad eventually stores column token width in 'width'. 
curColumn.dctnryWidth = curColumn.width; if (getNodeAttribute(pNode, xmlTagTable[TAG_DVAL_OID], &intVal, TYPE_INT)) curColumn.dctnry.dctnryOid = intVal; } } // This is a workaround that DBBuilder can not pass decimal type to XML file if ((curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::INT] || curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT] || curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT] || curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT]) && curColumn.scale > 0) curColumn.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL]; // end of workaround // Initialize the saturation limits for this column initSatLimits(curColumn); // Save default columns in separate list, so that we can intentionally // add/keep them at the "end" of colList later, after all other columns. if (bDefaultCol) // temporarily save in separate list { curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_DEFAULT; fDefaultColumns.push_back(curColumn); } else { // Add to list of db columns to be loaded curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_FIELD; fJob.jobTableList[tableNo].colList.push_back(curColumn); // Add to combined field list of columns and ignored fields JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_FIELD, fJob.jobTableList[tableNo].colList.size() - 1); fJob.jobTableList[tableNo].fFldRefs.push_back(fieldRef); } } //------------------------------------------------------------------------------ // Set column information parms for an input field that is to be ignored //------------------------------------------------------------------------------ void XMLJob::setJobDataIgnoreField() { JobColumn curColumn; int tableNo = fJob.jobTableList.size() - 1; ostringstream oss; oss << "IgnoreField" << fJob.jobTableList[tableNo].fFldRefs.size() + 1; curColumn.colName = oss.str(); // Add to list of ignored fields curColumn.fFldColRelation = BULK_FLDCOL_IGNORE_FIELD; fJob.jobTableList[tableNo].fIgnoredFields.push_back(curColumn); // Add 
to combined field list of columns and ignored fields JobFieldRef fieldRef(BULK_FLDCOL_IGNORE_FIELD, fJob.jobTableList[tableNo].fIgnoredFields.size() - 1); fJob.jobTableList[tableNo].fFldRefs.push_back(fieldRef); } //------------------------------------------------------------------------------ // Initialize the saturation limits for the specified column. //------------------------------------------------------------------------------ void XMLJob::initSatLimits(JobColumn& curColumn) const { // If one of the integer types, we set the min/max saturation value. // For DECIMAL columns this will vary with the precision. if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::INT]) { curColumn.fMinIntSat = MIN_INT; curColumn.fMaxIntSat = MAX_INT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UINT]) { curColumn.fMinIntSat = MIN_UINT; curColumn.fMaxIntSat = MAX_UINT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT]) { curColumn.fMinIntSat = MIN_BIGINT; curColumn.fMaxIntSat = MAX_BIGINT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UBIGINT]) { curColumn.fMinIntSat = MIN_UBIGINT; curColumn.fMaxIntSat = MAX_UBIGINT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::MEDINT]) { curColumn.fMinIntSat = MIN_MEDINT; curColumn.fMaxIntSat = MAX_MEDINT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UMEDINT]) { curColumn.fMinIntSat = MIN_UMEDINT; curColumn.fMaxIntSat = MAX_UMEDINT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT]) { curColumn.fMinIntSat = MIN_SMALLINT; curColumn.fMaxIntSat = MAX_SMALLINT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::USMALLINT]) { curColumn.fMinIntSat = MIN_USMALLINT; curColumn.fMaxIntSat = MAX_USMALLINT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT]) { curColumn.fMinIntSat = MIN_TINYINT; curColumn.fMaxIntSat = MAX_TINYINT; } else if 
(curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UTINYINT]) { curColumn.fMinIntSat = MIN_UTINYINT; curColumn.fMaxIntSat = MAX_UTINYINT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::DECIMAL]) { curColumn.fMaxIntSat = dataconvert::decimalRangeUp(curColumn.precision); curColumn.fMinIntSat = -curColumn.fMaxIntSat; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL]) { curColumn.fMinIntSat = 0; curColumn.fMaxIntSat = dataconvert::decimalRangeUp(curColumn.precision); } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::FLOAT]) { curColumn.fMinDblSat = MIN_FLOAT; curColumn.fMaxDblSat = MAX_FLOAT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UFLOAT]) { curColumn.fMinDblSat = 0.0; curColumn.fMaxDblSat = MAX_FLOAT; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::DOUBLE]) { curColumn.fMinDblSat = MIN_DOUBLE; curColumn.fMaxDblSat = MAX_DOUBLE; } else if (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::UDOUBLE]) { curColumn.fMinDblSat = 0.0; curColumn.fMaxDblSat = MAX_DOUBLE; } } //------------------------------------------------------------------------------ // Set Read Buffers attributes // pNode - current node //------------------------------------------------------------------------------ void XMLJob::setReadBuffers(xmlNode* pNode) { int intVal = 0; if (getNodeAttribute(pNode, xmlTagTable[TAG_NO_OF_READ_BUFFERS], &intVal, TYPE_INT)) fJob.numberOfReadBuffers = intVal; if (getNodeAttribute(pNode, xmlTagTable[TAG_READ_BUFFER_SIZE], &intVal, TYPE_INT)) fJob.readBufferSize = intVal; } //------------------------------------------------------------------------------ // Set Schema attributes // pNode - current node //------------------------------------------------------------------------------ void XMLJob::setSchema(xmlNode* pNode) { std::string bufString; if (getNodeAttributeStr(pNode, xmlTagTable[TAG_SCHEMA_NAME], bufString)) fJob.schema = 
bufString; } //------------------------------------------------------------------------------ // Transfer any/all columns from temporary fDefaultColumns, to // the end of the column/field lists. // It is assumed that we are working with the last table in jobTableList. // Then get additional information from system catalog to finish populating // our Job structs with all the table and column attributes we need. //------------------------------------------------------------------------------ void XMLJob::postProcessTableNode() { bool bValidateNoDefColWithoutDefValue = false; if (fDefaultColumns.size() > 0) { bValidateNoDefColWithoutDefValue = true; int tableNo = fJob.jobTableList.size() - 1; for (unsigned k = 0; k < fDefaultColumns.size(); k++) { // Add to list of db columns to be loaded fJob.jobTableList[tableNo].colList.push_back(fDefaultColumns[k]); // Add to combined list of columns and ignored fields JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_DEFAULT, fJob.jobTableList[tableNo].colList.size() - 1); fJob.jobTableList[tableNo].fFldRefs.push_back(fieldRef); } fDefaultColumns.clear(); } // Supplement xml file contents with information from syscat execplan::CalpontSystemCatalog::RIDList colRidList; fillInXMLDataAsLoaded(colRidList); // After getting all the system catalog information... // Validate that if there are any tags for a NotNull // column, that the column is defined as NotNull With Default. 
if (bValidateNoDefColWithoutDefValue) { int tableNo = fJob.jobTableList.size() - 1; for (unsigned int iCol = 0; iCol < fJob.jobTableList[tableNo].colList.size(); iCol++) { JobColumn& col = fJob.jobTableList[tableNo].colList[iCol]; if (col.fFldColRelation == BULK_FLDCOL_COLUMN_DEFAULT) { if ((col.fNotNull) && (!col.fWithDefault)) { std::ostringstream oss; oss << "Column " << col.colName << " in table " << fJob.jobTableList[tableNo].tblName << " is NotNull " "w/o default; cannot be used with "; throw std::runtime_error(oss.str()); } } } } // Make sure all Columns in the DB are counted for with or // tags (unless validate is disabled) if (fValidateColList) validateAllColumnsHaveTags(colRidList); } //------------------------------------------------------------------------------ // Use the table and column names from the last just loaded, to // collect the remaining information from the system catalog, in order to // populate the JobColumn structure. //------------------------------------------------------------------------------ void XMLJob::fillInXMLDataAsLoaded(execplan::CalpontSystemCatalog::RIDList& colRidList) { boost::shared_ptr cat = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID); cat->identity(execplan::CalpontSystemCatalog::EC); // Get the table and column attributes for the last
processed unsigned int iTbl = fJob.jobTableList.size() - 1; JobTable& tbl = fJob.jobTableList[iTbl]; std::string tblName; string::size_type startName = tbl.tblName.rfind('.'); if (startName == string::npos) tblName.assign(tbl.tblName); else tblName.assign(tbl.tblName.substr(startName + 1)); execplan::CalpontSystemCatalog::TableName table(fJob.schema, tblName); if (fJob.jobTableList[iTbl].mapOid == 0) { execplan::CalpontSystemCatalog::OID tblOid = cat->tableRID(table).objnum; tbl.mapOid = tblOid; } // This call is made to improve performance. // The call forces all the column information for this table to be // cached at one time, instead of doing it piece-meal through repeated // calls to lookupOID(). colRidList = cat->columnRIDs(table, true); // Loop through the columns to get the column attributes for (unsigned int iCol = 0; iCol < fJob.jobTableList[iTbl].colList.size(); iCol++) { JobColumn& col = fJob.jobTableList[iTbl].colList[iCol]; if (col.mapOid == 0) { execplan::CalpontSystemCatalog::TableColName column; column.schema = fJob.schema; column.table = tblName; column.column = col.colName; execplan::CalpontSystemCatalog::OID colOid = cat->lookupOID(column); if (colOid < 0) { ostringstream oss; oss << "Column OID lookup failed for: " << column; throw runtime_error(oss.str()); } col.mapOid = colOid; execplan::CalpontSystemCatalog::ColType colType = cat->colType(col.mapOid); col.width = colType.colWidth; col.definedWidth = colType.colWidth; if ((colType.scale > 0) || (colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) || (colType.colDataType == execplan::CalpontSystemCatalog::UDECIMAL)) { col.precision = colType.precision; col.scale = colType.scale; } col.typeName = ColDataTypeStr[colType.colDataType]; col.compressionType = colType.compressionType; col.dctnry.fCompressionType = colType.compressionType; if (colType.charsetNumber != 0) { col.cs = &datatypes::Charset(colType.charsetNumber).getCharset(); } else { col.cs = &my_charset_latin1; } if 
(colType.autoincrement) col.autoIncFlag = true; else col.autoIncFlag = false; // Initialize NotNull and Default Value (based on data type) fillInXMLDataNotNullDefault(tbl.tblName, colType, col); if (colType.ddn.dictOID > 0) { col.colType = 'D'; col.dctnryWidth = colType.colWidth; col.dctnry.dctnryOid = colType.ddn.dictOID; } // @bug3801: For backwards compatability, we treat // integer types with nonzero 0 scale as decimal if scale > 0 if (((col.typeName == ColDataTypeStr[CalpontSystemCatalog::INT]) || (col.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT]) || (col.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT]) || (col.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT])) && (col.scale > 0)) { col.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL]; } // Initialize the saturation limits for this column initSatLimits(col); } } // end of loop through columns } //------------------------------------------------------------------------------ // Using information from the system catalog (in colType), fill in the // applicable NotNull Default values into the specified JobColumn. //------------------------------------------------------------------------------ void XMLJob::fillInXMLDataNotNullDefault(const std::string& fullTblName, execplan::CalpontSystemCatalog::ColType& colType, JobColumn& col) { const NullString col_defaultValue(colType.defaultValue); if (colType.constraintType == execplan::CalpontSystemCatalog::NOTNULL_CONSTRAINT) { col.fNotNull = true; if (!col_defaultValue.isNull()) col.fWithDefault = true; } else if (colType.constraintType == execplan::CalpontSystemCatalog::DEFAULT_CONSTRAINT) { col.fWithDefault = true; } if (col.fWithDefault) { bool bDefaultConvertError = false; // Convert Default Value. 
// We go ahead and report basic format conversion error; // but we don't do complete validation (like checking to see // if the default is too large for the given integer type), // because we assume DDL is fully validating the default value. switch (colType.colDataType) { case execplan::CalpontSystemCatalog::BIT: case execplan::CalpontSystemCatalog::TINYINT: case execplan::CalpontSystemCatalog::SMALLINT: case execplan::CalpontSystemCatalog::MEDINT: case execplan::CalpontSystemCatalog::INT: case execplan::CalpontSystemCatalog::BIGINT: { errno = 0; col.fDefaultInt = strtoll(col_defaultValue.str(), 0, 10); if (errno == ERANGE) bDefaultConvertError = true; break; } case execplan::CalpontSystemCatalog::UTINYINT: case execplan::CalpontSystemCatalog::USMALLINT: case execplan::CalpontSystemCatalog::UMEDINT: case execplan::CalpontSystemCatalog::UINT: case execplan::CalpontSystemCatalog::UBIGINT: { errno = 0; col.fDefaultUInt = strtoull(col_defaultValue.str(), 0, 10); if (errno == ERANGE) bDefaultConvertError = true; break; } case execplan::CalpontSystemCatalog::DECIMAL: case execplan::CalpontSystemCatalog::UDECIMAL: { if (LIKELY(colType.colWidth == datatypes::MAXDECIMALWIDTH)) { col.fDefaultWideDecimal = colType.decimal128FromString(col_defaultValue.safeString(), &bDefaultConvertError); } else { col.fDefaultInt = Convertor::convertDecimalString(col_defaultValue.str(), col_defaultValue.length(), colType.scale); if (errno == ERANGE) bDefaultConvertError = true; } break; } case execplan::CalpontSystemCatalog::DATE: { int convertStatus; int32_t dt = dataconvert::DataConvert::convertColumnDate(col_defaultValue.str(), dataconvert::CALPONTDATE_ENUM, convertStatus, col_defaultValue.length()); if (convertStatus != 0) bDefaultConvertError = true; col.fDefaultInt = dt; break; } case execplan::CalpontSystemCatalog::DATETIME: { int convertStatus; int64_t dt = dataconvert::DataConvert::convertColumnDatetime( col_defaultValue.str(), dataconvert::CALPONTDATETIME_ENUM, convertStatus, 
col_defaultValue.length()); if (convertStatus != 0) bDefaultConvertError = true; col.fDefaultInt = dt; break; } case execplan::CalpontSystemCatalog::TIMESTAMP: { int convertStatus; int64_t dt = dataconvert::DataConvert::convertColumnTimestamp( col_defaultValue.str(), dataconvert::CALPONTDATETIME_ENUM, convertStatus, col_defaultValue.length(), fTimeZone); if (convertStatus != 0) bDefaultConvertError = true; col.fDefaultInt = dt; break; } case execplan::CalpontSystemCatalog::TIME: { int convertStatus; int64_t dt = dataconvert::DataConvert::convertColumnTime(col_defaultValue.str(), dataconvert::CALPONTTIME_ENUM, convertStatus, col_defaultValue.length()); if (convertStatus != 0) bDefaultConvertError = true; col.fDefaultInt = dt; break; } case execplan::CalpontSystemCatalog::FLOAT: case execplan::CalpontSystemCatalog::DOUBLE: case execplan::CalpontSystemCatalog::UFLOAT: case execplan::CalpontSystemCatalog::UDOUBLE: { errno = 0; col.fDefaultDbl = strtod(col_defaultValue.str(), 0); if (errno == ERANGE) bDefaultConvertError = true; break; } default: { col.fDefaultChr = col_defaultValue; break; } } if (bDefaultConvertError) { std::ostringstream oss; oss << "Column " << col.colName << " in table " << fullTblName << " has an invalid default value in system catalog."; throw std::runtime_error(oss.str()); } } } //------------------------------------------------------------------------------ // Use the table and column names from the last
just loaded, to // validate that all the columns have a or tag // present in the job XML file. //------------------------------------------------------------------------------ void XMLJob::validateAllColumnsHaveTags(const execplan::CalpontSystemCatalog::RIDList& colRidList) const { // Validate column list for the last
processed unsigned int iTbl = fJob.jobTableList.size() - 1; const JobTable& tbl = fJob.jobTableList[iTbl]; std::string tblName; string::size_type startName = tbl.tblName.rfind('.'); if (startName == string::npos) tblName.assign(tbl.tblName); else tblName.assign(tbl.tblName.substr(startName + 1)); try { // Loop through column tags, saving col OIDs to a std::set for lookups std::set colOIDList; typedef std::set::iterator SetIter; std::pair retVal; for (unsigned int iCol = 0; iCol < fJob.jobTableList[iTbl].colList.size(); iCol++) { const JobColumn& col = fJob.jobTableList[iTbl].colList[iCol]; retVal = colOIDList.insert(col.mapOid); if (!retVal.second) { boost::shared_ptr cat = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID); cat->identity(execplan::CalpontSystemCatalog::EC); execplan::CalpontSystemCatalog::TableColName dbColName = cat->colName(col.mapOid); std::ostringstream oss; oss << "Column " << dbColName.column << " referenced in Job XML" " file more than once."; throw std::runtime_error(oss.str()); } } SetIter pos; // Loop thru cols in system catalog and verify that each one has a tag execplan::CalpontSystemCatalog::RIDList::const_iterator rid_iterator = colRidList.begin(); while (rid_iterator != colRidList.end()) { pos = colOIDList.find(rid_iterator->objnum); if (pos != colOIDList.end()) { colOIDList.erase(pos); // through with this column, so delete } else { boost::shared_ptr cat = execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID); cat->identity(execplan::CalpontSystemCatalog::EC); execplan::CalpontSystemCatalog::TableColName dbColName = cat->colName(rid_iterator->objnum); std::ostringstream oss; oss << "No tag present in Job XML file for DB column: " << dbColName.column; throw std::runtime_error(oss.str()); } ++rid_iterator; } } catch (std::exception& ex) { std::ostringstream oss; oss << "Error validating column list for table " << fJob.schema << '.' 
<< tblName << "; " << ex.what(); throw std::runtime_error(oss.str()); } catch (...) { std::ostringstream oss; oss << "Unknown Error validating column list for table " << fJob.schema << '.' << tblName; throw std::runtime_error(oss.str()); } } //------------------------------------------------------------------------------ // Generate a permanent or temporary Job XML file name path. // sXMLJobDir Command line override for complete Job directory path // jobDIr Job subdirectory under default path // jobId Job ID // bTempFile Are we creating a temporary Job Xml File // schmaName If temp file, this is schema name to use // tableName If temp file, this is the table name to use // xmlDirPath The complete Job XML file path that is constructed // errMsg Relevant error message if return value is not NO_ERROR. //------------------------------------------------------------------------------ /* static */ int XMLJob::genJobXMLFileName(const string& sXMLJobDir, const string& jobDir, const string& jobId, bool bTempFile, const string& schemaName, const string& tableName, boost::filesystem::path& xmlFilePath, string& errMsg, const std::string& tableOIDStr) { // get full file directory path for XML job description file if (sXMLJobDir.empty()) { xmlFilePath = Config::getBulkRoot(); xmlFilePath /= jobDir; } else { xmlFilePath = sXMLJobDir; // If filespec doesn't begin with a '/' (i.e. it's not an absolute path), // attempt to make it absolute so that we can log the full pathname. 
if (!xmlFilePath.has_root_path()) { char cwdPath[4096]; char* err; err = getcwd(cwdPath, sizeof(cwdPath)); if (err == NULL) { errMsg = "Failed to get the current working directory."; return -1; } string trailingPath(xmlFilePath.string()); xmlFilePath = cwdPath; xmlFilePath /= trailingPath; } } // Append the file name to the directory path string jobFileName; if (bTempFile) { // Create tmp directory if does not exist RETURN_ON_ERROR(createTempJobDir(xmlFilePath.string(), errMsg)); jobFileName += tableOIDStr; // jobFileName += schemaName; // jobFileName += '_'; // jobFileName += tableName; jobFileName += "_D"; string now(boost::posix_time::to_iso_string(boost::posix_time::second_clock::local_time())); // microseconds struct timeval tp; gettimeofday(&tp, 0); ostringstream usec; usec << setfill('0') << setw(6) << tp.tv_usec; jobFileName += now.substr(0, 8); jobFileName += "_T"; jobFileName += now.substr(9, 6); jobFileName += "_S"; jobFileName += usec.str(); jobFileName += '_'; } jobFileName += "Job_"; jobFileName += jobId; jobFileName += ".xml"; xmlFilePath /= jobFileName; return NO_ERROR; } //------------------------------------------------------------------------------ // Create directory for temporary XML job description files. // OAM restart should delete any/all files in this directory. //------------------------------------------------------------------------------ /* static */ int XMLJob::createTempJobDir(const string& xmlFilePath, string& errMsg) { boost::filesystem::path pathDir(xmlFilePath); // create temp directory for XML job file if it does not exist try { if (!boost::filesystem::exists(xmlFilePath)) { string boostErrString; try { boost::filesystem::create_directories(pathDir); } catch (exception& ex) { // ignore exception for now; we may have just had a // race condition where 2 jobs were creating dirs. 
boostErrString = ex.what(); } if (!boost::filesystem::exists(xmlFilePath)) { ostringstream oss; oss << "Error creating XML temp job file directory(1) " << xmlFilePath << "; " << boostErrString; errMsg = oss.str(); return ERR_DIR_CREATE; } } } catch (exception& ex) { ostringstream oss; oss << "Error creating XML temp job file directory(2) " << xmlFilePath << "; " << ex.what(); errMsg = oss.str(); return ERR_DIR_CREATE; } if (!boost::filesystem::is_directory(pathDir)) { ostringstream oss; oss << "Error creating XML temp job file directory " << xmlFilePath << "; path already exists as non-directory" << endl; errMsg = oss.str(); return ERR_DIR_CREATE; } return NO_ERROR; } } // namespace WriteEngine