You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1393 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1393 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
   Copyright (C) 2016 MariaDB Corporation
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
//   $Id: ddlpackageprocessor.cpp 9627 2013-06-18 13:59:21Z rdempsey $
 | 
						|
 | 
						|
#include <fstream>
 | 
						|
#include <iomanip>
 | 
						|
#include <iostream>
 | 
						|
// #define NDEBUG
 | 
						|
#include <cassert>
 | 
						|
using namespace std;
 | 
						|
 | 
						|
#include "ddlpackageprocessor.h"
 | 
						|
 | 
						|
#include "mcs_datatype.h"
 | 
						|
#include "dataconvert.h"
 | 
						|
using namespace dataconvert;
 | 
						|
#include "calpontselectexecutionplan.h"
 | 
						|
#include "simplecolumn.h"
 | 
						|
#include "constantcolumn.h"
 | 
						|
#include "simplefilter.h"
 | 
						|
#include "constantfilter.h"
 | 
						|
using namespace execplan;
 | 
						|
#include "joblist.h"
 | 
						|
#include "joblistfactory.h"
 | 
						|
#include "distributedenginecomm.h"
 | 
						|
using namespace joblist;
 | 
						|
 | 
						|
#include "we_messages.h"
 | 
						|
using namespace WriteEngine;
 | 
						|
#include <boost/algorithm/string/case_conv.hpp>
 | 
						|
using namespace boost::algorithm;
 | 
						|
 | 
						|
#include "cacheutils.h"
 | 
						|
using namespace cacheutils;
 | 
						|
 | 
						|
#include "brmtypes.h"
 | 
						|
using namespace BRM;
 | 
						|
 | 
						|
#include "bytestream.h"
 | 
						|
using namespace messageqcpp;
 | 
						|
 | 
						|
#include "oamcache.h"
 | 
						|
using namespace oam;
 | 
						|
 | 
						|
namespace
 | 
						|
{
 | 
						|
using namespace execplan;
 | 
						|
 | 
						|
const SOP opeq(new Operator("="));
 | 
						|
const SOP opne(new Operator("<>"));
 | 
						|
const SOP opor(new Operator("or"));
 | 
						|
const SOP opand(new Operator("and"));
 | 
						|
const SOP opis(new Operator("is"));
 | 
						|
}  // namespace
 | 
						|
 | 
						|
#include <boost/lexical_cast.hpp>
 | 
						|
using boost::any_cast;
 | 
						|
using boost::lexical_cast;
 | 
						|
 | 
						|
using namespace std;
 | 
						|
using namespace execplan;
 | 
						|
using namespace ddlpackage;
 | 
						|
 | 
						|
namespace ddlpackageprocessor
 | 
						|
{
 | 
						|
boost::mutex DDLPackageProcessor::dbrmMutex;
 | 
						|
 | 
						|
DDLPackageProcessor::~DDLPackageProcessor()
 | 
						|
{
 | 
						|
  // cout << "in DDLPackageProcessor destructor " << this << endl;
 | 
						|
  delete fWEClient;
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::getColumnsForTable(uint32_t sessionID, std::string schema, std::string table,
 | 
						|
                                             ColumnList& colList)
 | 
						|
{
 | 
						|
  CalpontSystemCatalog::TableName tableName;
 | 
						|
  tableName.schema = schema;
 | 
						|
  tableName.table = table;
 | 
						|
  std::string err;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | 
						|
        CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
 | 
						|
    systemCatalogPtr->identity(CalpontSystemCatalog::EC);
 | 
						|
 | 
						|
    const CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName);
 | 
						|
 | 
						|
    CalpontSystemCatalog::RIDList::const_iterator rid_iterator = ridList.begin();
 | 
						|
 | 
						|
    while (rid_iterator != ridList.end())
 | 
						|
    {
 | 
						|
      CalpontSystemCatalog::ROPair roPair = *rid_iterator;
 | 
						|
 | 
						|
      DDLColumn column;
 | 
						|
      column.oid = roPair.objnum;
 | 
						|
      column.colType = systemCatalogPtr->colType(column.oid);
 | 
						|
      column.tableColName = systemCatalogPtr->colName(column.oid);
 | 
						|
 | 
						|
      colList.push_back(column);
 | 
						|
 | 
						|
      ++rid_iterator;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    err = "DDLPackageProcessor::getColumnsForTable: while reading columns for table " + schema + '.' + table +
 | 
						|
          ": " + ex.what();
 | 
						|
    throw std::runtime_error(err);
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    err = "DDLPackageProcessor::getColumnsForTable: caught unkown exception!";
 | 
						|
    throw std::runtime_error(err);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
execplan::CalpontSystemCatalog::ColDataType DDLPackageProcessor::convertDataType(int dataType)
 | 
						|
{
 | 
						|
  execplan::CalpontSystemCatalog::ColDataType colDataType;
 | 
						|
 | 
						|
  switch (dataType)
 | 
						|
  {
 | 
						|
    case ddlpackage::DDL_CHAR: colDataType = CalpontSystemCatalog::CHAR; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_VARCHAR: colDataType = CalpontSystemCatalog::VARCHAR; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_VARBINARY: colDataType = CalpontSystemCatalog::VARBINARY; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_BIT: colDataType = CalpontSystemCatalog::BIT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_REAL:
 | 
						|
    case ddlpackage::DDL_DECIMAL:
 | 
						|
    case ddlpackage::DDL_NUMERIC:
 | 
						|
    case ddlpackage::DDL_NUMBER: colDataType = CalpontSystemCatalog::DECIMAL; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_FLOAT: colDataType = CalpontSystemCatalog::FLOAT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_DOUBLE: colDataType = CalpontSystemCatalog::DOUBLE; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_INT:
 | 
						|
    case ddlpackage::DDL_INTEGER: colDataType = CalpontSystemCatalog::INT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_BIGINT: colDataType = CalpontSystemCatalog::BIGINT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_MEDINT: colDataType = CalpontSystemCatalog::MEDINT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_SMALLINT: colDataType = CalpontSystemCatalog::SMALLINT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_TINYINT: colDataType = CalpontSystemCatalog::TINYINT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNSIGNED_DECIMAL:
 | 
						|
    case ddlpackage::DDL_UNSIGNED_NUMERIC: colDataType = CalpontSystemCatalog::UDECIMAL; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNSIGNED_FLOAT: colDataType = CalpontSystemCatalog::UFLOAT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNSIGNED_DOUBLE: colDataType = CalpontSystemCatalog::UDOUBLE; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNSIGNED_INT: colDataType = CalpontSystemCatalog::UINT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNSIGNED_BIGINT: colDataType = CalpontSystemCatalog::UBIGINT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNSIGNED_MEDINT: colDataType = CalpontSystemCatalog::UMEDINT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNSIGNED_SMALLINT: colDataType = CalpontSystemCatalog::USMALLINT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNSIGNED_TINYINT: colDataType = CalpontSystemCatalog::UTINYINT; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_DATE: colDataType = CalpontSystemCatalog::DATE; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_DATETIME: colDataType = CalpontSystemCatalog::DATETIME; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_TIME: colDataType = CalpontSystemCatalog::TIME; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_TIMESTAMP: colDataType = CalpontSystemCatalog::TIMESTAMP; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_CLOB: colDataType = CalpontSystemCatalog::CLOB; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_BLOB: colDataType = CalpontSystemCatalog::BLOB; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_TEXT: colDataType = CalpontSystemCatalog::TEXT; break;
 | 
						|
 | 
						|
    default: throw runtime_error("Unsupported datatype!");
 | 
						|
  }
 | 
						|
 | 
						|
  return colDataType;
 | 
						|
}
 | 
						|
 | 
						|
std::string DDLPackageProcessor::buildTableConstraintName(const int oid, ddlpackage::DDL_CONSTRAINTS type)
 | 
						|
{
 | 
						|
  std::stringstream oid_number;
 | 
						|
  oid_number << oid;
 | 
						|
  std::string indexName;
 | 
						|
  std::string prefix;
 | 
						|
 | 
						|
  switch (type)
 | 
						|
  {
 | 
						|
    case ddlpackage::DDL_PRIMARY_KEY:
 | 
						|
      // prefix = "pk_";
 | 
						|
      // @note this name is supplied by the previous create index statement
 | 
						|
      // generated by Oracle. Use Oracle's PK name instead of making up our own
 | 
						|
      indexName = fPKName;
 | 
						|
      break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_FOREIGN_KEY:
 | 
						|
    case ddlpackage::DDL_REFERENCES: prefix = "fk_"; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNIQUE: prefix = "uk_"; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_CHECK: prefix = "ck_"; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_NOT_NULL: prefix = "nk_"; break;
 | 
						|
 | 
						|
    default: throw runtime_error("Unsupported constraint type!"); break;
 | 
						|
  }
 | 
						|
 | 
						|
  if (type != ddlpackage::DDL_PRIMARY_KEY)
 | 
						|
    indexName = prefix + oid_number.str();
 | 
						|
 | 
						|
  boost::to_lower(indexName);
 | 
						|
 | 
						|
  return indexName;
 | 
						|
}
 | 
						|
 | 
						|
std::string DDLPackageProcessor::buildColumnConstraintName(const std::string& schema,
 | 
						|
                                                           const std::string& table,
 | 
						|
                                                           const std::string& column,
 | 
						|
                                                           ddlpackage::DDL_CONSTRAINTS type)
 | 
						|
{
 | 
						|
  std::string indexName;
 | 
						|
 | 
						|
  std::string prefix;
 | 
						|
 | 
						|
  switch (type)
 | 
						|
  {
 | 
						|
    case ddlpackage::DDL_PRIMARY_KEY: prefix = "pk_"; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_FOREIGN_KEY:
 | 
						|
    case ddlpackage::DDL_REFERENCES: prefix = "fk_"; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNIQUE: prefix = "uk_"; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_CHECK: prefix = "ck_"; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_NOT_NULL: prefix = "nk_"; break;
 | 
						|
 | 
						|
    default: throw runtime_error("Unsupported constraint type!"); break;
 | 
						|
  }
 | 
						|
 | 
						|
  indexName = prefix + schema + "_" + table + "_" + column;
 | 
						|
 | 
						|
  boost::to_lower(indexName);
 | 
						|
 | 
						|
  return indexName;
 | 
						|
}
 | 
						|
 | 
						|
char DDLPackageProcessor::getConstraintCode(ddlpackage::DDL_CONSTRAINTS type)
 | 
						|
{
 | 
						|
  char constraint_type;
 | 
						|
 | 
						|
  switch (type)
 | 
						|
  {
 | 
						|
    case ddlpackage::DDL_PRIMARY_KEY: constraint_type = 'p'; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_REFERENCES:
 | 
						|
    case ddlpackage::DDL_FOREIGN_KEY: constraint_type = 'f'; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNIQUE: constraint_type = 'u'; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_CHECK: constraint_type = 'c'; break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_NOT_NULL: constraint_type = 'n'; break;
 | 
						|
 | 
						|
    default: constraint_type = '0'; break;
 | 
						|
  }
 | 
						|
 | 
						|
  return constraint_type;
 | 
						|
}
 | 
						|
 | 
						|
bool DDLPackageProcessor::isIndexConstraint(ddlpackage::DDL_CONSTRAINTS type)
 | 
						|
{
 | 
						|
  bool indexConstraint = false;
 | 
						|
 | 
						|
  switch (type)
 | 
						|
  {
 | 
						|
    case ddlpackage::DDL_PRIMARY_KEY:
 | 
						|
 | 
						|
    // @bug fix for #418, #416. Do not insert into sysindex
 | 
						|
    // case ddlpackage::DDL_REFERENCES:
 | 
						|
    case ddlpackage::DDL_UNIQUE: indexConstraint = true; break;
 | 
						|
 | 
						|
    default: break;
 | 
						|
  }
 | 
						|
 | 
						|
  return indexConstraint;
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::getColumnReferences(ddlpackage::TableConstraintDef& tableConstraint,
 | 
						|
                                              ddlpackage::ColumnNameList& columns)
 | 
						|
{
 | 
						|
  switch (tableConstraint.fConstraintType)
 | 
						|
  {
 | 
						|
    case ddlpackage::DDL_PRIMARY_KEY:
 | 
						|
    {
 | 
						|
      ddlpackage::TablePrimaryKeyConstraintDef& pkConstraint =
 | 
						|
          dynamic_cast<ddlpackage::TablePrimaryKeyConstraintDef&>(tableConstraint);
 | 
						|
 | 
						|
      columns = pkConstraint.fColumnNameList;
 | 
						|
    }
 | 
						|
    break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_REFERENCES:
 | 
						|
    case ddlpackage::DDL_FOREIGN_KEY:
 | 
						|
    {
 | 
						|
      ddlpackage::TableReferencesConstraintDef& fkConstraint =
 | 
						|
          dynamic_cast<ddlpackage::TableReferencesConstraintDef&>(tableConstraint);
 | 
						|
 | 
						|
      columns = fkConstraint.fColumns;
 | 
						|
    }
 | 
						|
    break;
 | 
						|
 | 
						|
    case ddlpackage::DDL_UNIQUE:
 | 
						|
    {
 | 
						|
      ddlpackage::TableUniqueConstraintDef& ukConstraint =
 | 
						|
          dynamic_cast<ddlpackage::TableUniqueConstraintDef&>(tableConstraint);
 | 
						|
      columns = ukConstraint.fColumnNameList;
 | 
						|
    }
 | 
						|
    break;
 | 
						|
 | 
						|
    default: break;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
boost::any DDLPackageProcessor::tokenizeData(execplan::CalpontSystemCatalog::SCN txnID,
 | 
						|
                                             const DDLResult& result,
 | 
						|
                                             const execplan::CalpontSystemCatalog::ColType& colType,
 | 
						|
                                             const boost::any& data)
 | 
						|
{
 | 
						|
  std::string err("DDLPackageProcessor::tokenizeData ");
 | 
						|
  SUMMARY_INFO(err);
 | 
						|
  boost::any value;
 | 
						|
 | 
						|
  if (result.result == NO_ERROR)
 | 
						|
  {
 | 
						|
    try
 | 
						|
    {
 | 
						|
      std::string str;
 | 
						|
 | 
						|
      if (data.type() == typeid(int))
 | 
						|
        str = lexical_cast<std::string>(any_cast<int>(data));
 | 
						|
      else if (data.type() == typeid(float))
 | 
						|
        str = lexical_cast<std::string>(any_cast<float>(data));
 | 
						|
      else if (data.type() == typeid(long long))
 | 
						|
        str = lexical_cast<std::string>(any_cast<long long>(data));
 | 
						|
      else if (data.type() == typeid(double))
 | 
						|
        str = lexical_cast<std::string>(any_cast<double>(data));
 | 
						|
      else if (data.type() == typeid(bool))
 | 
						|
        str = lexical_cast<std::string>(any_cast<bool>(data));
 | 
						|
      else if (data.type() == typeid(short))
 | 
						|
        str = lexical_cast<std::string>(any_cast<short>(data));
 | 
						|
      else if (data.type() == typeid(char))
 | 
						|
        str = lexical_cast<std::string>(any_cast<char>(data));
 | 
						|
      else
 | 
						|
        str = any_cast<std::string>(data);
 | 
						|
 | 
						|
      // Tokenize the data value
 | 
						|
      WriteEngine::DctnryStruct dictStruct;
 | 
						|
      dictStruct.dctnryOid = colType.ddn.dictOID;
 | 
						|
      // added for multifiles per oid
 | 
						|
      dictStruct.columnOid = colType.columnOID;
 | 
						|
      WriteEngine::DctnryTuple dictTuple;
 | 
						|
      dictTuple.sigValue = (unsigned char*)str.c_str();
 | 
						|
      dictTuple.sigSize = str.length();
 | 
						|
      int error = NO_ERROR;
 | 
						|
 | 
						|
      if (NO_ERROR !=
 | 
						|
          (error = fWriteEngine.tokenize(txnID, dictStruct, dictTuple, false)))  // @bug 5572 HDFS tmp file
 | 
						|
      {
 | 
						|
        WErrorCodes ec;
 | 
						|
        throw std::runtime_error("WE: Tokenization failed " + ec.errorString(error));
 | 
						|
      }
 | 
						|
 | 
						|
      WriteEngine::Token aToken = dictTuple.token;
 | 
						|
 | 
						|
      value = aToken;
 | 
						|
    }
 | 
						|
    catch (std::exception& ex)
 | 
						|
    {
 | 
						|
      err += ex.what();
 | 
						|
      throw std::runtime_error(err);
 | 
						|
    }
 | 
						|
    catch (...)
 | 
						|
    {
 | 
						|
      err += "Unknown exception caught, tokenizeData failed.";
 | 
						|
      throw std::runtime_error(err);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return value;
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::flushPrimprocCache(std::vector<execplan::CalpontSystemCatalog::OID>& oidList)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::flushPrimprocCache");
 | 
						|
 | 
						|
  int err = 0;
 | 
						|
  CalpontSystemCatalog::ROPair roPair;
 | 
						|
  std::vector<CalpontSystemCatalog::OID>::const_iterator iter = oidList.begin();
 | 
						|
  std::string error;
 | 
						|
  BRM::DBRM dbrm;
 | 
						|
  BRM::LBIDRange_v lbidRanges;
 | 
						|
  LBIDRange_v::iterator it;
 | 
						|
  BRM::BlockList_t blockList;
 | 
						|
  execplan::CalpontSystemCatalog::SCN verID = 0;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    while (iter != oidList.end())
 | 
						|
    {
 | 
						|
      WriteEngine::OID oid = *iter;
 | 
						|
 | 
						|
      if (oid < 3000)
 | 
						|
      {
 | 
						|
        ++iter;
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
 | 
						|
      //@Bug 1708 Flush primproc cache for associated lbids.
 | 
						|
      err = dbrm.lookup(oid, lbidRanges);
 | 
						|
 | 
						|
      if (err)
 | 
						|
      {
 | 
						|
        error = "DBRM lookUp error.";
 | 
						|
        throw std::runtime_error(error);
 | 
						|
      }
 | 
						|
 | 
						|
      blockList.clear();
 | 
						|
 | 
						|
      for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
 | 
						|
      {
 | 
						|
        for (LBID_t lbid = it->start; lbid < (it->start + it->size); lbid++)
 | 
						|
        {
 | 
						|
          blockList.push_back(BRM::LVP_t(lbid, verID));
 | 
						|
        }
 | 
						|
      }
 | 
						|
 | 
						|
      // Need find a more efficient way to do this.
 | 
						|
      err = cacheutils::flushPrimProcBlocks(blockList);
 | 
						|
      // No need to handle this error as the error comes from timeout, not real error
 | 
						|
      (void)err;
 | 
						|
 | 
						|
      ++iter;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    error = ex.what();
 | 
						|
    throw std::runtime_error(error);
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    error = "Unknown exception caught";
 | 
						|
    throw std::runtime_error(error);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::removeFiles(const uint64_t uniqueId,
 | 
						|
                                      std::vector<execplan::CalpontSystemCatalog::OID>& oidList)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::removeFiles");
 | 
						|
  ByteStream bytestream;
 | 
						|
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
  fWEClient->addQueue(uniqueId);
 | 
						|
  bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
 | 
						|
  bytestream << uniqueId;
 | 
						|
  bytestream << (uint32_t)oidList.size();
 | 
						|
 | 
						|
  for (unsigned i = 0; i < oidList.size(); i++)
 | 
						|
  {
 | 
						|
    bytestream << (uint32_t)oidList[i];
 | 
						|
  }
 | 
						|
 | 
						|
  uint32_t msgRecived = 0;
 | 
						|
  ByteStream::byte rc = 0;
 | 
						|
  ByteStream::byte tmp8;
 | 
						|
  string errorMsg;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
 | 
						|
    bsIn.reset(new ByteStream());
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
      if (msgRecived == fWEClient->getPmCount())
 | 
						|
        break;
 | 
						|
 | 
						|
      fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
      if (bsIn->length() == 0)  // read error
 | 
						|
      {
 | 
						|
        rc = NETWORK_ERROR;
 | 
						|
        errorMsg = "Network error while deleting files.";
 | 
						|
        fWEClient->removeQueue(uniqueId);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        *bsIn >> tmp8;
 | 
						|
        rc = tmp8;
 | 
						|
 | 
						|
        if (rc != 0)
 | 
						|
        {
 | 
						|
          *bsIn >> errorMsg;
 | 
						|
          fWEClient->removeQueue(uniqueId);
 | 
						|
          break;
 | 
						|
        }
 | 
						|
        else
 | 
						|
          msgRecived++;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    fWEClient->removeQueue(uniqueId);
 | 
						|
    throw std::runtime_error(ex.what());
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    fWEClient->removeQueue(uniqueId);
 | 
						|
    throw std::runtime_error("Unknown error caught while deleting files.");
 | 
						|
  }
 | 
						|
 | 
						|
  fWEClient->removeQueue(uniqueId);
 | 
						|
 | 
						|
  if (rc != 0)
 | 
						|
  {
 | 
						|
    throw std::runtime_error(errorMsg);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::createFiles(CalpontSystemCatalog::TableName aTableName, const int useDBRoot,
 | 
						|
                                      const uint64_t uniqueId, const uint32_t numOids)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::createFiles");
 | 
						|
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | 
						|
      CalpontSystemCatalog::makeCalpontSystemCatalog(1);
 | 
						|
  CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(aTableName);
 | 
						|
  CalpontSystemCatalog::OID tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(aTableName);
 | 
						|
 | 
						|
  if (tableAUXColOid > 3000)
 | 
						|
  {
 | 
						|
    CalpontSystemCatalog::ROPair auxRoPair;
 | 
						|
    auxRoPair.rid = 0;
 | 
						|
    auxRoPair.objnum = tableAUXColOid;
 | 
						|
    ridList.push_back(auxRoPair);
 | 
						|
  }
 | 
						|
 | 
						|
  fWEClient->addQueue(uniqueId);
 | 
						|
  CalpontSystemCatalog::ColType colType;
 | 
						|
  ByteStream bytestream;
 | 
						|
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
  bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES;
 | 
						|
  bytestream << (uint32_t)1;
 | 
						|
  bytestream << uniqueId;
 | 
						|
  bytestream << numOids;
 | 
						|
 | 
						|
  for (unsigned col = 0; col < ridList.size(); col++)
 | 
						|
  {
 | 
						|
    colType = systemCatalogPtr->colType(ridList[col].objnum);
 | 
						|
    bytestream << (uint32_t)ridList[col].objnum;
 | 
						|
    bytestream << (uint8_t)colType.colDataType;
 | 
						|
    bytestream << (uint8_t) false;
 | 
						|
    bytestream << (uint32_t)colType.colWidth;
 | 
						|
    bytestream << (uint16_t)useDBRoot;
 | 
						|
    bytestream << (uint32_t)colType.compressionType;
 | 
						|
 | 
						|
    if (colType.ddn.dictOID > 3000)
 | 
						|
    {
 | 
						|
      bytestream << (uint32_t)colType.ddn.dictOID;
 | 
						|
      bytestream << (uint8_t)colType.colDataType;
 | 
						|
      bytestream << (uint8_t) true;
 | 
						|
      bytestream << (uint32_t)colType.colWidth;
 | 
						|
      bytestream << (uint16_t)useDBRoot;
 | 
						|
      bytestream << (uint32_t)colType.compressionType;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  ByteStream::byte rc = 0;
 | 
						|
  ByteStream::byte tmp8;
 | 
						|
  string errorMsg;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    OamCache* oamcache = OamCache::makeOamCache();
 | 
						|
    boost::shared_ptr<std::map<int, int> > dbRootPMMap = oamcache->getDBRootToPMMap();
 | 
						|
    int pmNum = (*dbRootPMMap)[useDBRoot];
 | 
						|
 | 
						|
    fWEClient->write(bytestream, (uint32_t)pmNum);
 | 
						|
    bsIn.reset(new ByteStream());
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
      fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
      if (bsIn->length() == 0)  // read error
 | 
						|
      {
 | 
						|
        rc = NETWORK_ERROR;
 | 
						|
        errorMsg = "Network error while creating files.";
 | 
						|
        fWEClient->removeQueue(uniqueId);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        *bsIn >> tmp8;
 | 
						|
        rc = tmp8;
 | 
						|
 | 
						|
        if (rc != 0)
 | 
						|
        {
 | 
						|
          *bsIn >> errorMsg;
 | 
						|
        }
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    fWEClient->removeQueue(uniqueId);
 | 
						|
    throw std::runtime_error(ex.what());
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    fWEClient->removeQueue(uniqueId);
 | 
						|
    throw std::runtime_error("Unknown error caught while creating files.");
 | 
						|
  }
 | 
						|
 | 
						|
  fWEClient->removeQueue(uniqueId);
 | 
						|
 | 
						|
  if (rc != 0)
 | 
						|
    throw std::runtime_error(errorMsg);
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::removePartitionFiles(std::vector<execplan::CalpontSystemCatalog::OID>& oidList,
 | 
						|
                                               const PartitionNums& partitions, uint64_t uniqueId)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::removeFiles");
 | 
						|
 | 
						|
  ByteStream::byte rc = 0;
 | 
						|
  std::string errorMsg;
 | 
						|
 | 
						|
  // get a unique number.
 | 
						|
  fWEClient->addQueue(uniqueId);
 | 
						|
  // Write the tables metadata to the system catalog
 | 
						|
  VERBOSE_INFO("Remove Partition Files");
 | 
						|
  ByteStream bs;
 | 
						|
  bs << (ByteStream::byte)WE_SVR_DROP_PARTITIONS;
 | 
						|
  bs << uniqueId;
 | 
						|
  bs << (uint32_t)oidList.size();
 | 
						|
  PartitionNums::const_iterator partIt;
 | 
						|
  vector<BRM::PartitionInfo> partInfos;
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < oidList.size(); i++)
 | 
						|
  {
 | 
						|
    bs << (uint32_t)oidList[i];
 | 
						|
 | 
						|
    // add oid to LogicalPartition to form PartitionInfo
 | 
						|
    for (partIt = partitions.begin(); partIt != partitions.end(); ++partIt)
 | 
						|
    {
 | 
						|
      PartitionInfo pi;
 | 
						|
      pi.lp = (*partIt);
 | 
						|
      pi.oid = oidList[i];
 | 
						|
      partInfos.push_back(pi);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  bs << (uint32_t)partInfos.size();
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < partInfos.size(); i++)
 | 
						|
    partInfos[i].serialize(bs);
 | 
						|
 | 
						|
  fWEClient->write_to_all(bs);
 | 
						|
  uint32_t pmCount = fWEClient->getPmCount();
 | 
						|
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
  bsIn.reset(new ByteStream());
 | 
						|
 | 
						|
  while (pmCount)
 | 
						|
  {
 | 
						|
    bsIn->restart();
 | 
						|
    fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
    if (bsIn->length() == 0)  // read error
 | 
						|
    {
 | 
						|
      rc = NETWORK_ERROR;
 | 
						|
      errorMsg = "Lost connection to Write Engine Server while dropping partitions";
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      *bsIn >> rc;
 | 
						|
 | 
						|
      if (rc != 0)
 | 
						|
      {
 | 
						|
        *bsIn >> errorMsg;
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      pmCount--;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (rc)
 | 
						|
  {
 | 
						|
    WErrorCodes ec;
 | 
						|
    errorMsg = "WE: Error removing files " + ec.errorString(rc);
 | 
						|
    rc = cacheutils::dropPrimProcFdCache();
 | 
						|
    fWEClient->removeQueue(uniqueId);
 | 
						|
    throw std::runtime_error(errorMsg);
 | 
						|
  }
 | 
						|
 | 
						|
  //@Bug 2171,3327. Drop PrimProc fd cache
 | 
						|
  rc = cacheutils::dropPrimProcFdCache();
 | 
						|
  fWEClient->removeQueue(uniqueId);
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::removeExtents(std::vector<execplan::CalpontSystemCatalog::OID>& oidList)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::removeExtents");
 | 
						|
  int err = 0;
 | 
						|
  err = fDbrm->deleteOIDs(oidList);
 | 
						|
 | 
						|
  if (err)
 | 
						|
  {
 | 
						|
    string errMsg;
 | 
						|
    BRM::errString(err, errMsg);
 | 
						|
    throw std::runtime_error(errMsg);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::createWriteDropLogFile(execplan::CalpontSystemCatalog::OID tableOid,
 | 
						|
                                                 uint64_t uniqueId,
 | 
						|
                                                 std::vector<execplan::CalpontSystemCatalog::OID>& oidList)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::createWriteDropLogFile");
 | 
						|
  // For shared nothing, the meta files are created under data1 with controllernode.
 | 
						|
  OamCache* oamcache = OamCache::makeOamCache();
 | 
						|
  std::string OAMParentModuleName = oamcache->getOAMParentModuleName();
 | 
						|
  OAMParentModuleName = OAMParentModuleName.substr(2, OAMParentModuleName.length());
 | 
						|
  int parentId = atoi(OAMParentModuleName.c_str());
 | 
						|
  ByteStream bytestream;
 | 
						|
  uint8_t rc = 0;
 | 
						|
  std::string errorMsg;
 | 
						|
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
  bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPTABLE;
 | 
						|
  bytestream << uniqueId;
 | 
						|
  bytestream << (uint32_t)tableOid;
 | 
						|
  bytestream << (uint32_t)oidList.size();
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < oidList.size(); i++)
 | 
						|
  {
 | 
						|
    bytestream << (uint32_t)oidList[i];
 | 
						|
  }
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    fWEClient->write(bytestream, (uint32_t)parentId);
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
      bsIn.reset(new ByteStream());
 | 
						|
      fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
      if (bsIn->length() == 0)  // read error
 | 
						|
      {
 | 
						|
        rc = NETWORK_ERROR;
 | 
						|
        errorMsg = "Lost connection to Write Engine Server while writting drop table Log";
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        *bsIn >> rc;
 | 
						|
 | 
						|
        if (rc != 0)
 | 
						|
        {
 | 
						|
          *bsIn >> errorMsg;
 | 
						|
        }
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (runtime_error& ex)  // write error
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = ex.what();
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = "Got unknown exception while writting drop table Log.";
 | 
						|
  }
 | 
						|
 | 
						|
  if (rc != 0)
 | 
						|
    throw std::runtime_error(errorMsg);
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::deleteLogFile(LogFileType fileType, execplan::CalpontSystemCatalog::OID tableOid,
 | 
						|
                                        uint64_t uniqueId)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::deleteLogFile");
 | 
						|
  OamCache* oamcache = OamCache::makeOamCache();
 | 
						|
  std::string OAMParentModuleName = oamcache->getOAMParentModuleName();
 | 
						|
  OAMParentModuleName = OAMParentModuleName.substr(2, OAMParentModuleName.length());
 | 
						|
  int parentId = atoi(OAMParentModuleName.c_str());
 | 
						|
  ByteStream bytestream;
 | 
						|
  uint8_t rc = 0;
 | 
						|
  std::string errorMsg;
 | 
						|
  fWEClient->addQueue(uniqueId);
 | 
						|
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
  bytestream << (ByteStream::byte)WE_SVR_DELETE_DDLLOG;
 | 
						|
  bytestream << uniqueId;
 | 
						|
  bytestream << (uint32_t)fileType;
 | 
						|
  bytestream << (uint32_t)tableOid;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    fWEClient->write(bytestream, (uint32_t)parentId);
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
      bsIn.reset(new ByteStream());
 | 
						|
      fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
      if (bsIn->length() == 0)  // read error
 | 
						|
      {
 | 
						|
        rc = NETWORK_ERROR;
 | 
						|
        errorMsg = "Lost connection to Write Engine Server while deleting DDL log";
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        *bsIn >> rc;
 | 
						|
 | 
						|
        if (rc != 0)
 | 
						|
        {
 | 
						|
          *bsIn >> errorMsg;
 | 
						|
        }
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (runtime_error& ex)  // write error
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = ex.what();
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = "Got unknown exception while deleting DDL Log.";
 | 
						|
  }
 | 
						|
 | 
						|
  fWEClient->removeQueue(uniqueId);
 | 
						|
 | 
						|
  if (rc != 0)
 | 
						|
  {
 | 
						|
    throw std::runtime_error(errorMsg);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::fetchLogFile(TableLogInfo& tableLogInfos, uint64_t uniqueId)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::fetchLogFile");
 | 
						|
  OamCache* oamcache = OamCache::makeOamCache();
 | 
						|
  std::string OAMParentModuleName = oamcache->getOAMParentModuleName();
 | 
						|
 | 
						|
  // Use a sensible default so that substr doesn't throw...
 | 
						|
  if (OAMParentModuleName.empty())
 | 
						|
    OAMParentModuleName = "pm1";
 | 
						|
 | 
						|
  int parentId = atoi(OAMParentModuleName.substr(2, OAMParentModuleName.length()).c_str());
 | 
						|
  ByteStream bytestream;
 | 
						|
  uint8_t rc = 0;
 | 
						|
  uint32_t tmp32, tableOid, numOids, numPartitions;
 | 
						|
  LogFileType logFileType;
 | 
						|
  std::string errorMsg;
 | 
						|
  fWEClient->addQueue(uniqueId);
 | 
						|
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
  bytestream << (ByteStream::byte)WE_SVR_FETCH_DDL_LOGS;
 | 
						|
  bytestream << uniqueId;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    fWEClient->write(bytestream, (uint32_t)parentId);
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
      bsIn.reset(new ByteStream());
 | 
						|
      fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
      if (bsIn->length() == 0)  // read error
 | 
						|
      {
 | 
						|
        rc = NETWORK_ERROR;
 | 
						|
        errorMsg = "Lost connection to Write Engine Server while deleting DDL log";
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        *bsIn >> rc;
 | 
						|
        *bsIn >> errorMsg;
 | 
						|
 | 
						|
        while (bsIn->length() > 0)
 | 
						|
        {
 | 
						|
          *bsIn >> tmp32;
 | 
						|
          tableOid = tmp32;
 | 
						|
          *bsIn >> tmp32;
 | 
						|
          logFileType = (LogFileType)tmp32;
 | 
						|
          *bsIn >> tmp32;
 | 
						|
          numOids = tmp32;
 | 
						|
          OidList oidsList;
 | 
						|
          PartitionNums partitionNums;
 | 
						|
 | 
						|
          for (unsigned i = 0; i < numOids; i++)
 | 
						|
          {
 | 
						|
            *bsIn >> tmp32;
 | 
						|
            oidsList.push_back(tmp32);
 | 
						|
          }
 | 
						|
 | 
						|
          *bsIn >> tmp32;
 | 
						|
          numPartitions = tmp32;
 | 
						|
          BRM::LogicalPartition lp;
 | 
						|
 | 
						|
          for (unsigned i = 0; i < numPartitions; i++)
 | 
						|
          {
 | 
						|
            lp.unserialize(*bsIn);
 | 
						|
            partitionNums.insert(lp);
 | 
						|
          }
 | 
						|
 | 
						|
          // build the tableloginfo
 | 
						|
          LogInfo aLog;
 | 
						|
          aLog.fileType = logFileType;
 | 
						|
          aLog.oids = oidsList;
 | 
						|
          aLog.partitionNums = partitionNums;
 | 
						|
          tableLogInfos[tableOid] = aLog;
 | 
						|
        }
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (runtime_error& ex)  // write error
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = ex.what();
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = "Got unknown exception while fetching DDL Log.";
 | 
						|
  }
 | 
						|
 | 
						|
  fWEClient->removeQueue(uniqueId);
 | 
						|
 | 
						|
  if (rc != 0)
 | 
						|
    throw std::runtime_error(errorMsg);
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::createWritePartitionLogFile(
 | 
						|
    execplan::CalpontSystemCatalog::OID tableOid, const PartitionNums& partitionNums,
 | 
						|
    std::vector<execplan::CalpontSystemCatalog::OID>& oidList, uint64_t uniqueId)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::createWritePartitionLogFile");
 | 
						|
  fWEClient->addQueue(uniqueId);
 | 
						|
  OamCache* oamcache = OamCache::makeOamCache();
 | 
						|
  std::string OAMParentModuleName = oamcache->getOAMParentModuleName();
 | 
						|
  OAMParentModuleName = OAMParentModuleName.substr(2, OAMParentModuleName.length());
 | 
						|
  int parentId = atoi(OAMParentModuleName.c_str());
 | 
						|
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
  ByteStream bytestream;
 | 
						|
  std::string errorMsg;
 | 
						|
  uint8_t rc = 0;
 | 
						|
  bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPPARTITION;
 | 
						|
  bytestream << uniqueId;
 | 
						|
  bytestream << (uint32_t)tableOid;
 | 
						|
  bytestream << (uint32_t)partitionNums.size();
 | 
						|
  PartitionNums::const_iterator it;
 | 
						|
 | 
						|
  for (it = partitionNums.begin(); it != partitionNums.end(); ++it)
 | 
						|
    (*it).serialize(bytestream);
 | 
						|
 | 
						|
  bytestream << (uint32_t)oidList.size();
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < oidList.size(); i++)
 | 
						|
  {
 | 
						|
    bytestream << (uint32_t)oidList[i];
 | 
						|
  }
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    fWEClient->write(bytestream, (uint32_t)parentId);
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
      bsIn.reset(new ByteStream());
 | 
						|
      fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
      if (bsIn->length() == 0)  // read error
 | 
						|
      {
 | 
						|
        rc = NETWORK_ERROR;
 | 
						|
        errorMsg = "Lost connection to Write Engine Server while writing DDL drop partition log";
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        *bsIn >> rc;
 | 
						|
 | 
						|
        if (rc != 0)
 | 
						|
        {
 | 
						|
          *bsIn >> errorMsg;
 | 
						|
        }
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (runtime_error& ex)  // write error
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = ex.what();
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = "Got unknown exception while writting truncate Log.";
 | 
						|
  }
 | 
						|
 | 
						|
  fWEClient->removeQueue(uniqueId);
 | 
						|
 | 
						|
  if (rc != 0)
 | 
						|
    throw std::runtime_error(errorMsg);
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::createWriteTruncateTableLogFile(
 | 
						|
    execplan::CalpontSystemCatalog::OID tableOid, uint64_t uniqueId,
 | 
						|
    std::vector<execplan::CalpontSystemCatalog::OID>& oidList)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("DDLPackageProcessor::createWriteTruncateTableLogFile");
 | 
						|
  // For shared nothing, the meta files are created under data1 with controllernode.
 | 
						|
  OamCache* oamcache = OamCache::makeOamCache();
 | 
						|
  std::string OAMParentModuleName = oamcache->getOAMParentModuleName();
 | 
						|
  OAMParentModuleName = OAMParentModuleName.substr(2, OAMParentModuleName.length());
 | 
						|
  int parentId = atoi(OAMParentModuleName.c_str());
 | 
						|
  ByteStream bytestream;
 | 
						|
  uint8_t rc = 0;
 | 
						|
  std::string errorMsg;
 | 
						|
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
  bytestream << (ByteStream::byte)WE_SVR_WRITE_TRUNCATE;
 | 
						|
  bytestream << uniqueId;
 | 
						|
  bytestream << (uint32_t)tableOid;
 | 
						|
  bytestream << (uint32_t)oidList.size();
 | 
						|
 | 
						|
  for (uint32_t i = 0; i < oidList.size(); i++)
 | 
						|
  {
 | 
						|
    bytestream << (uint32_t)oidList[i];
 | 
						|
  }
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    fWEClient->write(bytestream, (uint32_t)parentId);
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
      bsIn.reset(new ByteStream());
 | 
						|
      fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
      if (bsIn->length() == 0)  // read error
 | 
						|
      {
 | 
						|
        rc = NETWORK_ERROR;
 | 
						|
        errorMsg = "Lost connection to Write Engine Server while writing truncate table log";
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        *bsIn >> rc;
 | 
						|
 | 
						|
        if (rc != 0)
 | 
						|
        {
 | 
						|
          *bsIn >> errorMsg;
 | 
						|
        }
 | 
						|
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (runtime_error& ex)  // write error
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = ex.what();
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    rc = NETWORK_ERROR;
 | 
						|
    errorMsg = "Got unknown exception while writting truncate table Log.";
 | 
						|
  }
 | 
						|
 | 
						|
  if (rc != 0)
 | 
						|
    throw std::runtime_error(errorMsg);
 | 
						|
}
 | 
						|
 | 
						|
DDLPackageProcessor::DDLResult DDLPackageProcessor::processPackage(SqlStatement* sqlStmt)
 | 
						|
{
 | 
						|
  auto result = processPackageInternal(sqlStmt);
 | 
						|
  uint32_t tries = 0;
 | 
						|
  while ((result.result == PP_LOST_CONNECTION) && (tries < 5))
 | 
						|
  {
 | 
						|
    std::cerr << "DDLPackageProcessor: NETWORK ERROR; attempt # " << tries << std::endl;
 | 
						|
    joblist::ResourceManager* rm = joblist::ResourceManager::instance(true);
 | 
						|
    joblist::DistributedEngineComm* fEc = joblist::DistributedEngineComm::instance(rm);
 | 
						|
    if (fEc->Setup())
 | 
						|
      return result;
 | 
						|
 | 
						|
    result = processPackageInternal(sqlStmt);
 | 
						|
    ++tries;
 | 
						|
  }
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
DDLPackageProcessor::DDLResult DDLPackageProcessor::processPackageInternal(SqlStatement* /*sqlStmt*/)
 | 
						|
{
 | 
						|
  // This should not be called.
 | 
						|
  DDLPackageProcessor::DDLResult result;
 | 
						|
  result.result = NOT_ACCEPTING_PACKAGES;
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
bool DDLPackageProcessor::checkPPLostConnection(std::string error)
 | 
						|
{
 | 
						|
  return error.find(PPLostConnectionErrorCode) != std::string::npos;
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::returnOIDs(execplan::CalpontSystemCatalog::RIDList& ridList,
 | 
						|
                                     execplan::CalpontSystemCatalog::DictOIDList& dictOIDList)
 | 
						|
{
 | 
						|
  CalpontSystemCatalog::ROPair roPair;
 | 
						|
  CalpontSystemCatalog::RIDList::const_iterator col_iter = ridList.begin();
 | 
						|
  std::string err;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    execplan::ObjectIDManager fObjectIDManager;
 | 
						|
 | 
						|
    while (col_iter != ridList.end())
 | 
						|
    {
 | 
						|
      roPair = *col_iter;
 | 
						|
 | 
						|
      if (roPair.objnum < 3000)
 | 
						|
      {
 | 
						|
        ++col_iter;
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
 | 
						|
      fObjectIDManager.returnOID(roPair.objnum);
 | 
						|
      ++col_iter;
 | 
						|
    }
 | 
						|
 | 
						|
    CalpontSystemCatalog::DictOID dictOID;
 | 
						|
    CalpontSystemCatalog::DictOIDList::const_iterator dict_iter = dictOIDList.begin();
 | 
						|
 | 
						|
    while (dict_iter != dictOIDList.end())
 | 
						|
    {
 | 
						|
      dictOID = *dict_iter;
 | 
						|
 | 
						|
      if (dictOID.dictOID < 3000)
 | 
						|
      {
 | 
						|
        ++dict_iter;
 | 
						|
        continue;
 | 
						|
      }
 | 
						|
 | 
						|
      fObjectIDManager.returnOID(dictOID.dictOID);
 | 
						|
      ++dict_iter;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    err = ex.what();
 | 
						|
    throw std::runtime_error(err);
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    err = "returnOIDs:Unknown exception caught";
 | 
						|
    throw std::runtime_error(err);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::findColumnData(uint32_t sessionID,
 | 
						|
                                         execplan::CalpontSystemCatalog::TableName& systableName,
 | 
						|
                                         const std::string& colName, DDLColumn& sysCol)
 | 
						|
{
 | 
						|
  ColumnList columns;
 | 
						|
  ColumnList::const_iterator column_iterator;
 | 
						|
  std::string err;
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    getColumnsForTable(sessionID, systableName.schema, systableName.table, columns);
 | 
						|
    column_iterator = columns.begin();
 | 
						|
 | 
						|
    while (column_iterator != columns.end())
 | 
						|
    {
 | 
						|
      sysCol = *column_iterator;
 | 
						|
      boost::to_lower(sysCol.tableColName.column);
 | 
						|
 | 
						|
      if (colName == sysCol.tableColName.column)
 | 
						|
      {
 | 
						|
        break;
 | 
						|
      }
 | 
						|
 | 
						|
      ++column_iterator;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    err = ex.what();
 | 
						|
    throw std::runtime_error(err);
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    err = "findColumnData:Unknown exception caught";
 | 
						|
    throw std::runtime_error(err);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::cleanString(string& s)
 | 
						|
{
 | 
						|
  string::size_type pos = s.find_first_not_of(" ");
 | 
						|
 | 
						|
  // stripe off space and ' or '' at beginning and end
 | 
						|
  if (pos < s.length())
 | 
						|
  {
 | 
						|
    s = s.substr(pos, s.length() - pos);
 | 
						|
 | 
						|
    if ((pos = s.find_last_of(" ")) < s.length())
 | 
						|
    {
 | 
						|
      s = s.substr(0, pos);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (s[0] == '\'')
 | 
						|
  {
 | 
						|
    s = s.substr(1, s.length() - 2);
 | 
						|
 | 
						|
    if (s[0] == '\'')
 | 
						|
      s = s.substr(1, s.length() - 2);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void DDLPackageProcessor::convertRidToColumn(uint64_t& rid, unsigned& dbRoot, unsigned& partition,
 | 
						|
                                             unsigned& segment, unsigned filesPerColumnPartition,
 | 
						|
                                             unsigned extentsPerSegmentFile, unsigned extentRows,
 | 
						|
                                             unsigned startDBRoot, unsigned dbrootCnt)
 | 
						|
{
 | 
						|
  partition = rid / (filesPerColumnPartition * extentsPerSegmentFile * extentRows);
 | 
						|
 | 
						|
  segment = (((rid % (filesPerColumnPartition * extentsPerSegmentFile * extentRows)) / extentRows)) %
 | 
						|
            filesPerColumnPartition;
 | 
						|
 | 
						|
  dbRoot = ((startDBRoot - 1 + segment) % dbrootCnt) + 1;
 | 
						|
 | 
						|
  // Calculate the relative rid for this segment file
 | 
						|
  uint64_t relRidInPartition = rid - ((uint64_t)partition * (uint64_t)filesPerColumnPartition *
 | 
						|
                                      (uint64_t)extentsPerSegmentFile * (uint64_t)extentRows);
 | 
						|
  idbassert(relRidInPartition <=
 | 
						|
            (uint64_t)filesPerColumnPartition * (uint64_t)extentsPerSegmentFile * (uint64_t)extentRows);
 | 
						|
  uint32_t numExtentsInThisPart = relRidInPartition / extentRows;
 | 
						|
  unsigned numExtentsInThisSegPart = numExtentsInThisPart / filesPerColumnPartition;
 | 
						|
  uint64_t relRidInThisExtent = relRidInPartition - numExtentsInThisPart * extentRows;
 | 
						|
  rid = relRidInThisExtent + numExtentsInThisSegPart * extentRows;
 | 
						|
}
 | 
						|
 | 
						|
int DDLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID)
 | 
						|
{
 | 
						|
  ByteStream bytestream;
 | 
						|
  bytestream << (ByteStream::byte)WE_SVR_ROLLBACK_BLOCKS;
 | 
						|
  bytestream << uniqueId;
 | 
						|
  bytestream << sessionID;
 | 
						|
  bytestream << (uint32_t)txnID.id;
 | 
						|
  uint32_t msgRecived = 0;
 | 
						|
  fWEClient->write_to_all(bytestream);
 | 
						|
  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
  bsIn.reset(new ByteStream());
 | 
						|
  int rc = 0;
 | 
						|
  ByteStream::byte tmp8;
 | 
						|
  std::string errorMsg;
 | 
						|
 | 
						|
  while (1)
 | 
						|
  {
 | 
						|
    if (msgRecived == fWEClient->getPmCount())
 | 
						|
      break;
 | 
						|
 | 
						|
    fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
    if (bsIn->length() == 0)  // read error
 | 
						|
    {
 | 
						|
      rc = NETWORK_ERROR;
 | 
						|
      fWEClient->removeQueue(uniqueId);
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      *bsIn >> tmp8;
 | 
						|
      rc = tmp8;
 | 
						|
 | 
						|
      if (rc != 0)
 | 
						|
      {
 | 
						|
        *bsIn >> errorMsg;
 | 
						|
        fWEClient->removeQueue(uniqueId);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else
 | 
						|
        msgRecived++;
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if ((msgRecived == fWEClient->getPmCount()) && (rc == 0))
 | 
						|
  {
 | 
						|
    bytestream.restart();
 | 
						|
    bytestream << (ByteStream::byte)WE_SVR_ROLLBACK_VERSION;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << sessionID;
 | 
						|
    bytestream << (uint32_t)txnID.id;
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
    bsIn.reset(new ByteStream());
 | 
						|
    msgRecived = 0;
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
      if (msgRecived == fWEClient->getPmCount())
 | 
						|
        break;
 | 
						|
 | 
						|
      fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
      if (bsIn->length() == 0)  // read error
 | 
						|
      {
 | 
						|
        fWEClient->removeQueue(uniqueId);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      else
 | 
						|
      {
 | 
						|
        *bsIn >> tmp8;
 | 
						|
        rc = tmp8;
 | 
						|
 | 
						|
        if (rc != 0)
 | 
						|
        {
 | 
						|
          *bsIn >> errorMsg;
 | 
						|
          fWEClient->removeQueue(uniqueId);
 | 
						|
          break;
 | 
						|
        }
 | 
						|
        else
 | 
						|
          msgRecived++;
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  return rc;
 | 
						|
}
 | 
						|
 | 
						|
int DDLPackageProcessor::commitTransaction(uint64_t /*uniqueId*/, BRM::TxnID txnID)
 | 
						|
{
 | 
						|
  int rc = fDbrm->vbCommit(txnID.id);
 | 
						|
  return rc;
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace ddlpackageprocessor
 |