You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1323 lines
		
	
	
		
			43 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1323 lines
		
	
	
		
			43 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
/*******************************************************************************
 | 
						|
 * $Id: we_xmljob.cpp 4579 2013-03-19 23:16:54Z dhall $
 | 
						|
 *
 | 
						|
 *******************************************************************************/
 | 
						|
/** @file */
 | 
						|
 | 
						|
#include "mcs_basic_types.h"
 | 
						|
#define WRITEENGINEXMLJOB_DLLEXPORT
 | 
						|
#include "we_xmljob.h"
 | 
						|
#undef WRITEENGINEXMLJOB_DLLEXPORT
 | 
						|
 | 
						|
#include <limits>
 | 
						|
#include <sstream>
 | 
						|
#include <unistd.h>
 | 
						|
#include <stdexcept>
 | 
						|
#include <cstdlib>
 | 
						|
#include <set>
 | 
						|
#include "we_config.h"
 | 
						|
#include "we_log.h"
 | 
						|
#include "we_convertor.h"
 | 
						|
#include "dataconvert.h"
 | 
						|
#include <boost/date_time/posix_time/posix_time.hpp>
 | 
						|
#include <boost/filesystem.hpp>
 | 
						|
#include <sys/time.h>
 | 
						|
 | 
						|
using namespace std;
 | 
						|
using namespace execplan;
 | 
						|
 | 
						|
namespace WriteEngine
 | 
						|
{
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Constructor
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Creates an XMLJob with no job file attached yet: the temp-file delete flag
// is off, column list validation is on, and fTimeZone is taken from
// dataconvert::systemTimeZoneOffset().
XMLJob::XMLJob()
 : fDeleteTempFile(false)
 , fValidateColList(true)
 , fTimeZone(dataconvert::systemTimeZoneOffset())
{
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Default Destructor
 | 
						|
// Delete temporary Job XML file if applicable.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
XMLJob::~XMLJob()
{
  // Only a job file that was flagged as temporary (and actually named) is removed.
  if (!fDeleteTempFile || fJobFileName.empty())
    return;

  // The result of unlink() is intentionally not inspected here.
  unlink(fJobFileName.c_str());
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Load a job xml file
 | 
						|
// fileName - name of file to load
 | 
						|
// bTempFile - are we loading a temporary file (that destructor should delete)
 | 
						|
// bValidateColumnList - validate that all db columns have an XML tag
 | 
						|
// returns NO_ERROR if success; other if fail
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
int XMLJob::loadJobXmlFile(const string& fileName, bool bTempFile, bool bValidateColumnList, string& errMsg)
 | 
						|
{
 | 
						|
  int rc;
 | 
						|
 | 
						|
  fDeleteTempFile = bTempFile;
 | 
						|
  fJobFileName = fileName;
 | 
						|
  fValidateColList = bValidateColumnList;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    rc = parseDoc(fileName.c_str());
 | 
						|
 | 
						|
    if (rc != NO_ERROR)
 | 
						|
      return rc;
 | 
						|
  }
 | 
						|
  catch (exception& ex)
 | 
						|
  {
 | 
						|
    errMsg = ex.what();
 | 
						|
    return ERR_XML_PARSE;
 | 
						|
  }
 | 
						|
 | 
						|
  return rc;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Print contents of fJob to the specified logger object.
 | 
						|
// logger - Log object to use in logging
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::printJobInfo(Log& logger) const
{
  // ---- Job-wide settings: one multi-line message ----
  ostringstream jobMsg;
  jobMsg << "Job " << fJob.id << " input\n";
  jobMsg << "===============================================" << '\n';
  jobMsg << "Name : " << fJob.name << '\n'
         << "Desc : " << fJob.desc << '\n'
         << "User : " << fJob.userName << '\n'
         << "Delim: " << fJob.fDelimiter << '\n';

  // A zero "enclosed by" / escape character is reported as "n/a".
  jobMsg << "Enclosed By : ";

  if (!fJob.fEnclosedByChar)
    jobMsg << "n/a";
  else
    jobMsg << fJob.fEnclosedByChar;

  jobMsg << '\n';

  jobMsg << "Escape Char : ";

  if (!fJob.fEscapeChar)
    jobMsg << "n/a";
  else
    jobMsg << fJob.fEscapeChar;

  jobMsg << '\n';

  jobMsg << "Read Buffers:     " << fJob.numberOfReadBuffers << '\n'
         << "Read Buffer Size: " << fJob.readBufferSize << '\n'
         << "setvbuf Size: " << fJob.writeBufferSize << '\n'
         << "Header rows : " << fJob.fSkipRows << '\n'
         << "Create Date : " << fJob.createDate << '\n'
         << "Create Time : " << fJob.createTime << '\n'
         << "Schema Name : " << fJob.schema << '\n';

  jobMsg << "Num Tables  : " << fJob.jobTableList.size() << '\n';
  logger.logMsg(jobMsg.str(), MSGLVL_INFO2);

  // ---- One message per table, then one message per field of that table ----
  for (const JobTable& jobTable : fJob.jobTableList)
  {
    ostringstream tableMsg;
    tableMsg << "\n-------------------------------------------------" << '\n'
             << "\tTable Name      : " << jobTable.tblName << '\n'
             << "\tTable OID       : " << jobTable.mapOid << '\n'
             << "\tTable Load Name : " << jobTable.loadFileName << '\n'
             << "\tMax Err Num     : " << jobTable.maxErrNum << '\n'
             << "\tNum of Columns  : " << jobTable.colList.size() << '\n';
    logger.logMsg(tableMsg.str(), MSGLVL_INFO2);

    // JobColumn.dataType is deliberately not printed: it is not carried in
    // the XML file; it is assigned/used internally by bulkload.
    for (const auto& fldRef : jobTable.fFldRefs)
    {
      const unsigned idx = fldRef.fArrayIndex;
      const BulkFldColRel fldColType = fldRef.fFldColType;

      // Ignored fields live in fIgnoredFields; everything else is in colList.
      const bool bIgnoredField = (fldColType == BULK_FLDCOL_IGNORE_FIELD);
      const JobColumn& jobCol = (bIgnoredField ? jobTable.fIgnoredFields[idx] : jobTable.colList[idx]);

      ostringstream colMsg;
      colMsg << "\n\t****************************************" << '\n';

      if (fldColType != BULK_FLDCOL_COLUMN_DEFAULT)
        colMsg << "\t\tColumn Name       : " << jobCol.colName << '\n';
      else
        colMsg << "\t\tDefaultColumn Name: " << jobCol.colName << '\n';

      colMsg << "\t\tColumn OID        : " << jobCol.mapOid << '\n'
             << "\t\tColumn type name  : " << jobCol.typeName << '\n'
             << "\t\tColumn width      : " << jobCol.width << '\n'
             << "\t\tColumn Not Null   : " << jobCol.fNotNull << '\n'
             << "\t\tColumn WithDefault: " << jobCol.fWithDefault << '\n'
             << "\t\tColumn type       : " << jobCol.colType << '\n'
             << "\t\tColumn comp type  : " << jobCol.compressionType << '\n'
             << "\t\tColumn autoInc    : " << jobCol.autoIncFlag << '\n';

      // Precision and scale are only reported for the two decimal type names.
      auto appendPrecisionAndScale = [&colMsg, &jobCol]()
      {
        colMsg << "\t\tColumn Precision  : " << jobCol.precision << '\n';
        colMsg << "\t\tColumn Scale      : " << jobCol.scale << '\n';
      };

      if (jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::DECIMAL])
        appendPrecisionAndScale();

      if (jobCol.typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL])
        appendPrecisionAndScale();

      // Columns with colType 'D' also report their dictionary OID.
      if (jobCol.colType == 'D')
        colMsg << "\t\tDictionary Oid    : " << jobCol.dctnry.dctnryOid << '\n';

      logger.logMsg(colMsg.str(), MSGLVL_INFO2);
    }  // end of loop through fields in a table
  }    // end of loop through tables
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Print brief contents of specified Job to specified logger object.
 | 
						|
// logger - Log object to use in logging
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::printJobInfoBrief(Log& logger) const
{
  // ---- Single-line summary of the job-wide settings ----
  ostringstream jobMsg;
  jobMsg << "XMLJobFile: Delim(" << fJob.fDelimiter << "); EnclosedBy(";

  // A zero "enclosed by" / escape character is reported as "n/a".
  if (!fJob.fEnclosedByChar)
    jobMsg << "n/a";
  else
    jobMsg << fJob.fEnclosedByChar;

  jobMsg << "); EscapeChar(";

  if (!fJob.fEscapeChar)
    jobMsg << "n/a";
  else
    jobMsg << fJob.fEscapeChar;

  jobMsg << "); ReadBufs(" << fJob.numberOfReadBuffers;
  jobMsg << "); ReadBufSize(" << fJob.readBufferSize;
  jobMsg << "); setvbufSize(" << fJob.writeBufferSize << "); "
         << "SkipRows(" << fJob.fSkipRows << ")";
  logger.logMsg(jobMsg.str(), MSGLVL_INFO2);

  // ---- One line per table, followed by one line per field ----
  for (const JobTable& jobTable : fJob.jobTableList)
  {
    ostringstream tableMsg;
    tableMsg << "  Table(" << jobTable.tblName;
    tableMsg << "); OID(" << jobTable.mapOid << ')' << "; MaxErrNum(" << jobTable.maxErrNum << ')';
    logger.logMsg(tableMsg.str(), MSGLVL_INFO2);

    for (const auto& fldRef : jobTable.fFldRefs)
    {
      const unsigned idx = fldRef.fArrayIndex;
      const BulkFldColRel fldColType = fldRef.fFldColType;

      // Ignored fields live in fIgnoredFields; everything else is in colList.
      const bool bIgnoredField = (fldColType == BULK_FLDCOL_IGNORE_FIELD);
      const JobColumn& jobCol = (bIgnoredField ? jobTable.fIgnoredFields[idx] : jobTable.colList[idx]);

      ostringstream colMsg;

      if (fldColType != BULK_FLDCOL_COLUMN_DEFAULT)
        colMsg << "    Column(" << jobCol.colName;
      else
        colMsg << "    DefaultColumn(" << jobCol.colName;

      colMsg << "); OID(" << jobCol.mapOid;
      colMsg << "); Type(" << jobCol.typeName;
      colMsg << "); Width(" << jobCol.width;
      colMsg << "); Comp(" << jobCol.compressionType;

      // Columns with colType 'D' also report their dictionary OID.
      if (jobCol.colType == 'D')
        colMsg << "); DctnryOid(" << jobCol.dctnry.dctnryOid;

      colMsg << ')';

      // Optional trailing flags, emitted only when set.
      if (jobCol.autoIncFlag)
        colMsg << "; autoInc";

      if (jobCol.fNotNull)
        colMsg << "; NotNull";

      if (jobCol.fWithDefault)
        colMsg << "; WithDefault";

      logger.logMsg(colMsg.str(), MSGLVL_INFO2);
    }  // end of loop through fields in a table
  }    // end of loop through tables
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Process a node
 | 
						|
// pNode - current node
 | 
						|
// returns TRUE if success, FALSE otherwise
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
bool XMLJob::processNode(xmlNode* pNode)
{
  // How each recognized tag is handed to setJobData(): whether node content
  // is expected, and as which data type.  Entries are checked in this order.
  struct TagRule
  {
    xmlTag tag;
    bool expectContent;
    XML_DTYPE dataType;
  };

  static const TagRule kTagRules[] = {
      {TAG_CREATE_DATE, true, TYPE_CHAR},
      {TAG_CREATE_TIME, true, TYPE_CHAR},
      {TAG_COLUMN, false, TYPE_EMPTY},
      {TAG_DEFAULT_COLUMN, false, TYPE_EMPTY},
      {TAG_DESC, true, TYPE_CHAR},
      {TAG_ID, true, TYPE_INT},
      {TAG_IGNORE_FIELD, false, TYPE_EMPTY},
      {TAG_NAME, true, TYPE_CHAR},
      {TAG_PATH, true, TYPE_CHAR},
      {TAG_TABLE, false, TYPE_EMPTY},
      {TAG_TYPE, true, TYPE_CHAR},
      {TAG_USER, true, TYPE_CHAR},
      {TAG_SCHEMA, false, TYPE_EMPTY},
      {TAG_READ_BUFFERS, false, TYPE_EMPTY},
      {TAG_WRITE_BUFFER_SIZE, true, TYPE_INT},
      {TAG_DELIMITER, true, TYPE_CHAR},
      {TAG_ENCLOSED_BY_CHAR, true, TYPE_CHAR},
      {TAG_ESCAPE_CHAR, true, TYPE_CHAR},
      {TAG_SKIP_ROWS, true, TYPE_INT},
  };

  // The BulkJob tag is recognized but requires no work of its own.
  bool bRecognized = isTag(pNode, TAG_BULK_JOB);

  if (!bRecognized)
  {
    for (const TagRule& rule : kTagRules)
    {
      if (isTag(pNode, rule.tag))
      {
        setJobData(pNode, rule.tag, rule.expectContent, rule.dataType);
        bRecognized = true;
        break;
      }
    }
  }

  if (!bRecognized)
  {
    ostringstream oss;
    oss << "Unrecognized TAG in Job XML file: <" << pNode->name << ">";
    throw runtime_error(oss.str());
  }

  // Delegate to the base class; a failure there is reported to the caller.
  if (!XMLOp::processNode(pNode))
    return false;

  // Once the base class has finished with a Table node, finalize that table.
  if (isTag(pNode, TAG_TABLE))
    postProcessTableNode();

  return true;
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Generic setter
 | 
						|
// pNode - current node
 | 
						|
// tag - xml tag
 | 
						|
// bExpectContent - should node content be present to process
 | 
						|
// tagType - data type
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::setJobData(xmlNode* pNode, const xmlTag tag, bool bExpectContent, XML_DTYPE tagType)
{
  int intVal = 0;
  long long llVal = 0;  // filled for TYPE_LONGLONG; no tag handled below reads it
  std::string bufString;

  // When content is expected, fetch it in the requested representation and
  // give up silently if it cannot be read (or tagType is not one of the
  // three content types).
  if (bExpectContent)
  {
    bool bSuccess = false;

    switch (tagType)
    {
      case TYPE_INT: bSuccess = getNodeContent(pNode, &intVal, TYPE_INT); break;

      case TYPE_LONGLONG: bSuccess = getNodeContent(pNode, &llVal, TYPE_LONGLONG); break;

      case TYPE_CHAR: bSuccess = getNodeContentStr(pNode, bufString); break;

      default: break;
    }

    if (!bSuccess)
      return;
  }

  // process tag content and attributes
  switch (tag)
  {
    // --- tags whose data comes from node attributes ---
    case TAG_READ_BUFFERS: setReadBuffers(pNode); break;

    case TAG_SCHEMA: setSchema(pNode); break;

    case TAG_TABLE: setJobDataTable(pNode); break;

    case TAG_COLUMN: setJobDataColumn(pNode, false); break;

    case TAG_DEFAULT_COLUMN: setJobDataColumn(pNode, true); break;

    case TAG_IGNORE_FIELD: setJobDataIgnoreField(); break;

    // --- tags whose data is the node's string content ---
    case TAG_CREATE_DATE: fJob.createDate = bufString; break;

    case TAG_CREATE_TIME: fJob.createTime = bufString; break;

    case TAG_DESC: fJob.desc = bufString; break;

    case TAG_NAME: fJob.name = bufString; break;

    case TAG_USER: fJob.userName = bufString; break;

    // --- tags whose data is the node's integer content ---
    case TAG_ID: fJob.id = intVal; break;

    case TAG_WRITE_BUFFER_SIZE: fJob.writeBufferSize = intVal; break;

    case TAG_SKIP_ROWS: fJob.fSkipRows = intVal; break;

    case TAG_PATH:
    case TAG_TYPE:
      // no action necessary, but keep for backwards compatibility
      break;

    case TAG_DELIMITER:
      // A tab delimiter may be spelled as the two characters backslash-t,
      // optionally wrapped in single quotes; anything else uses its first
      // character.  Compared as std::string, so no C string API is needed.
      if (bufString == "\\t" || bufString == "'\\t'")
        fJob.fDelimiter = '\t';
      else
        fJob.fDelimiter = bufString[0];

      break;

    // Only the first character of the content is used for these two.
    case TAG_ENCLOSED_BY_CHAR: fJob.fEnclosedByChar = bufString[0]; break;

    case TAG_ESCAPE_CHAR: fJob.fEscapeChar = bufString[0]; break;

    default: break;
  }
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Set table information parms.
 | 
						|
// pNode - current node
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::setJobDataTable(xmlNode* pNode)
{
  JobTable newTable;
  std::string attrText;
  int attrInt = 0;

  // The table name may come from either attribute; TAG_TBL_NAME is read
  // second, so it wins when both are present.
  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_ORIG_NAME], attrText))
    newTable.tblName = attrText;

  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_TBL_NAME], attrText))
    newTable.tblName = attrText;

  if (newTable.tblName.empty())
    throw runtime_error("Required table name attribute (tblName) missing from Table tag");

  // The remaining attributes are optional; absent ones keep JobTable's defaults.
  if (getNodeAttribute(pNode, xmlTagTable[TAG_TBL_OID], &attrInt, TYPE_INT))
    newTable.mapOid = attrInt;

  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_LOAD_NAME], attrText))
    newTable.loadFileName = attrText;

  const bool bHasMaxErrRow = getNodeAttributeStr(pNode, xmlTagTable[TAG_MAX_ERR_ROW], attrText);

  if (bHasMaxErrRow)
  {
    // The literal "all" selects MAX_ERRORS_ALL; anything else goes through atoi().
    if (attrText != "all")
      newTable.maxErrNum = atoi(attrText.c_str());
    else
      newTable.maxErrNum = MAX_ERRORS_ALL;
  }

  fJob.jobTableList.push_back(newTable);
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
// Set column information parms.
// pNode - current node
// bDefaultCol - is this a <DefaultColumn> tag
//
// Note on Supported Tags: (Bug 2828)
// Note that the "notnull" and "defaultValue" attribute tags are not recognized
// by this function because by the time we added support for these tags, we had
// changed to only store the table and column names in the XML file.  Much of
// the functionality in setJobDataColumn() is only present to provide backwards
// compatibility for an old Job XML file that a user might still be using.
//
// Any other new tags probably don't need adding to setJobDataColumn() either,
// for the same reason.
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::setJobDataColumn(xmlNode* pNode, bool bDefaultCol)
{
  // A column tag seen before any table has been added has nothing to attach
  // to; it is silently ignored.
  if (fJob.jobTableList.empty())
    return;

  // Columns always belong to the most recently added table.
  JobTable& curTable = fJob.jobTableList.back();

  int intVal = 0;
  std::string bufString;
  JobColumn curColumn;

  // The column name may come from either attribute; TAG_COL_NAME is read
  // second, so it wins when both are present.
  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_ORIG_NAME], bufString))
    curColumn.colName = bufString;

  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_COL_NAME], bufString))
    curColumn.colName = bufString;

  if (curColumn.colName.empty())
  {
    ostringstream oss;
    oss << "Required column name attribute (colName) missing from "
           "Column tag for table "
        << curTable.tblName;
    throw runtime_error(oss.str());
  }

  // Optional attributes: each one overrides the JobColumn default only when present.
  if (getNodeAttribute(pNode, xmlTagTable[TAG_COL_OID], &intVal, TYPE_INT))
    curColumn.mapOid = intVal;

  if (getNodeAttribute(pNode, xmlTagTable[TAG_WIDTH], &intVal, TYPE_INT))
  {
    curColumn.width = intVal;
    curColumn.definedWidth = intVal;  //@Bug 3040
  }

  if (getNodeAttribute(pNode, xmlTagTable[TAG_PRECISION], &intVal, TYPE_INT))
    curColumn.precision = intVal;

  if (getNodeAttribute(pNode, xmlTagTable[TAG_SCALE], &intVal, TYPE_INT))
    curColumn.scale = intVal;

  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_DATA_TYPE], bufString))
    curColumn.typeName = bufString;

  if (getNodeAttribute(pNode, xmlTagTable[TAG_COMPRESS_TYPE], &intVal, TYPE_INT))
  {
    // The column and its dictionary share the same compression type.
    curColumn.compressionType = intVal;
    curColumn.dctnry.fCompressionType = intVal;
  }

  if (getNodeAttribute(pNode, xmlTagTable[TAG_AUTOINCREMENT_FLAG], &intVal, TYPE_INT))
    curColumn.autoIncFlag = (intVal != 0);

  // Only a column type of exactly "D" is acted on here.  Compared as
  // std::string, so no C string API is needed.
  if (getNodeAttributeStr(pNode, xmlTagTable[TAG_COL_TYPE], bufString) && bufString == "D")
  {
    curColumn.colType = 'D';

    // @Bug 2565: Retain dictionary width to use in truncating strings,
    // since BulkLoad eventually stores column token width in 'width'.
    curColumn.dctnryWidth = curColumn.width;

    if (getNodeAttribute(pNode, xmlTagTable[TAG_DVAL_OID], &intVal, TYPE_INT))
      curColumn.dctnry.dctnryOid = intVal;
  }

  // This is a workaround that DBBuilder can not pass decimal type to XML file:
  // a signed integer type name with a positive scale is treated as DECIMAL.
  const bool bIntegerTypeName = (curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::INT] ||
                                 curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT] ||
                                 curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT] ||
                                 curColumn.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT]);

  if (bIntegerTypeName && curColumn.scale > 0)
    curColumn.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];

  // end of workaround

  // Initialize the saturation limits for this column
  initSatLimits(curColumn);

  // Save default columns in separate list, so that we can intentionally
  // add/keep them at the "end" of colList later, after all other columns.
  if (bDefaultCol)  // temporarily save in separate list
  {
    curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_DEFAULT;
    fDefaultColumns.push_back(curColumn);
    return;
  }

  // Add to list of db columns to be loaded
  curColumn.fFldColRelation = BULK_FLDCOL_COLUMN_FIELD;
  curTable.colList.push_back(curColumn);

  // Add to combined field list of columns and ignored fields; the reference
  // records the index of the column just appended to colList.
  JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_FIELD, curTable.colList.size() - 1);
  curTable.fFldRefs.push_back(fieldRef);
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Set column information parms for an input field that is to be ignored
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::setJobDataIgnoreField()
 | 
						|
{
 | 
						|
  JobColumn curColumn;
 | 
						|
 | 
						|
  int tableNo = fJob.jobTableList.size() - 1;
 | 
						|
  ostringstream oss;
 | 
						|
  oss << "IgnoreField" << fJob.jobTableList[tableNo].fFldRefs.size() + 1;
 | 
						|
  curColumn.colName = oss.str();
 | 
						|
 | 
						|
  // Add to list of ignored fields
 | 
						|
  curColumn.fFldColRelation = BULK_FLDCOL_IGNORE_FIELD;
 | 
						|
  fJob.jobTableList[tableNo].fIgnoredFields.push_back(curColumn);
 | 
						|
 | 
						|
  // Add to combined field list of columns and ignored fields
 | 
						|
  JobFieldRef fieldRef(BULK_FLDCOL_IGNORE_FIELD, fJob.jobTableList[tableNo].fIgnoredFields.size() - 1);
 | 
						|
  fJob.jobTableList[tableNo].fFldRefs.push_back(fieldRef);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Initialize the saturation limits for the specified column.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::initSatLimits(JobColumn& curColumn) const
{
  // The column's type name selects which pair of saturation limits is set.
  // Integer and decimal types set fMinIntSat/fMaxIntSat; floating point types
  // set fMinDblSat/fMaxDblSat.  Any other type name leaves the column as is.
  const auto& typeName = curColumn.typeName;

  // ---- signed / unsigned integer types ----
  if (typeName == ColDataTypeStr[CalpontSystemCatalog::INT])
  {
    curColumn.fMinIntSat = MIN_INT;
    curColumn.fMaxIntSat = MAX_INT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::UINT])
  {
    curColumn.fMinIntSat = MIN_UINT;
    curColumn.fMaxIntSat = MAX_UINT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT])
  {
    curColumn.fMinIntSat = MIN_BIGINT;
    curColumn.fMaxIntSat = MAX_BIGINT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::UBIGINT])
  {
    curColumn.fMinIntSat = MIN_UBIGINT;
    curColumn.fMaxIntSat = MAX_UBIGINT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::MEDINT])
  {
    curColumn.fMinIntSat = MIN_MEDINT;
    curColumn.fMaxIntSat = MAX_MEDINT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::UMEDINT])
  {
    curColumn.fMinIntSat = MIN_UMEDINT;
    curColumn.fMaxIntSat = MAX_UMEDINT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT])
  {
    curColumn.fMinIntSat = MIN_SMALLINT;
    curColumn.fMaxIntSat = MAX_SMALLINT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::USMALLINT])
  {
    curColumn.fMinIntSat = MIN_USMALLINT;
    curColumn.fMaxIntSat = MAX_USMALLINT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT])
  {
    curColumn.fMinIntSat = MIN_TINYINT;
    curColumn.fMaxIntSat = MAX_TINYINT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::UTINYINT])
  {
    curColumn.fMinIntSat = MIN_UTINYINT;
    curColumn.fMaxIntSat = MAX_UTINYINT;
    return;
  }

  // ---- decimal types: the upper limit depends on the column's precision ----
  if (typeName == ColDataTypeStr[CalpontSystemCatalog::DECIMAL])
  {
    // Signed decimal is symmetric: the minimum is the negated maximum.
    curColumn.fMaxIntSat = dataconvert::decimalRangeUp<int128_t>(curColumn.precision);
    curColumn.fMinIntSat = -curColumn.fMaxIntSat;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::UDECIMAL])
  {
    curColumn.fMinIntSat = 0;
    curColumn.fMaxIntSat = dataconvert::decimalRangeUp<int128_t>(curColumn.precision);
    return;
  }

  // ---- floating point types: unsigned variants have a minimum of 0.0 ----
  if (typeName == ColDataTypeStr[CalpontSystemCatalog::FLOAT])
  {
    curColumn.fMinDblSat = MIN_FLOAT;
    curColumn.fMaxDblSat = MAX_FLOAT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::UFLOAT])
  {
    curColumn.fMinDblSat = 0.0;
    curColumn.fMaxDblSat = MAX_FLOAT;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::DOUBLE])
  {
    curColumn.fMinDblSat = MIN_DOUBLE;
    curColumn.fMaxDblSat = MAX_DOUBLE;
    return;
  }

  if (typeName == ColDataTypeStr[CalpontSystemCatalog::UDOUBLE])
  {
    curColumn.fMinDblSat = 0.0;
    curColumn.fMaxDblSat = MAX_DOUBLE;
  }
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Set Read Buffers attributes
 | 
						|
// pNode - current node
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::setReadBuffers(xmlNode* pNode)
{
  // Both attributes are optional: a job setting is only overwritten when
  // getNodeAttribute() reports that the attribute was read from pNode.
  int attrValue = 0;

  const bool hasBufferCount =
      getNodeAttribute(pNode, xmlTagTable[TAG_NO_OF_READ_BUFFERS], &attrValue, TYPE_INT);

  if (hasBufferCount)
  {
    fJob.numberOfReadBuffers = attrValue;
  }

  const bool hasBufferSize =
      getNodeAttribute(pNode, xmlTagTable[TAG_READ_BUFFER_SIZE], &attrValue, TYPE_INT);

  if (hasBufferSize)
  {
    fJob.readBufferSize = attrValue;
  }
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Set Schema attributes
 | 
						|
// pNode - current node
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::setSchema(xmlNode* pNode)
{
  std::string schemaName;

  // Leave fJob.schema untouched when the schema-name attribute cannot be read.
  if (!getNodeAttributeStr(pNode, xmlTagTable[TAG_SCHEMA_NAME], schemaName))
  {
    return;
  }

  fJob.schema = schemaName;
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Transfer any/all <DefaultColumn> columns from temporary fDefaultColumns, to
 | 
						|
// the end of the column/field lists.
 | 
						|
// It is assumed that we are working with the last table in jobTableList.
 | 
						|
// Then get additional information from system catalog to finish populating
 | 
						|
// our Job structs with all the table and column attributes we need.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::postProcessTableNode()
 | 
						|
{
 | 
						|
  bool bValidateNoDefColWithoutDefValue = false;
 | 
						|
 | 
						|
  if (fDefaultColumns.size() > 0)
 | 
						|
  {
 | 
						|
    bValidateNoDefColWithoutDefValue = true;
 | 
						|
    int tableNo = fJob.jobTableList.size() - 1;
 | 
						|
 | 
						|
    for (unsigned k = 0; k < fDefaultColumns.size(); k++)
 | 
						|
    {
 | 
						|
      // Add to list of db columns to be loaded
 | 
						|
      fJob.jobTableList[tableNo].colList.push_back(fDefaultColumns[k]);
 | 
						|
 | 
						|
      // Add to combined list of columns and ignored fields
 | 
						|
      JobFieldRef fieldRef(BULK_FLDCOL_COLUMN_DEFAULT, fJob.jobTableList[tableNo].colList.size() - 1);
 | 
						|
      fJob.jobTableList[tableNo].fFldRefs.push_back(fieldRef);
 | 
						|
    }
 | 
						|
 | 
						|
    fDefaultColumns.clear();
 | 
						|
  }
 | 
						|
 | 
						|
  // Supplement xml file contents with information from syscat
 | 
						|
  execplan::CalpontSystemCatalog::RIDList colRidList;
 | 
						|
  fillInXMLDataAsLoaded(colRidList);
 | 
						|
 | 
						|
  // After getting all the system catalog information...
 | 
						|
  // Validate that if there are any <DefaultColumn> tags for a NotNull
 | 
						|
  // column, that the column is defined as NotNull With Default.
 | 
						|
  if (bValidateNoDefColWithoutDefValue)
 | 
						|
  {
 | 
						|
    int tableNo = fJob.jobTableList.size() - 1;
 | 
						|
 | 
						|
    for (unsigned int iCol = 0; iCol < fJob.jobTableList[tableNo].colList.size(); iCol++)
 | 
						|
    {
 | 
						|
      JobColumn& col = fJob.jobTableList[tableNo].colList[iCol];
 | 
						|
 | 
						|
      if (col.fFldColRelation == BULK_FLDCOL_COLUMN_DEFAULT)
 | 
						|
      {
 | 
						|
        if ((col.fNotNull) && (!col.fWithDefault))
 | 
						|
        {
 | 
						|
          std::ostringstream oss;
 | 
						|
          oss << "Column " << col.colName << " in table " << fJob.jobTableList[tableNo].tblName
 | 
						|
              << " is NotNull "
 | 
						|
                 "w/o default; cannot be used with <DefaultColumn>";
 | 
						|
          throw std::runtime_error(oss.str());
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  // Make sure all Columns in the DB are counted for with <Column> or
 | 
						|
  // <DefaultColumn> tags (unless validate is disabled)
 | 
						|
  if (fValidateColList)
 | 
						|
    validateAllColumnsHaveTags(colRidList);
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Use the table and column names from the last <Table> just loaded, to
 | 
						|
// collect the remaining information from the system catalog, in order to
 | 
						|
// populate the JobColumn structure.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::fillInXMLDataAsLoaded(execplan::CalpontSystemCatalog::RIDList& colRidList)
{
  boost::shared_ptr<execplan::CalpontSystemCatalog> sysCat =
      execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID);
  sysCat->identity(execplan::CalpontSystemCatalog::EC);

  // Work on the last <Table> that was processed.
  JobTable& tbl = fJob.jobTableList[fJob.jobTableList.size() - 1];

  // Use only the text after the last '.' of the job's table name (if any)
  // as the catalog table name.
  std::string tblName(tbl.tblName);
  const std::string::size_type lastDot = tblName.rfind('.');

  if (lastDot != std::string::npos)
  {
    tblName.erase(0, lastDot + 1);
  }

  execplan::CalpontSystemCatalog::TableName table(fJob.schema, tblName);

  // Look up the table OID only when the job file did not already supply one.
  if (tbl.mapOid == 0)
  {
    tbl.mapOid = sysCat->tableRID(table).objnum;
  }

  // Performance aid (per the original author's note): fetching all column
  // RIDs at once gets the table's column information cached in one go,
  // rather than piece-meal through the repeated lookupOID() calls below.
  colRidList = sysCat->columnRIDs(table, true);

  // Fill in the attributes of every column that does not yet have an OID.
  for (auto& col : tbl.colList)
  {
    if (col.mapOid != 0)
    {
      continue;
    }

    execplan::CalpontSystemCatalog::TableColName column;
    column.schema = fJob.schema;
    column.table = tblName;
    column.column = col.colName;

    const execplan::CalpontSystemCatalog::OID colOid = sysCat->lookupOID(column);

    if (colOid < 0)
    {
      std::ostringstream oss;
      oss << "Column OID lookup failed for: " << column;
      throw std::runtime_error(oss.str());
    }

    col.mapOid = colOid;

    execplan::CalpontSystemCatalog::ColType colType = sysCat->colType(col.mapOid);

    col.width = colType.colWidth;
    col.definedWidth = colType.colWidth;

    // Precision/scale are only copied for decimal types or when a scale is set.
    const bool isDecimalType = (colType.colDataType == execplan::CalpontSystemCatalog::DECIMAL) ||
                               (colType.colDataType == execplan::CalpontSystemCatalog::UDECIMAL);

    if ((colType.scale > 0) || isDecimalType)
    {
      col.precision = colType.precision;
      col.scale = colType.scale;
    }

    col.typeName = ColDataTypeStr[colType.colDataType];
    col.compressionType = colType.compressionType;
    col.dctnry.fCompressionType = colType.compressionType;

    // A charset number of 0 falls back to latin1.
    if (colType.charsetNumber == 0)
    {
      col.cs = &my_charset_latin1;
    }
    else
    {
      col.cs = &datatypes::Charset(colType.charsetNumber).getCharset();
    }

    col.autoIncFlag = colType.autoincrement ? true : false;

    // Initialize NotNull and Default Value (based on data type)
    fillInXMLDataNotNullDefault(tbl.tblName, colType, col);

    // A positive dictionary OID marks the column as dictionary based.
    if (colType.ddn.dictOID > 0)
    {
      col.colType = 'D';
      col.dctnryWidth = colType.colWidth;
      col.dctnry.dctnryOid = colType.ddn.dictOID;
    }

    // @bug3801: For backwards compatibility, signed integer types
    // (INT, BIGINT, SMALLINT, TINYINT) with a scale > 0 are treated as DECIMAL.
    const bool isSignedIntType = (col.typeName == ColDataTypeStr[CalpontSystemCatalog::INT]) ||
                                 (col.typeName == ColDataTypeStr[CalpontSystemCatalog::BIGINT]) ||
                                 (col.typeName == ColDataTypeStr[CalpontSystemCatalog::SMALLINT]) ||
                                 (col.typeName == ColDataTypeStr[CalpontSystemCatalog::TINYINT]);

    if (isSignedIntType && (col.scale > 0))
    {
      col.typeName = ColDataTypeStr[CalpontSystemCatalog::DECIMAL];
    }

    // Initialize the saturation limits for this column
    initSatLimits(col);
  }  // end of loop through columns
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Using information from the system catalog (in colType), fill in the
 | 
						|
// applicable NotNull Default values into the specified JobColumn.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::fillInXMLDataNotNullDefault(const std::string& fullTblName,
 | 
						|
                                         execplan::CalpontSystemCatalog::ColType& colType, JobColumn& col)
 | 
						|
{
 | 
						|
  const NullString col_defaultValue(colType.defaultValue);
 | 
						|
 | 
						|
  if (colType.constraintType == execplan::CalpontSystemCatalog::NOTNULL_CONSTRAINT)
 | 
						|
  {
 | 
						|
    col.fNotNull = true;
 | 
						|
 | 
						|
    if (!col_defaultValue.isNull())
 | 
						|
      col.fWithDefault = true;
 | 
						|
  }
 | 
						|
  else if (colType.constraintType == execplan::CalpontSystemCatalog::DEFAULT_CONSTRAINT)
 | 
						|
  {
 | 
						|
    col.fWithDefault = true;
 | 
						|
  }
 | 
						|
 | 
						|
  if (col.fWithDefault)
 | 
						|
  {
 | 
						|
    bool bDefaultConvertError = false;
 | 
						|
 | 
						|
    // Convert Default Value.
 | 
						|
    // We go ahead and report basic format conversion error;
 | 
						|
    // but we don't do complete validation (like checking to see
 | 
						|
    // if the default is too large for the given integer type),
 | 
						|
    // because we assume DDL is fully validating the default value.
 | 
						|
    switch (colType.colDataType)
 | 
						|
    {
 | 
						|
      case execplan::CalpontSystemCatalog::BIT:
 | 
						|
      case execplan::CalpontSystemCatalog::TINYINT:
 | 
						|
      case execplan::CalpontSystemCatalog::SMALLINT:
 | 
						|
      case execplan::CalpontSystemCatalog::MEDINT:
 | 
						|
      case execplan::CalpontSystemCatalog::INT:
 | 
						|
      case execplan::CalpontSystemCatalog::BIGINT:
 | 
						|
      {
 | 
						|
        errno = 0;
 | 
						|
        col.fDefaultInt = strtoll(col_defaultValue.str(), 0, 10);
 | 
						|
 | 
						|
        if (errno == ERANGE)
 | 
						|
          bDefaultConvertError = true;
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      case execplan::CalpontSystemCatalog::UTINYINT:
 | 
						|
      case execplan::CalpontSystemCatalog::USMALLINT:
 | 
						|
      case execplan::CalpontSystemCatalog::UMEDINT:
 | 
						|
      case execplan::CalpontSystemCatalog::UINT:
 | 
						|
      case execplan::CalpontSystemCatalog::UBIGINT:
 | 
						|
      {
 | 
						|
        errno = 0;
 | 
						|
        col.fDefaultUInt = strtoull(col_defaultValue.str(), 0, 10);
 | 
						|
 | 
						|
        if (errno == ERANGE)
 | 
						|
          bDefaultConvertError = true;
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      case execplan::CalpontSystemCatalog::DECIMAL:
 | 
						|
      case execplan::CalpontSystemCatalog::UDECIMAL:
 | 
						|
      {
 | 
						|
        if (LIKELY(colType.colWidth == datatypes::MAXDECIMALWIDTH))
 | 
						|
        {
 | 
						|
          col.fDefaultWideDecimal =
 | 
						|
              colType.decimal128FromString(col_defaultValue.safeString(), &bDefaultConvertError);
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
          col.fDefaultInt = Convertor::convertDecimalString(col_defaultValue.str(), col_defaultValue.length(),
 | 
						|
                                                            colType.scale);
 | 
						|
 | 
						|
          if (errno == ERANGE)
 | 
						|
            bDefaultConvertError = true;
 | 
						|
        }
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      case execplan::CalpontSystemCatalog::DATE:
 | 
						|
      {
 | 
						|
        int convertStatus;
 | 
						|
        int32_t dt = dataconvert::DataConvert::convertColumnDate(
 | 
						|
            col_defaultValue.str(), dataconvert::CALPONTDATE_ENUM, convertStatus, col_defaultValue.length());
 | 
						|
 | 
						|
        if (convertStatus != 0)
 | 
						|
          bDefaultConvertError = true;
 | 
						|
 | 
						|
        col.fDefaultInt = dt;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      case execplan::CalpontSystemCatalog::DATETIME:
 | 
						|
      {
 | 
						|
        int convertStatus;
 | 
						|
        int64_t dt = dataconvert::DataConvert::convertColumnDatetime(
 | 
						|
            col_defaultValue.str(), dataconvert::CALPONTDATETIME_ENUM, convertStatus,
 | 
						|
            col_defaultValue.length());
 | 
						|
 | 
						|
        if (convertStatus != 0)
 | 
						|
          bDefaultConvertError = true;
 | 
						|
 | 
						|
        col.fDefaultInt = dt;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      case execplan::CalpontSystemCatalog::TIMESTAMP:
 | 
						|
      {
 | 
						|
        int convertStatus;
 | 
						|
        int64_t dt = dataconvert::DataConvert::convertColumnTimestamp(
 | 
						|
            col_defaultValue.str(), dataconvert::CALPONTDATETIME_ENUM, convertStatus,
 | 
						|
            col_defaultValue.length(), fTimeZone);
 | 
						|
 | 
						|
        if (convertStatus != 0)
 | 
						|
          bDefaultConvertError = true;
 | 
						|
 | 
						|
        col.fDefaultInt = dt;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      case execplan::CalpontSystemCatalog::TIME:
 | 
						|
      {
 | 
						|
        int convertStatus;
 | 
						|
        int64_t dt = dataconvert::DataConvert::convertColumnTime(
 | 
						|
            col_defaultValue.str(), dataconvert::CALPONTTIME_ENUM, convertStatus, col_defaultValue.length());
 | 
						|
 | 
						|
        if (convertStatus != 0)
 | 
						|
          bDefaultConvertError = true;
 | 
						|
 | 
						|
        col.fDefaultInt = dt;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      case execplan::CalpontSystemCatalog::FLOAT:
 | 
						|
      case execplan::CalpontSystemCatalog::DOUBLE:
 | 
						|
      case execplan::CalpontSystemCatalog::UFLOAT:
 | 
						|
      case execplan::CalpontSystemCatalog::UDOUBLE:
 | 
						|
      {
 | 
						|
        errno = 0;
 | 
						|
        col.fDefaultDbl = strtod(col_defaultValue.str(), 0);
 | 
						|
 | 
						|
        if (errno == ERANGE)
 | 
						|
          bDefaultConvertError = true;
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      default:
 | 
						|
      {
 | 
						|
        col.fDefaultChr = col_defaultValue;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    if (bDefaultConvertError)
 | 
						|
    {
 | 
						|
      std::ostringstream oss;
 | 
						|
      oss << "Column " << col.colName << " in table " << fullTblName
 | 
						|
          << " has an invalid default value in system catalog.";
 | 
						|
      throw std::runtime_error(oss.str());
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Use the table and column names from the last <Table> just loaded, to
 | 
						|
// validate that all the columns have a <Column> or <DefaultColumn> tag
 | 
						|
// present in the job XML file.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void XMLJob::validateAllColumnsHaveTags(const execplan::CalpontSystemCatalog::RIDList& colRidList) const
{
  // Validate the column list of the last <Table> processed.
  const JobTable& tbl = fJob.jobTableList[fJob.jobTableList.size() - 1];

  // Only the text after the last '.' of the table name (if any) is used in
  // the error messages below.
  std::string tblName(tbl.tblName);
  const std::string::size_type lastDot = tblName.rfind('.');

  if (lastDot != std::string::npos)
  {
    tblName.erase(0, lastDot + 1);
  }

  try
  {
    // Pass 1: collect the OID of every tagged column; a failed insert means
    // the same column was referenced twice in the Job XML file.
    std::set<execplan::CalpontSystemCatalog::OID> pendingOids;

    for (const auto& col : tbl.colList)
    {
      if (pendingOids.insert(col.mapOid).second)
      {
        continue;
      }

      boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
          execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID);
      cat->identity(execplan::CalpontSystemCatalog::EC);

      execplan::CalpontSystemCatalog::TableColName dbColName = cat->colName(col.mapOid);
      std::ostringstream oss;
      oss << "Column " << dbColName.column << " referenced in Job XML file more than once.";
      throw std::runtime_error(oss.str());
    }

    // Pass 2: every column known to the system catalog must have a tag.
    // Matched OIDs are removed from the set as they are found.
    for (const auto& rid : colRidList)
    {
      if (pendingOids.erase(rid.objnum) > 0)
      {
        continue;
      }

      boost::shared_ptr<execplan::CalpontSystemCatalog> cat =
          execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(BULK_SYSCAT_SESSION_ID);
      cat->identity(execplan::CalpontSystemCatalog::EC);

      execplan::CalpontSystemCatalog::TableColName dbColName = cat->colName(rid.objnum);
      std::ostringstream oss;
      oss << "No tag present in Job XML file for DB column: " << dbColName.column;
      throw std::runtime_error(oss.str());
    }
  }
  catch (const std::exception& ex)
  {
    // Re-throw (including the errors raised above) with table context added.
    std::ostringstream oss;
    oss << "Error validating column list for table " << fJob.schema << '.' << tblName << "; " << ex.what();
    throw std::runtime_error(oss.str());
  }
  catch (...)
  {
    std::ostringstream oss;
    oss << "Unknown Error validating column list for table " << fJob.schema << '.' << tblName;
    throw std::runtime_error(oss.str());
  }
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Generate a permanent or temporary Job XML file name path.
 | 
						|
// sXMLJobDir Command line override for complete Job directory path
 | 
						|
// jobDir     Job subdirectory under default <BulkRoot> path
 | 
						|
// jobId      Job ID
 | 
						|
// bTempFile  Are we creating a temporary Job Xml File
 | 
						|
// schemaName If temp file, this is the schema name to use
 | 
						|
// tableName  If temp file, this is the table name to use
 | 
						|
// xmlFilePath The complete Job XML file path that is constructed
 | 
						|
// errMsg     Relevant error message if return value is not NO_ERROR.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
/* static */
 | 
						|
int XMLJob::genJobXMLFileName(const string& sXMLJobDir, const string& jobDir, const string& jobId,
 | 
						|
                              bool bTempFile, const string& /*schemaName*/, const string& /*tableName*/,
 | 
						|
                              boost::filesystem::path& xmlFilePath, string& errMsg,
 | 
						|
                              const std::string& tableOIDStr)
 | 
						|
{
 | 
						|
  // get full file directory path for XML job description file
 | 
						|
  if (sXMLJobDir.empty())
 | 
						|
  {
 | 
						|
    xmlFilePath = Config::getBulkRoot();
 | 
						|
    xmlFilePath /= jobDir;
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    xmlFilePath = sXMLJobDir;
 | 
						|
 | 
						|
    // If filespec doesn't begin with a '/' (i.e. it's not an absolute path),
 | 
						|
    // attempt to make it absolute so that we can log the full pathname.
 | 
						|
    if (!xmlFilePath.has_root_path())
 | 
						|
    {
 | 
						|
      char cwdPath[4096];
 | 
						|
      char* err;
 | 
						|
      err = getcwd(cwdPath, sizeof(cwdPath));
 | 
						|
      if (err == NULL)
 | 
						|
      {
 | 
						|
        errMsg = "Failed to get the current working directory.";
 | 
						|
        return -1;
 | 
						|
      }
 | 
						|
      string trailingPath(xmlFilePath.string());
 | 
						|
      xmlFilePath = cwdPath;
 | 
						|
      xmlFilePath /= trailingPath;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  // Append the file name to the directory path
 | 
						|
  string jobFileName;
 | 
						|
 | 
						|
  if (bTempFile)
 | 
						|
  {
 | 
						|
    // Create tmp directory if does not exist
 | 
						|
    RETURN_ON_ERROR(createTempJobDir(xmlFilePath.string(), errMsg));
 | 
						|
    jobFileName += tableOIDStr;
 | 
						|
    // jobFileName += schemaName;
 | 
						|
    // jobFileName += '_';
 | 
						|
    // jobFileName += tableName;
 | 
						|
    jobFileName += "_D";
 | 
						|
 | 
						|
    string now(boost::posix_time::to_iso_string(boost::posix_time::second_clock::local_time()));
 | 
						|
 | 
						|
    // microseconds
 | 
						|
    struct timeval tp;
 | 
						|
    gettimeofday(&tp, 0);
 | 
						|
    ostringstream usec;
 | 
						|
    usec << setfill('0') << setw(6) << tp.tv_usec;
 | 
						|
 | 
						|
    jobFileName += now.substr(0, 8);
 | 
						|
    jobFileName += "_T";
 | 
						|
    jobFileName += now.substr(9, 6);
 | 
						|
    jobFileName += "_S";
 | 
						|
    jobFileName += usec.str();
 | 
						|
    jobFileName += '_';
 | 
						|
  }
 | 
						|
 | 
						|
  jobFileName += "Job_";
 | 
						|
  jobFileName += jobId;
 | 
						|
  jobFileName += ".xml";
 | 
						|
 | 
						|
  xmlFilePath /= jobFileName;
 | 
						|
 | 
						|
  return NO_ERROR;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Create directory for temporary XML job description files.
 | 
						|
// OAM restart should delete any/all files in this directory.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
/* static */
 | 
						|
int XMLJob::createTempJobDir(const string& xmlFilePath, string& errMsg)
 | 
						|
{
 | 
						|
  boost::filesystem::path pathDir(xmlFilePath);
 | 
						|
 | 
						|
  // create temp directory for XML job file if it does not exist
 | 
						|
  try
 | 
						|
  {
 | 
						|
    if (!boost::filesystem::exists(xmlFilePath))
 | 
						|
    {
 | 
						|
      string boostErrString;
 | 
						|
 | 
						|
      try
 | 
						|
      {
 | 
						|
        boost::filesystem::create_directories(pathDir);
 | 
						|
      }
 | 
						|
      catch (exception& ex)
 | 
						|
      {
 | 
						|
        // ignore exception for now; we may have just had a
 | 
						|
        // race condition where 2 jobs were creating dirs.
 | 
						|
        boostErrString = ex.what();
 | 
						|
      }
 | 
						|
 | 
						|
      if (!boost::filesystem::exists(xmlFilePath))
 | 
						|
      {
 | 
						|
        ostringstream oss;
 | 
						|
        oss << "Error creating XML temp job file directory(1) " << xmlFilePath << "; " << boostErrString;
 | 
						|
        errMsg = oss.str();
 | 
						|
 | 
						|
        return ERR_DIR_CREATE;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (exception& ex)
 | 
						|
  {
 | 
						|
    ostringstream oss;
 | 
						|
    oss << "Error creating XML temp job file directory(2) " << xmlFilePath << "; " << ex.what();
 | 
						|
    errMsg = oss.str();
 | 
						|
 | 
						|
    return ERR_DIR_CREATE;
 | 
						|
  }
 | 
						|
 | 
						|
  if (!boost::filesystem::is_directory(pathDir))
 | 
						|
  {
 | 
						|
    ostringstream oss;
 | 
						|
    oss << "Error creating XML temp job file directory " << xmlFilePath
 | 
						|
        << "; path already exists as non-directory" << endl;
 | 
						|
    errMsg = oss.str();
 | 
						|
 | 
						|
    return ERR_DIR_CREATE;
 | 
						|
  }
 | 
						|
 | 
						|
  return NO_ERROR;
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace WriteEngine
 |