You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			1335 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1335 lines
		
	
	
		
			44 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
 * Copyright (C) 2016 MariaDB Corporation.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
/***********************************************************************
 | 
						|
 *   $Id: commandpackageprocessor.cpp 9613 2013-06-12 15:44:24Z dhall $
 | 
						|
 *
 | 
						|
 *
 | 
						|
 ***********************************************************************/
 | 
						|
#include <ctime>
 | 
						|
#include <iostream>
 | 
						|
#include <sstream>
 | 
						|
#include <set>
 | 
						|
#include <vector>
 | 
						|
#include <boost/scoped_ptr.hpp>
 | 
						|
 | 
						|
#include "commandpackageprocessor.h"
 | 
						|
#include "messagelog.h"
 | 
						|
#include "dbrm.h"
 | 
						|
#include "sqllogger.h"
 | 
						|
#include "tablelockdata.h"
 | 
						|
#include "we_messages.h"
 | 
						|
#include "we_ddlcommandclient.h"
 | 
						|
#include "oamcache.h"
 | 
						|
#include "liboamcpp.h"
 | 
						|
#include "resourcemanager.h"
 | 
						|
#include "simplecolumn.h"
 | 
						|
#include "functioncolumn.h"
 | 
						|
#include "aggregatecolumn.h"
 | 
						|
#include "simplefilter.h"
 | 
						|
#include "constantcolumn.h"
 | 
						|
#include "pseudocolumn.h"
 | 
						|
#include "functor_str.h"
 | 
						|
 | 
						|
using namespace std;
 | 
						|
using namespace WriteEngine;
 | 
						|
using namespace dmlpackage;
 | 
						|
using namespace execplan;
 | 
						|
using namespace logging;
 | 
						|
using namespace boost;
 | 
						|
using namespace BRM;
 | 
						|
using namespace funcexp;
 | 
						|
 | 
						|
namespace dmlpackageprocessor
 | 
						|
{
 | 
						|
// Shorthand for a single entry (value_type) of CalpontSelectExecutionPlan::ColumnMap.
typedef CalpontSelectExecutionPlan::ColumnMap::value_type CMVT_;
 | 
						|
 | 
						|
// Tracks active cleartablelock commands by storing set of table lock IDs
 | 
						|
// Out-of-class definition of the set of uint64_t table lock IDs.
// NOTE(review): the matching static declaration is presumably in commandpackageprocessor.h - confirm.
/*static*/ std::set<uint64_t> CommandPackageProcessor::fActiveClearTableLockCmds;
 | 
						|
// NOTE(review): presumably serializes access to fActiveClearTableLockCmds - confirm against
// the code that takes this mutex (not visible in this part of the file).
/*static*/ boost::mutex CommandPackageProcessor::fActiveClearTableLockCmdMutex;
 | 
						|
 | 
						|
DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackageInternal(
 | 
						|
    dmlpackage::CalpontDMLPackage& cpackage)
 | 
						|
{
 | 
						|
  SUMMARY_INFO("CommandPackageProcessor::processPackage");
 | 
						|
 | 
						|
  DMLResult result;
 | 
						|
  result.result = NO_ERROR;
 | 
						|
 | 
						|
  VERBOSE_INFO("Processing Command DML Package...");
 | 
						|
  std::string stmt = cpackage.get_DMLStatement();
 | 
						|
  boost::algorithm::to_upper(stmt);
 | 
						|
  trim(stmt);
 | 
						|
  fSessionID = cpackage.get_SessionID();
 | 
						|
  BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
 | 
						|
  uint64_t uniqueId = 0;
 | 
						|
 | 
						|
  // Bug 5070. Added exception handling
 | 
						|
  try
 | 
						|
  {
 | 
						|
    uniqueId = fDbrm->getUnique64();
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    logging::Message::Args args;
 | 
						|
    logging::Message message(9);
 | 
						|
    args.add(ex.what());
 | 
						|
    message.format(args);
 | 
						|
    result.result = COMMAND_ERROR;
 | 
						|
    result.message = message;
 | 
						|
    fSessionManager.rolledback(txnid);
 | 
						|
    return result;
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    logging::Message::Args args;
 | 
						|
    logging::Message message(9);
 | 
						|
    args.add("Unknown error occurred while getting unique number.");
 | 
						|
    message.format(args);
 | 
						|
    result.result = COMMAND_ERROR;
 | 
						|
    result.message = message;
 | 
						|
    fSessionManager.rolledback(txnid);
 | 
						|
    return result;
 | 
						|
  }
 | 
						|
 | 
						|
  string errorMsg;
 | 
						|
  bool queRemoved = false;
 | 
						|
  logging::LoggingID lid(20);
 | 
						|
  logging::MessageLog ml(lid);
 | 
						|
  LoggingID logid(DMLLoggingId, fSessionID, txnid.id);
 | 
						|
  logging::Message::Args args1;
 | 
						|
  logging::Message msg(1);
 | 
						|
  Logger logger(logid.fSubsysID);
 | 
						|
 | 
						|
  if (stmt != "CLEANUP")
 | 
						|
  {
 | 
						|
    args1.add("Start SQL statement: ");
 | 
						|
    args1.add(stmt);
 | 
						|
    msg.format(args1);
 | 
						|
    logger.logMessage(LOG_TYPE_DEBUG, msg, logid);
 | 
						|
  }
 | 
						|
 | 
						|
  // fWEClient->addQueue(uniqueId);
 | 
						|
  try
 | 
						|
  {
 | 
						|
    // set-up the transaction
 | 
						|
    if ((stmt == "COMMIT") || (stmt == "ROLLBACK"))
 | 
						|
    {
 | 
						|
      // SQLLogger sqlLogger(stmt, DMLLoggingId, cpackage.get_SessionID(), txnid.id);
 | 
						|
 | 
						|
      if ((txnid.valid))
 | 
						|
      {
 | 
						|
        vector<LBID_t> lbidList;
 | 
						|
        fDbrm->getUncommittedExtentLBIDs(static_cast<VER_t>(txnid.id), lbidList);
 | 
						|
        bool cpInvalidated = false;
 | 
						|
 | 
						|
        // cout << "get a valid txnid " << txnid.id << " and stmt is " << stmt << " and isBachinsert is " <<
 | 
						|
        // cpackage.get_isBatchInsert() << endl;
 | 
						|
        if ((stmt == "COMMIT") && (cpackage.get_isBatchInsert()))
 | 
						|
        {
 | 
						|
          // update syscolumn for the next value
 | 
						|
          boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | 
						|
              CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
 | 
						|
          CalpontSystemCatalog::TableName tableName;
 | 
						|
          tableName = systemCatalogPtr->tableName(cpackage.getTableOid());
 | 
						|
 | 
						|
          try
 | 
						|
          {
 | 
						|
            uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
 | 
						|
 | 
						|
            if (nextVal != AUTOINCR_SATURATED)  // need to update syscolumn
 | 
						|
            {
 | 
						|
              // get autoincrement column oid
 | 
						|
              int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
 | 
						|
              // get the current nextVal from controller
 | 
						|
              scoped_ptr<DBRM> aDbrm(new DBRM());
 | 
						|
              uint64_t nextValInController = 0;
 | 
						|
              bool validNextVal = false;
 | 
						|
              aDbrm->getAILock(columnOid);
 | 
						|
              nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);  // in case it has changed
 | 
						|
              validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);
 | 
						|
 | 
						|
              if ((validNextVal) && (nextValInController > nextVal))
 | 
						|
              {
 | 
						|
                fWEClient->removeQueue(uniqueId);
 | 
						|
                queRemoved = true;
 | 
						|
                WE_DDLCommandClient ddlClient;
 | 
						|
                uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController);
 | 
						|
                //@bug 5894. Need release lock.
 | 
						|
                aDbrm->releaseAILock(columnOid);
 | 
						|
 | 
						|
                if (rc != 0)
 | 
						|
                  throw std::runtime_error("Error in UpdateSyscolumnNextval");
 | 
						|
              }
 | 
						|
              else
 | 
						|
                aDbrm->releaseAILock(columnOid);
 | 
						|
            }
 | 
						|
          }
 | 
						|
          catch (std::exception& ex)
 | 
						|
          {
 | 
						|
            // Rollback transaction
 | 
						|
            rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
 | 
						|
            fSessionManager.rolledback(txnid);
 | 
						|
            throw std::runtime_error(ex.what());
 | 
						|
          }
 | 
						|
 | 
						|
          // systemCatalogPtr->updateColinfoCache(nextValMap);
 | 
						|
          int weRc = 0;
 | 
						|
 | 
						|
          if (cpackage.get_isAutocommitOn())
 | 
						|
          {
 | 
						|
            weRc = commitBatchAutoOnTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);
 | 
						|
 | 
						|
            if (weRc != 0)
 | 
						|
              BRM::errString(weRc, errorMsg);
 | 
						|
 | 
						|
            cpInvalidated = true;
 | 
						|
          }
 | 
						|
          else
 | 
						|
          {
 | 
						|
            weRc = fDbrm->vbCommit(txnid.id);
 | 
						|
 | 
						|
            if (weRc != 0)
 | 
						|
              BRM::errString(weRc, errorMsg);
 | 
						|
 | 
						|
            // weRc = commitBatchAutoOffTransaction(uniqueId, txnid, cpackage.getTableOid(), errorMsg);
 | 
						|
          }
 | 
						|
 | 
						|
          if (weRc != 0)
 | 
						|
          {
 | 
						|
            throw std::runtime_error(errorMsg);
 | 
						|
          }
 | 
						|
 | 
						|
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");
 | 
						|
          fSessionManager.committed(txnid);
 | 
						|
          // cout << "releasing  transaction id for batchinsert" <<  txnid.id << endl;
 | 
						|
        }
 | 
						|
        else if (stmt == "COMMIT")
 | 
						|
        {
 | 
						|
          // cout << "success in commitTransaction" << endl;
 | 
						|
          // update syscolumn for the next value
 | 
						|
          boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | 
						|
              CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
 | 
						|
          CalpontSystemCatalog::TableName tableName;
 | 
						|
          uint32_t tableOid = cpackage.getTableOid();
 | 
						|
          std::vector<TableLockInfo> tableLocks = fDbrm->getAllTableLocks();
 | 
						|
 | 
						|
          if (tableOid == 0)  // special case: transaction commit for autocommit off and not following a dml
 | 
						|
                              // statement immediately
 | 
						|
          {
 | 
						|
            TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
 | 
						|
            TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
 | 
						|
            TablelockData::OIDTablelock::iterator iter;
 | 
						|
 | 
						|
            if (!tablelockMap.empty())
 | 
						|
            {
 | 
						|
              for (unsigned k = 0; k < tableLocks.size(); k++)
 | 
						|
              {
 | 
						|
                iter = tablelockMap.find(tableLocks[k].tableOID);
 | 
						|
 | 
						|
                if (iter != tablelockMap.end())
 | 
						|
                {
 | 
						|
                  tableName = systemCatalogPtr->tableName(tableLocks[k].tableOID);
 | 
						|
 | 
						|
                  try
 | 
						|
                  {
 | 
						|
                    uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
 | 
						|
 | 
						|
                    if (nextVal != AUTOINCR_SATURATED)  // neet to update syscolumn
 | 
						|
                    {
 | 
						|
                      // get autoincrement column oid
 | 
						|
                      int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
 | 
						|
                      // get the current nextVal from controller
 | 
						|
                      scoped_ptr<DBRM> aDbrm(new DBRM());
 | 
						|
                      uint64_t nextValInController = 0;
 | 
						|
                      bool validNextVal = false;
 | 
						|
                      aDbrm->getAILock(columnOid);
 | 
						|
                      nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);  // in case it has changed
 | 
						|
                      validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);
 | 
						|
 | 
						|
                      if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
 | 
						|
                      {
 | 
						|
                        fWEClient->removeQueue(uniqueId);
 | 
						|
 | 
						|
                        queRemoved = true;
 | 
						|
                        WE_DDLCommandClient ddlClient;
 | 
						|
                        uint8_t rc =
 | 
						|
                            ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
 | 
						|
                        aDbrm->releaseAILock(columnOid);
 | 
						|
 | 
						|
                        if (rc != 0)
 | 
						|
                        {
 | 
						|
                          // for now
 | 
						|
                          fSessionManager.committed(txnid);
 | 
						|
                          throw std::runtime_error("Error in UpdateSyscolumnNextval");
 | 
						|
                        }
 | 
						|
                      }
 | 
						|
                      else
 | 
						|
                        aDbrm->releaseAILock(columnOid);
 | 
						|
                    }
 | 
						|
                  }
 | 
						|
                  catch (std::exception& ex)
 | 
						|
                  {
 | 
						|
                    // Rollback transaction, release tablelock
 | 
						|
                    rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
 | 
						|
                    fDbrm->releaseTableLock(iter->second);
 | 
						|
                    fSessionManager.rolledback(txnid);
 | 
						|
                    throw std::runtime_error(ex.what());
 | 
						|
                  }
 | 
						|
                }
 | 
						|
              }
 | 
						|
            }
 | 
						|
          }
 | 
						|
          else
 | 
						|
          {
 | 
						|
            if (tableOid >= 3000)
 | 
						|
            {
 | 
						|
              tableName = systemCatalogPtr->tableName(tableOid);
 | 
						|
 | 
						|
              try
 | 
						|
              {
 | 
						|
                uint64_t nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);
 | 
						|
 | 
						|
                if (nextVal != AUTOINCR_SATURATED)  // need to update syscolumn
 | 
						|
                {
 | 
						|
                  // get autoincrement column oid
 | 
						|
                  int32_t columnOid = systemCatalogPtr->autoColumOid(tableName);
 | 
						|
                  // get the current nextVal from controller
 | 
						|
                  scoped_ptr<DBRM> aDbrm(new DBRM());
 | 
						|
                  uint64_t nextValInController = 0;
 | 
						|
                  bool validNextVal = false;
 | 
						|
                  aDbrm->getAILock(columnOid);
 | 
						|
                  nextVal = systemCatalogPtr->nextAutoIncrValue(tableName);  // in case it has changed
 | 
						|
                  validNextVal = aDbrm->getAIValue(columnOid, &nextValInController);
 | 
						|
 | 
						|
                  if ((validNextVal) && (nextValInController > (uint64_t)nextVal))
 | 
						|
                  {
 | 
						|
                    fWEClient->removeQueue(uniqueId);
 | 
						|
 | 
						|
                    queRemoved = true;
 | 
						|
                    WE_DDLCommandClient ddlClient;
 | 
						|
                    uint8_t rc = ddlClient.UpdateSyscolumnNextval(columnOid, nextValInController, fSessionID);
 | 
						|
                    aDbrm->releaseAILock(columnOid);
 | 
						|
 | 
						|
                    if (rc != 0)
 | 
						|
                    {
 | 
						|
                      // for now
 | 
						|
                      fSessionManager.committed(txnid);
 | 
						|
                      throw std::runtime_error("Error in UpdateSyscolumnNextval");
 | 
						|
                    }
 | 
						|
                  }
 | 
						|
                  else
 | 
						|
                    aDbrm->releaseAILock(columnOid);
 | 
						|
                }
 | 
						|
              }
 | 
						|
              catch (std::exception& ex)
 | 
						|
              {
 | 
						|
                // Rollback transaction, release tablelock
 | 
						|
                rollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
 | 
						|
 | 
						|
                for (unsigned k = 0; k < tableLocks.size(); k++)
 | 
						|
                {
 | 
						|
                  if (tableLocks[k].tableOID == tableOid)
 | 
						|
                  {
 | 
						|
                    try
 | 
						|
                    {
 | 
						|
                      fDbrm->releaseTableLock(tableLocks[k].id);
 | 
						|
                    }
 | 
						|
                    catch (std::exception&)
 | 
						|
                    {
 | 
						|
                    }
 | 
						|
                  }
 | 
						|
                }
 | 
						|
 | 
						|
                fSessionManager.rolledback(txnid);
 | 
						|
                throw std::runtime_error(ex.what());
 | 
						|
              }
 | 
						|
            }
 | 
						|
          }
 | 
						|
 | 
						|
          int weRc = commitTransaction(uniqueId, txnid);
 | 
						|
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "COMMIT;");
 | 
						|
 | 
						|
          if (weRc != WriteEngine::NO_ERROR)
 | 
						|
          {
 | 
						|
            // cout << "Got an error in commitTransaction" << endl;
 | 
						|
            WErrorCodes ec;
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "COMMIT failed: " << ec.errorString(weRc);
 | 
						|
            throw std::runtime_error(oss.str());
 | 
						|
          }
 | 
						|
 | 
						|
          fSessionManager.committed(txnid);
 | 
						|
          // cout << "commit releasing  transaction id " <<  txnid.id << endl;
 | 
						|
        }
 | 
						|
        else if ((stmt == "ROLLBACK") && (cpackage.get_isBatchInsert()))
 | 
						|
        {
 | 
						|
          int weRc = 0;
 | 
						|
 | 
						|
          // version rollback, Bulkrollback
 | 
						|
          weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
 | 
						|
 | 
						|
          if (weRc == 0)
 | 
						|
          {
 | 
						|
            // MCOL-5021
 | 
						|
            fDbrm->addToLBIDList(fSessionID, lbidList);
 | 
						|
 | 
						|
            //@Bug 4560 invalidate cp first as bulkrollback will truncate the newly added lbids.
 | 
						|
            fDbrm->invalidateUncommittedExtentLBIDs(0, true, &lbidList);
 | 
						|
            cpInvalidated = true;
 | 
						|
            weRc =
 | 
						|
                rollBackBatchAutoOnTransaction(uniqueId, txnid, fSessionID, cpackage.getTableOid(), errorMsg);
 | 
						|
          }
 | 
						|
          else
 | 
						|
          {
 | 
						|
            throw std::runtime_error(errorMsg);
 | 
						|
          }
 | 
						|
 | 
						|
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
 | 
						|
 | 
						|
          if (weRc != 0)
 | 
						|
          {
 | 
						|
            //@Bug 4524. Don't set to readonly. Just error out.
 | 
						|
            WErrorCodes ec;
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "ROLLBACK batch insert failed due to: "
 | 
						|
                << "(" << weRc << ")" << ec.errorString(weRc);
 | 
						|
            // Log to error log
 | 
						|
            logging::Message::Args args1;
 | 
						|
            logging::Message message1(2);
 | 
						|
            args1.add(oss.str());
 | 
						|
            message1.format(args1);
 | 
						|
            ml.logErrorMessage(message1);
 | 
						|
            throw std::runtime_error(oss.str());
 | 
						|
          }
 | 
						|
 | 
						|
          fSessionManager.rolledback(txnid);
 | 
						|
          // cout << "batch rollback releasing  transaction id " <<  txnid.id << endl;
 | 
						|
        }
 | 
						|
        else if (stmt == "ROLLBACK")
 | 
						|
        {
 | 
						|
          std::string errorMsg("");
 | 
						|
          logging::logCommand(cpackage.get_SessionID(), txnid.id, "ROLLBACK;");
 | 
						|
          int weRc = tryToRollBackTransaction(uniqueId, txnid, fSessionID, errorMsg);
 | 
						|
 | 
						|
          if (weRc != 0)
 | 
						|
          {
 | 
						|
            // cout << "Rollback failed" << endl;
 | 
						|
            //@Bug 4524. Don't set to readonly. Just error out.
 | 
						|
            ostringstream oss;
 | 
						|
            oss << "ROLLBACK failed due to: " << errorMsg;
 | 
						|
            // Log to error log
 | 
						|
            logging::Message::Args args2;
 | 
						|
            logging::Message message2(2);
 | 
						|
            args2.add(oss.str());
 | 
						|
            message2.format(args2);
 | 
						|
            ml.logErrorMessage(message2);
 | 
						|
            throw std::runtime_error(oss.str());
 | 
						|
          }
 | 
						|
 | 
						|
          fSessionManager.rolledback(txnid);
 | 
						|
          // cout << "Rollback releasing  transaction id " <<  txnid.id << endl;
 | 
						|
        }
 | 
						|
 | 
						|
        if (!cpInvalidated)
 | 
						|
        {
 | 
						|
          // MCOL-5021
 | 
						|
          if (stmt == "ROLLBACK")
 | 
						|
          {
 | 
						|
            fDbrm->addToLBIDList(fSessionID, lbidList);
 | 
						|
          }
 | 
						|
 | 
						|
          fDbrm->invalidateUncommittedExtentLBIDs(0, stmt == "ROLLBACK", &lbidList);
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    else if (stmt == "CLEANUP")
 | 
						|
    {
 | 
						|
      execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(cpackage.get_SessionID());
 | 
						|
      execplan::CalpontSystemCatalog::removeCalpontSystemCatalog(cpackage.get_SessionID() | 0x80000000);
 | 
						|
    }
 | 
						|
    else if (stmt == "VIEWTABLELOCK")
 | 
						|
    {
 | 
						|
      viewTableLock(cpackage, result);
 | 
						|
    }
 | 
						|
    else if (stmt == "CLEARTABLELOCK")
 | 
						|
    {
 | 
						|
      clearTableLock(uniqueId, cpackage, result);
 | 
						|
    }
 | 
						|
    else if (stmt == "ANALYZEPARTITIONBLOAT")
 | 
						|
    {
 | 
						|
      analyzePartitionBloat(cpackage, result);
 | 
						|
    }
 | 
						|
    else if (!cpackage.get_Logging())
 | 
						|
    {
 | 
						|
      BRM::TxnID txnid = fSessionManager.getTxnID(cpackage.get_SessionID());
 | 
						|
      logging::logDML(cpackage.get_SessionID(), txnid.id, cpackage.get_DMLStatement() + ";",
 | 
						|
                      cpackage.get_SchemaName());
 | 
						|
      SQLLogger sqlLogger(cpackage.get_DMLStatement(), DMLLoggingId, fSessionID, txnid.id);
 | 
						|
      // cout << "commandpackageprocessor Logging " << cpackage.get_DMLStatement()+ ";" << endl;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      std::string err = "Unknown command.";
 | 
						|
      SUMMARY_INFO(err);
 | 
						|
      throw std::runtime_error(err);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (logging::IDBExcept& noTable)  //@Bug 2606 catch no table found exception
 | 
						|
  {
 | 
						|
    cerr << "CommandPackageProcessor::processPackage: " << noTable.what() << endl;
 | 
						|
 | 
						|
    result.result = COMMAND_ERROR;
 | 
						|
    result.message = Message(noTable.what());
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    if (checkPPLostConnection(ex))
 | 
						|
    {
 | 
						|
      result.result = PP_LOST_CONNECTION;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
      cerr << "CommandPackageProcessor::processPackage: " << ex.what() << endl;
 | 
						|
 | 
						|
      logging::Message::Args args;
 | 
						|
      logging::Message message(1);
 | 
						|
      args.add(ex.what());
 | 
						|
      args.add("");
 | 
						|
      args.add("");
 | 
						|
      message.format(args);
 | 
						|
 | 
						|
      result.result = COMMAND_ERROR;
 | 
						|
      result.message = message;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (...)
 | 
						|
  {
 | 
						|
    cerr << "CommandPackageProcessor::processPackage: caught unknown exception!" << endl;
 | 
						|
    logging::Message::Args args;
 | 
						|
    logging::Message message(1);
 | 
						|
    args.add("Command Failed: ");
 | 
						|
    args.add("encountered unkown exception");
 | 
						|
    args.add("");
 | 
						|
    args.add("");
 | 
						|
    message.format(args);
 | 
						|
 | 
						|
    result.result = COMMAND_ERROR;
 | 
						|
    result.message = message;
 | 
						|
  }
 | 
						|
 | 
						|
  if (!queRemoved)
 | 
						|
    fWEClient->removeQueue(uniqueId);
 | 
						|
 | 
						|
  // release table lock
 | 
						|
  if ((result.result == NO_ERROR) && ((stmt == "COMMIT") || (stmt == "ROLLBACK")))
 | 
						|
  {
 | 
						|
    TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
 | 
						|
    TablelockData::OIDTablelock tablelockMap = tablelockData->getOidTablelockMap();
 | 
						|
    bool lockReleased = true;
 | 
						|
 | 
						|
    if (!tablelockMap.empty())
 | 
						|
    {
 | 
						|
      TablelockData::OIDTablelock::iterator it = tablelockMap.begin();
 | 
						|
 | 
						|
      while (it != tablelockMap.end())
 | 
						|
      {
 | 
						|
        try
 | 
						|
        {
 | 
						|
          lockReleased = fDbrm->releaseTableLock(it->second);
 | 
						|
          // cout << "releasing tablelock " << it->second << endl;
 | 
						|
        }
 | 
						|
        catch (std::exception& ex)
 | 
						|
        {
 | 
						|
          logging::Message::Args args;
 | 
						|
          logging::Message message(1);
 | 
						|
          args.add(ex.what());
 | 
						|
          args.add("");
 | 
						|
          args.add("");
 | 
						|
          message.format(args);
 | 
						|
 | 
						|
          result.result = COMMAND_ERROR;
 | 
						|
          result.message = message;
 | 
						|
        }
 | 
						|
 | 
						|
        if (!lockReleased)  // log an error
 | 
						|
        {
 | 
						|
          ostringstream os;
 | 
						|
          os << "tablelock for table oid " << it->first << " is not found";
 | 
						|
          logging::Message::Args args;
 | 
						|
          logging::Message message(1);
 | 
						|
          args.add(os.str());
 | 
						|
          args.add("");
 | 
						|
          args.add("");
 | 
						|
          message.format(args);
 | 
						|
          logging::LoggingID lid(21);
 | 
						|
          logging::MessageLog ml(lid);
 | 
						|
 | 
						|
          ml.logErrorMessage(message);
 | 
						|
        }
 | 
						|
 | 
						|
        // cout << "tablelock " << it->second << " is released" << endl;
 | 
						|
        it++;
 | 
						|
      }
 | 
						|
 | 
						|
      //@Bug 3557. Clean tablelock cache after commit/rollback.
 | 
						|
      TablelockData::removeTablelockData(cpackage.get_SessionID());
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  VERBOSE_INFO("Finished processing Command DML Package");
 | 
						|
  // LoggingID logid( DMLLoggingId, fSessionID, txnid.id);
 | 
						|
  logging::Message::Args args2;
 | 
						|
  logging::Message msg1(1);
 | 
						|
 | 
						|
  if (stmt != "CLEANUP")
 | 
						|
  {
 | 
						|
    args2.add("End SQL statement");
 | 
						|
    msg1.format(args2);
 | 
						|
    // Logger logger(logid.fSubsysID);
 | 
						|
    logger.logMessage(LOG_TYPE_DEBUG, msg1, logid);
 | 
						|
  }
 | 
						|
 | 
						|
  return result;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Process viewtablelock command; return table lock information for the
 | 
						|
// specified table.
 | 
						|
// This function closely resembles printTableLocks in viewtablelock.cpp.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void CommandPackageProcessor::viewTableLock(const dmlpackage::CalpontDMLPackage& cpackage,
                                            DMLPackageProcessor::DMLResult& result)
{
  // Builds a text report of the table locks whose tableOID matches the table
  // named by cpackage (schema + table name) and stores it in
  // result.tableLockInfo.  If no lock matches, a "not locked" message is
  // stored instead.

  // Initialize System Catalog object used to map the table name to its OID
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = cpackage.get_SchemaName();
  tableName.table = cpackage.get_TableName();
  execplan::CalpontSystemCatalog::ROPair roPair;

  roPair = systemCatalogPtr->tableRID(tableName);

  // Get the list of all table locks; the entries that belong to the
  // requested table are selected by OID in the second pass below.
  std::vector<BRM::TableLockInfo> tableLocks;
  tableLocks = fDbrm->getAllTableLocks();

  // Make preliminary pass through the table locks in order to determine our
  // output column widths based on the data.  Min column widths are based on
  // the width of the column heading (except for the 'state' column).
  // Note that the widths are derived from every lock in the list, not only
  // from the locks of the requested table.
  uint64_t maxLockID = 0;
  uint32_t maxPID = 0;
  int32_t maxSessionID = 0;
  int32_t minSessionID = 0;
  int32_t maxTxnID = 0;

  unsigned int lockIDColumnWidth = 6;       // "LockID"
  unsigned int ownerColumnWidth = 7;        // "Process"
  unsigned int pidColumnWidth = 3;          // "PID"
  unsigned int sessionIDColumnWidth = 7;    // "Session"
  unsigned int txnIDColumnWidth = 3;        // "Txn"
  unsigned int createTimeColumnWidth = 12;  // "CreationTime"
  unsigned int pmColumnWidth = 7;           // "DBRoots"

  // Per-lock formatted strings, indexed in parallel with tableLocks so the
  // second pass can reuse them instead of formatting everything again.
  std::vector<std::string> createTimes;
  std::vector<std::string> dbRootLists;
  createTimes.reserve(tableLocks.size());
  dbRootLists.reserve(tableLocks.size());
  char cTimeBuffer[1024];

  for (unsigned idx = 0; idx < tableLocks.size(); idx++)
  {
    const BRM::TableLockInfo& lockEntry = tableLocks[idx];

    if (lockEntry.id > maxLockID)
      maxLockID = lockEntry.id;

    if (lockEntry.ownerName.length() > ownerColumnWidth)
      ownerColumnWidth = lockEntry.ownerName.length();

    if (lockEntry.ownerPID > maxPID)
      maxPID = lockEntry.ownerPID;

    if (lockEntry.ownerSessionID > maxSessionID)
      maxSessionID = lockEntry.ownerSessionID;

    if (lockEntry.ownerSessionID < minSessionID)
      minSessionID = lockEntry.ownerSessionID;

    if (lockEntry.ownerTxnID > maxTxnID)
      maxTxnID = lockEntry.ownerTxnID;

    // ctime_r() returns a null pointer on failure and then leaves the buffer
    // unspecified, so only read the buffer after a successful conversion.
    // A failed conversion is reported as an empty creation time.
    std::string cTimeStr;

    if (ctime_r(&lockEntry.creationTime, cTimeBuffer) != NULL)
    {
      cTimeStr = cTimeBuffer;

      // strip trailing '\n' (only if it is really there)
      if (!cTimeStr.empty() && cTimeStr[cTimeStr.length() - 1] == '\n')
        cTimeStr.erase(cTimeStr.length() - 1);
    }

    if (cTimeStr.length() > createTimeColumnWidth)
      createTimeColumnWidth = cTimeStr.length();

    createTimes.push_back(cTimeStr);

    // Comma separated list of the lock's DBRoots
    std::ostringstream pms;  // It is dbroots now

    for (unsigned k = 0; k < lockEntry.dbrootList.size(); k++)
    {
      if (k > 0)
        pms << ',';

      pms << lockEntry.dbrootList[k];
    }

    const std::string dbRootStr(pms.str());

    if (dbRootStr.length() > pmColumnWidth)
      pmColumnWidth = dbRootStr.length();

    dbRootLists.push_back(dbRootStr);
  }

  // Each column gets 2 extra characters of padding
  ownerColumnWidth += 2;
  pmColumnWidth += 2;
  createTimeColumnWidth += 2;

  std::ostringstream idString;
  idString << maxLockID;

  if (idString.str().length() > lockIDColumnWidth)
    lockIDColumnWidth = idString.str().length();

  lockIDColumnWidth += 2;

  std::ostringstream pidString;
  pidString << maxPID;

  if (pidString.str().length() > pidColumnWidth)
    pidColumnWidth = pidString.str().length();

  pidColumnWidth += 2;

  // Negative session IDs are displayed as "BulkLoad", so the session column
  // must be wide enough for that text when any negative ID is present.
  const std::string sessionNoneStr("BulkLoad");
  std::ostringstream sessionString;
  sessionString << maxSessionID;

  if (sessionString.str().length() > sessionIDColumnWidth)
    sessionIDColumnWidth = sessionString.str().length();

  if ((minSessionID < 0) && (sessionNoneStr.length() > sessionIDColumnWidth))
    sessionIDColumnWidth = sessionNoneStr.length();

  sessionIDColumnWidth += 2;

  const std::string txnNoneStr("n/a");
  std::ostringstream txnString;
  txnString << maxTxnID;

  if (txnString.str().length() > txnIDColumnWidth)
    txnIDColumnWidth = txnString.str().length();

  txnIDColumnWidth += 2;

  // Make second pass through the table locks to build our result.
  // Keep in mind the same table could have more than 1 lock
  // (on different PMs), so we don't exit loop after "first" match.
  bool found = false;
  std::ostringstream os;

  for (unsigned idx = 0; idx < tableLocks.size(); idx++)
  {
    const BRM::TableLockInfo& lockEntry = tableLocks[idx];

    if (roPair.objnum == (CalpontSystemCatalog::OID)lockEntry.tableOID)
    {
      if (found)  // write newline between lines
      {
        os << '\n';
      }
      else  // write the column headers before the first entry
      {
        // NOTE(review): the header row is not prefixed with the two spaces
        // that every data row gets below -- confirm this is intended.
        os.setf(std::ios::left, std::ios::adjustfield);
        os << std::setw(lockIDColumnWidth) << "LockID" << std::setw(ownerColumnWidth) << "Process"
           << std::setw(pidColumnWidth) << "PID" << std::setw(sessionIDColumnWidth) << "Session"
           << std::setw(txnIDColumnWidth) << "Txn" << std::setw(createTimeColumnWidth) << "CreationTime"
           << std::setw(9) << "State" << std::setw(pmColumnWidth) << "DBRoots" << '\n';
      }

      os << "  " << std::setw(lockIDColumnWidth) << lockEntry.id << std::setw(ownerColumnWidth)
         << lockEntry.ownerName << std::setw(pidColumnWidth) << lockEntry.ownerPID;

      // Log session ID, or "BulkLoad" if session is negative
      if (lockEntry.ownerSessionID < 0)
        os << std::setw(sessionIDColumnWidth) << sessionNoneStr;
      else
        os << std::setw(sessionIDColumnWidth) << lockEntry.ownerSessionID;

      // Log txn ID, or "n/a" if txn is negative
      if (lockEntry.ownerTxnID < 0)
        os << std::setw(txnIDColumnWidth) << txnNoneStr;
      else
        os << std::setw(txnIDColumnWidth) << lockEntry.ownerTxnID;

      // Any state other than LOADING is reported as "CLEANUP"
      os << std::setw(createTimeColumnWidth) << createTimes[idx] << std::setw(9)
         << ((lockEntry.state == BRM::LOADING) ? "LOADING" : "CLEANUP") << std::setw(pmColumnWidth)
         << dbRootLists[idx];

      found = true;
    }  // end of displaying a table lock match
  }  // end of loop through all table locks

  if (!found)
  {
    os << " Table " << tableName.schema << "." << tableName.table << " is not locked by any process.";
  }

  result.tableLockInfo = os.str();
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Process cleartablelock command; execute bulk rollback and release table lock
 | 
						|
// for the specified table.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void CommandPackageProcessor::clearTableLock(uint64_t uniqueId, const dmlpackage::CalpontDMLPackage& cpackage,
                                             DMLPackageProcessor::DMLResult& result)
{
  // uniqueId - id used for the fWEClient queue and placed in each request
  // cpackage - supplies schema/table name; its SQL statement attribute
  //            carries the table lock ID to clear
  // result   - result.tableLockInfo receives the success (optionally with
  //            warnings) or failure text
  // Exceptions derived from std::exception raised while clearing the lock
  // are caught here and reported through result.tableLockInfo.
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = cpackage.get_SchemaName();
  tableName.table = cpackage.get_TableName();

  // Get the Table lock ID that is passed in the SQL statement attribute.
  // This is a kludge we may want to consider changing.  Could add a table
  // lock ID attribute to the CalpontDMLPackage object.
  // The ID is initialized because a failed extraction (for example an empty
  // statement string) may leave the variable untouched; without the
  // initializer an indeterminate value would be logged and used below.
  std::istringstream lockIDString(cpackage.get_SQLStatement());
  uint64_t tableLockID = 0;
  lockIDString >> tableLockID;

  //----------------------------------------------------- start of syslog code
  //
  // Log initiation of cleartablelock to syslog
  //
  const std::string APPLNAME("cleartablelock SQL cmd");
  const int SUBSYSTEM_ID = 21;  // dmlpackageproc
  const int INIT_MSG_NUM = logging::M0088;
  logging::Message::Args msgArgs;
  logging::Message logMsg1(INIT_MSG_NUM);
  msgArgs.add(APPLNAME);
  msgArgs.add(tableName.toString());
  msgArgs.add(tableLockID);
  logMsg1.format(msgArgs);
  logging::LoggingID lid(SUBSYSTEM_ID);
  logging::MessageLog ml(lid);
  ml.logInfoMessage(logMsg1);
  //------------------------------------------------------- end of syslog code

  boost::shared_ptr<messageqcpp::ByteStream> bsIn;
  messageqcpp::ByteStream bsOut;
  bool lockGrabbed = false;         // true once we are registered as an active cleartablelock cmd
  bool bErrFlag = false;            // set only by the catch handler below
  bool bRemoveMetaErrFlag = false;  // a PM reported an error or did not answer (warning only)
  std::ostringstream combinedErrMsg;

  try
  {
    // Make sure BRM is in READ-WRITE state before starting
    int brmRc = fDbrm->isReadWrite();

    if (brmRc != BRM::ERR_OK)
    {
      std::string brmErrMsg;
      BRM::errString(brmRc, brmErrMsg);
      std::ostringstream oss;
      oss << "Failed BRM status check: " << brmErrMsg;
      throw std::runtime_error(oss.str());
    }

    // Throws if the lock does not exist or cannot be taken over
    BRM::TableLockInfo lockInfo;
    establishTableLockToClear(tableLockID, lockInfo);
    lockGrabbed = true;

    std::set<int> pmSet;

    // Construct relevant list of PMs based on the DBRoots associated
    // with the tableLock.
    for (unsigned int k = 0; k < lockInfo.dbrootList.size(); k++)
    {
      if (!oamcache()->isOffline(lockInfo.dbrootList[k]))
      {
        int pm = oamcache()->getOwnerPM(lockInfo.dbrootList[k]);
        pmSet.insert(pm);
      }
      else
      {
        std::ostringstream oss;
        oss << "DBRoot " << lockInfo.dbrootList[k] << " does not map to a PM.  Cannot perform rollback";
        throw std::runtime_error(oss.str());
      }
    }

    // De-duplicated PM ids, in ascending order
    std::vector<int> pmList(pmSet.begin(), pmSet.end());

    std::cout << "cleartablelock rollback for table lock " << tableLockID << " being forwarded to PM(s): ";

    for (unsigned int k = 0; k < pmList.size(); k++)
    {
      if (k > 0)
        std::cout << ", ";

      std::cout << pmList[k];
    }

    std::cout << std::endl;

    // Perform bulk rollback if state is in LOADING state
    if (lockInfo.state == BRM::LOADING)
    {
      fWEClient->addQueue(uniqueId);

      //------------------------------------------------------------------
      // Send rollback msg to the writeengine server on each PM in pmList,
      // i.e. the PMs that oamcache() reports as owners of the DBRoots
      // recorded in the table lock.
      //------------------------------------------------------------------
      //			std::cout << "cleartablelock rollback: tableLock-" << tableLockID <<
      //				": uniqueID-" << uniqueId             <<
      //				": oid-"      << lockInfo.tableOID    <<
      //				"; name-"     << tableName.toString() <<
      //				"; app-"      << APPLNAME             << std::endl;

      bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK;
      bsOut << uniqueId;
      bsOut << tableLockID;
      bsOut << lockInfo.tableOID;
      bsOut << tableName.toString();
      bsOut << APPLNAME;

      for (unsigned j = 0; j < pmList.size(); j++)
      {
        fWEClient->write(bsOut, pmList[j]);
      }

      // Wait for "all" the responses, and accumulate any/all errors.
      // One response is read per request that was sent.
      for (unsigned int pmMsgCnt = 0; pmMsgCnt < pmList.size(); pmMsgCnt++)
      {
        std::string rollbackErrMsg;
        bsIn.reset(new messageqcpp::ByteStream());
        fWEClient->read(uniqueId, bsIn);

        if (bsIn->length() == 0)
        {
          // An empty message is treated as a network error
          bRemoveMetaErrFlag = true;

          if (combinedErrMsg.str().length() > 0)
            combinedErrMsg << std::endl;

          combinedErrMsg << "Network error, PM rollback; ";
        }
        else
        {
          messageqcpp::ByteStream::byte rc;
          uint16_t pmNum;
          *bsIn >> rc;
          *bsIn >> rollbackErrMsg;
          *bsIn >> pmNum;

          //					std::cout << "cleartablelock rollback response from PM"<<
          //						pmNum << "; rc-" << (int)rc <<
          //						"; errMsg: {" << rollbackErrMsg << '}' << std::endl;

          if (rc != 0)
          {
            bRemoveMetaErrFlag = true;

            if (combinedErrMsg.str().empty())
              combinedErrMsg << "Rollback error; ";
            else
              combinedErrMsg << std::endl;

            combinedErrMsg << "[PM" << pmNum << "] " << rollbackErrMsg;
          }
        }
      }  // end of loop to process all responses to bulk rollback

      // If no errors so far, then change state to CLEANUP state.
      // We ignore the return stateChange flag.
      // NOTE: bErrFlag is only set in the catch handler, so it is still
      // false whenever execution reaches this point; PM errors collected
      // above only set bRemoveMetaErrFlag and do not stop the state change.
      if (!bErrFlag)
      {
        fDbrm->changeState(tableLockID, BRM::CLEANUP);
      }
    }  // end of (lockInfo.state == BRM::LOADING)

    // If no errors so far, then:
    // 1. delete meta data backup rollback files
    // 2. finally release table lock
    // (same remark as above: bErrFlag cannot be true inside the try block)
    if (!bErrFlag)
    {
      bsOut.reset();

      //------------------------------------------------------------------
      // Delete meta data backup rollback files
      //------------------------------------------------------------------
      //			std::cout << "cleartablelock cleanup: uniqueID-" << uniqueId <<
      //				": oid-" << lockInfo.tableOID << std::endl;

      bsOut << (messageqcpp::ByteStream::byte)WE_SVR_DML_BULKROLLBACK_CLEANUP;
      bsOut << uniqueId;
      bsOut << lockInfo.tableOID;

      for (unsigned j = 0; j < pmList.size(); j++)
      {
        fWEClient->write(bsOut, pmList[j]);
      }

      // Wait for "all" the responses, and accumulate any/all errors
      for (unsigned int pmMsgCnt = 0; pmMsgCnt < pmList.size(); pmMsgCnt++)
      {
        std::string fileDeleteErrMsg;
        bsIn.reset(new messageqcpp::ByteStream());
        fWEClient->read(uniqueId, bsIn);

        if (bsIn->length() == 0)
        {
          bRemoveMetaErrFlag = true;

          if (combinedErrMsg.str().length() > 0)
            combinedErrMsg << std::endl;

          combinedErrMsg << "Network error, PM rollback cleanup; ";
        }
        else
        {
          messageqcpp::ByteStream::byte rc;
          uint16_t pmNum;
          *bsIn >> rc;
          *bsIn >> fileDeleteErrMsg;
          *bsIn >> pmNum;

          //					std::cout << "cleartablelock cleanup response from PM" <<
          //						pmNum << "; rc-" << (int)rc <<
          //						"; errMsg: {" << fileDeleteErrMsg << '}' << std::endl;

          if (rc != 0)
          {
            bRemoveMetaErrFlag = true;

            if (combinedErrMsg.str().empty())
              combinedErrMsg << "Cleanup error; ";
            else
              combinedErrMsg << std::endl;

            combinedErrMsg << "[PM" << pmNum << "] " << fileDeleteErrMsg;
          }
        }
      }  // end of loop to process all responses to rollback cleanup

      // We ignore return release flag from releaseTableLock().
      if (!bErrFlag)
      {
        fDbrm->releaseTableLock(tableLockID);
      }
    }
  }
  catch (const std::exception& ex)
  {
    bErrFlag = true;
    combinedErrMsg << ex.what();
  }

  if (!bErrFlag)
  {
    std::ostringstream oss;
    oss << "Table lock " << tableLockID << " for table " << tableName.toString() << " is cleared.";

    //@Bug 4517. Release tablelock if remove meta files failed.
    // Errors reported by the PMs are appended as a warning only.
    if (bRemoveMetaErrFlag)
    {
      oss << " Warning: " << combinedErrMsg.str();
    }

    result.tableLockInfo = oss.str();
  }
  else
  {
    std::ostringstream oss;
    oss << "Table lock " << tableLockID << " for table " << tableName.toString() << " cannot be cleared.  "
        << combinedErrMsg.str();
    result.tableLockInfo = oss.str();
  }

  // Remove tableLockID out of the active cleartableLock command list
  // (it was added by establishTableLockToClear())
  if (lockGrabbed)
  {
    boost::mutex::scoped_lock lock(fActiveClearTableLockCmdMutex);
    fActiveClearTableLockCmds.erase(tableLockID);
  }

  //----------------------------------------------------- start of syslog code
  //
  // Log completion of cleartablelock to syslog
  //
  const int COMP_MSG_NUM = logging::M0089;
  msgArgs.reset();
  logging::Message logMsg2(COMP_MSG_NUM);
  msgArgs.add(APPLNAME);
  msgArgs.add(tableName.toString());
  msgArgs.add(tableLockID);
  std::string finalStatus;

  if (!bErrFlag)
  {
    finalStatus = "Completed successfully";
  }
  else
  {
    finalStatus = "Encountered errors: ";
    finalStatus += combinedErrMsg.str();
  }

  msgArgs.add(finalStatus);
  logMsg2.format(msgArgs);
  ml.logInfoMessage(logMsg2);
  //------------------------------------------------------- end of syslog code
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Get/return the current tablelock info for tableLockID, and reassign the table
 | 
						|
// lock to ourselves if we can.  If the lock is still in use by another process
 | 
						|
// or by another DML cleartablelock thread, then we error out with exception.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
void CommandPackageProcessor::establishTableLockToClear(uint64_t tableLockID, BRM::TableLockInfo& lockInfo)
 | 
						|
{
 | 
						|
  boost::mutex::scoped_lock lock(fActiveClearTableLockCmdMutex);
 | 
						|
 | 
						|
  // Get current table lock info
 | 
						|
  bool getLockInfo = fDbrm->getTableLockInfo(tableLockID, &lockInfo);
 | 
						|
 | 
						|
  if (!getLockInfo)
 | 
						|
  {
 | 
						|
    throw std::runtime_error(std::string("Lock does not exist."));
 | 
						|
  }
 | 
						|
 | 
						|
  std::string processName("DMLProc clearTableLock");
 | 
						|
  uint32_t processID = ::getpid();
 | 
						|
 | 
						|
  // See if another thread is executing a cleartablelock cmd for this table
 | 
						|
  if ((lockInfo.ownerName == processName) && (lockInfo.ownerPID == processID))
 | 
						|
  {
 | 
						|
    std::set<uint64_t>::const_iterator it = fActiveClearTableLockCmds.find(tableLockID);
 | 
						|
 | 
						|
    if (it != fActiveClearTableLockCmds.end())
 | 
						|
    {
 | 
						|
      throw std::runtime_error(
 | 
						|
          std::string("Lock in use.  "
 | 
						|
                      "DML is executing another cleartablelock MySQL cmd."));
 | 
						|
    }
 | 
						|
  }
 | 
						|
  else
 | 
						|
  {
 | 
						|
    // Take over ownership of stale lock.
 | 
						|
    // Use "DMLProc clearTableLock" as process name to differentiate
 | 
						|
    // from a DMLProc lock used for inserts, updates, etc.
 | 
						|
    int32_t sessionID = fSessionID;
 | 
						|
    int32_t txnid = -1;
 | 
						|
    bool ownerChanged = fDbrm->changeOwner(tableLockID, processName, processID, sessionID, txnid);
 | 
						|
 | 
						|
    if (!ownerChanged)
 | 
						|
    {
 | 
						|
      throw std::runtime_error(std::string("Unable to grab lock; lock not found or still in use."));
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  // Add this cleartablelock command to the list of active cleartablelock cmds
 | 
						|
  fActiveClearTableLockCmds.insert(tableLockID);
 | 
						|
}
 | 
						|
 | 
						|
// Count the AUX-column rows of one partition of a table and report the count.
//
// Builds the execution plan equivalent of
//   SELECT COUNT(aux) AS count_aux FROM schema.table WHERE idbPartition(aux) = <partition>;
// runs it through the system catalog's getQueryData(), and stores the first
// value of the first returned column, formatted as text, in result.bloatAnalysis.
//
// cpackage  supplies the schema name, the table name and, via get_SQLStatement(),
//           the partition string used as the filter literal.
// result    receives the outcome in result.bloatAnalysis: the count on success,
//           an explanatory message if the table has no usable AUX column, or a
//           "Partition bloat analysis failed ..." message if a std::exception
//           was raised while building or running the query.
//
// std::exception-derived errors are caught and reported through result;
// any other exception type propagates to the caller.
void CommandPackageProcessor::analyzePartitionBloat(const dmlpackage::CalpontDMLPackage& cpackage,
                                                    DMLPackageProcessor::DMLResult& result)
{
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
      CalpontSystemCatalog::makeCalpontSystemCatalog(fSessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::TableName tableName;
  tableName.schema = cpackage.get_SchemaName();
  tableName.table = cpackage.get_TableName();
  std::string partitionStr = cpackage.get_SQLStatement();

  std::ostringstream analysisResults;

  try
  {
    // Get AUX column OID for the table
    CalpontSystemCatalog::OID auxColumnOid = systemCatalogPtr->tableAUXColumnOID(tableName);

    // NOTE(review): 3000 looks like the upper bound of OIDs that cannot belong to a
    // user table's AUX column - confirm against the catalog's OID allocation.
    if (auxColumnOid <= 3000)
    {
      analysisResults << "Table " << tableName.toString()
                      << " does not have an AUX column for bloat analysis.";
      result.bloatAnalysis = analysisResults.str();
      return;
    }

    // SELECT COUNT(aux) AS count_aux FROM schema.table WHERE idbPartition(aux) = partitionStr;
    CalpontSelectExecutionPlan csep;
    CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList;
    CalpontSelectExecutionPlan::FilterTokenList filterTokenList;
    CalpontSelectExecutionPlan::ColumnMap colMap;

    // Template SimpleColumn for 'aux'. It is only ever cloned or copied below, so it
    // lives on the stack instead of being heap-allocated and leaked.
    SimpleColumn auxCol(tableName.schema, tableName.table, "aux", fSessionID);
    auxCol.alias("aux");
    CalpontSystemCatalog::ColType auxColType;
    auxColType.colDataType = CalpontSystemCatalog::INT;
    auxColType.colWidth = 4;
    auxCol.resultType(auxColType);

    // Template for the COUNT(aux) AS count_aux aggregate column. Only its clone is
    // handed to the plan, so it is an automatic object as well.
    AggregateColumn countAuxCol(fSessionID);
    countAuxCol.alias("count_aux");
    countAuxCol.aggOp(AggregateColumn::COUNT);
    countAuxCol.functionName("count");
    countAuxCol.expressionId(1);
    CalpontSystemCatalog::ColType countAuxColType;
    countAuxColType.colDataType = CalpontSystemCatalog::INT;
    countAuxColType.colWidth = 4;
    countAuxCol.resultType(countAuxColType);

    SRCP auxSRCP(auxCol.clone());
    countAuxCol.aggParms().push_back(auxSRCP);

    // Add the base 'aux' column to ColumnMap (used for reference resolution)
    // Note: The aggregate result "count_aux" does NOT go in ColumnMap
    // Add "aux" twice since it's referenced in both COUNT(aux) and idbPartition(aux)
    colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP));
    auxSRCP.reset(auxCol.clone());
    colMap.insert(CMVT_(tableName.schema + "." + tableName.table + "." + "aux", auxSRCP));

    // Add the COUNT column to ReturnedColumnList (what gets returned by SELECT)
    SRCP countSRCP(countAuxCol.clone());
    returnedColumnList.push_back(countSRCP);

    csep.columnMapNonStatic(colMap);
    csep.returnedCols(returnedColumnList);

    // Define the filter using FunctionColumn for idbPartition()
    const SOP opeq(new Operator("="));

    // Create a FunctionColumn for idbPartition(aux)
    // parms: pseudocolumns dbroot, segmentdir, segment
    SPTP sptp;
    FunctionColumn* fc = new FunctionColumn();
    fc->functionName("idbpartition");
    fc->sessionID(fSessionID);
    fc->expressionId(0);
    funcexp::FunctionParm parms;
    PseudoColumn* dbroot = new PseudoColumn(auxCol, PSEUDO_DBROOT, fSessionID);
    sptp.reset(new ParseTree(dbroot));
    parms.push_back(sptp);

    PseudoColumn* pp = new PseudoColumn(auxCol, PSEUDO_SEGMENTDIR, fSessionID);
    sptp.reset(new ParseTree(pp));
    parms.push_back(sptp);

    PseudoColumn* seg = new PseudoColumn(auxCol, PSEUDO_SEGMENT, fSessionID);
    sptp.reset(new ParseTree(seg));
    parms.push_back(sptp);

    fc->functionParms(parms);

    CalpontSystemCatalog::ColType resultType;
    resultType.colDataType = CalpontSystemCatalog::VARCHAR;
    resultType.colWidth = 256;
    fc->resultType(resultType);

    // The function object is needed only to derive the operation type; an automatic
    // object cannot leak if operationType() throws.
    funcexp::Func_idbpartition idbpartition;
    fc->operationType(idbpartition.operationType(parms, fc->resultType()));

    // Set up the filter: idbPartition(aux) = '<partitionStr>'
    ConstantColumn* partitionConstCol = new ConstantColumn(partitionStr, ConstantColumn::LITERAL);
    SimpleFilter* f1 = new SimpleFilter(opeq, fc, partitionConstCol);
    filterTokenList.push_back(f1);
    csep.filterTokenList(filterTokenList);

    // Set the session ID, transaction ID and version Id
    BRM::QueryContext verID;
    verID = fSessionManager.verID();
    csep.verID(verID);
    csep.sessionID(fSessionID);
    BRM::TxnID txnID;
    txnID = fSessionManager.getTxnID(fSessionID);
    csep.txnID(txnID.id);

    // Set the table list
    CalpontSelectExecutionPlan::TableList tablelist;
    tablelist.push_back(make_aliastable(tableName.schema, tableName.table, ""));
    csep.tableList(tablelist);
    csep.schemaName(tableName.schema, 0);
    csep.tableName(tableName.table, 0);

    // Send CSEP to ExeMgr
    auto csepStr = csep.toString();
    cout << "csep: " << csepStr << endl;
    CalpontSystemCatalog::NJLSysDataList sysDataList;
    systemCatalogPtr->getQueryData(csep, sysDataList);

    cout << "Done getSysData" << endl;

    cout << "result size: " << sysDataList.sysDataVec.size() << endl;

    // Trace every returned column's first value
    for (auto it = sysDataList.begin(); it != sysDataList.end(); it++)
    {
      cout << "result: " << (*it)->GetData(0) << endl;
    }

    // front() on an empty vector is undefined behaviour; report an empty result
    // through the normal failure path instead.
    if (sysDataList.sysDataVec.empty())
    {
      throw std::runtime_error("query returned no result");
    }

    // Report the first value of the first returned column (count_aux)
    analysisResults << sysDataList.sysDataVec.front()->GetData(0);

    result.bloatAnalysis = analysisResults.str();
    cout << "analysisResults: " << analysisResults.str() << endl;
  }
  catch (std::exception& ex)
  {
    // Surface the failure to the caller through the result text rather than rethrowing.
    std::ostringstream oss;
    oss << "Partition bloat analysis failed for table " << tableName.toString() << ", partition "
        << partitionStr << ": " << ex.what();
    result.bloatAnalysis = oss.str();
  }
}
 | 
						|
 | 
						|
}  // namespace dmlpackageprocessor
 |