You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			944 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			944 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
/***********************************************************************
 | 
						|
 *   $Id: dmlpackageprocessor.cpp 9673 2013-07-09 15:59:49Z chao $
 | 
						|
 *
 | 
						|
 *
 | 
						|
 ***********************************************************************/
 | 
						|
#include "dmlpackageprocessor.h"
 | 
						|
 | 
						|
#include <math.h>
 | 
						|
using namespace std;
 | 
						|
 | 
						|
#include <boost/algorithm/string/case_conv.hpp>
 | 
						|
using namespace boost::algorithm;
 | 
						|
#include <boost/tokenizer.hpp>
 | 
						|
#include <boost/shared_ptr.hpp>
 | 
						|
 | 
						|
#include "we_messages.h"
 | 
						|
using namespace WriteEngine;
 | 
						|
using namespace dmlpackage;
 | 
						|
#include "calpontselectexecutionplan.h"
 | 
						|
#include "simplecolumn.h"
 | 
						|
#include "constantcolumn.h"
 | 
						|
#include "simplefilter.h"
 | 
						|
#include "constantfilter.h"
 | 
						|
#include "columnresult.h"
 | 
						|
using namespace execplan;
 | 
						|
using namespace logging;
 | 
						|
#include "configcpp.h"
 | 
						|
using namespace config;
 | 
						|
#include "joblistfactory.h"
 | 
						|
#include "joblist.h"
 | 
						|
#include "distributedenginecomm.h"
 | 
						|
using namespace joblist;
 | 
						|
#include "bytestream.h"
 | 
						|
#include "messagequeue.h"
 | 
						|
using namespace messageqcpp;
 | 
						|
#include "tablelockdata.h"
 | 
						|
#include "exceptclasses.h"
 | 
						|
 | 
						|
namespace
 | 
						|
{
 | 
						|
using namespace execplan;
 | 
						|
 | 
						|
const SOP opeq(new Operator("="));
 | 
						|
const SOP opne(new Operator("<>"));
 | 
						|
const SOP opor(new Operator("or"));
 | 
						|
const SOP opand(new Operator("and"));
 | 
						|
}
 | 
						|
 | 
						|
namespace dmlpackageprocessor
 | 
						|
{
 | 
						|
 | 
						|
DMLPackageProcessor::~DMLPackageProcessor()
 | 
						|
{
 | 
						|
    //cout << "In DMLPackageProcessor destructor " << this << endl;
 | 
						|
    if (fWEClient)
 | 
						|
        delete fWEClient;
 | 
						|
 | 
						|
    if (fExeMgr)
 | 
						|
        delete fExeMgr;
 | 
						|
}
 | 
						|
 | 
						|
//@bug 397
 | 
						|
void DMLPackageProcessor::cleanString(string& s)
 | 
						|
{
 | 
						|
    string::size_type pos = s.find_first_not_of(" ");
 | 
						|
 | 
						|
    //stripe off space and ' or '' at beginning and end
 | 
						|
    if ( pos < s.length() )
 | 
						|
    {
 | 
						|
        s = s.substr( pos, s.length() - pos );
 | 
						|
 | 
						|
        if ( (pos = s.find_last_of(" ")) < s.length())
 | 
						|
        {
 | 
						|
            s = s.substr(0, pos );
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    if  ( s[0] == '\'')
 | 
						|
    {
 | 
						|
        s = s.substr(1, s.length() - 2);
 | 
						|
 | 
						|
        if  ( s[0] == '\'')
 | 
						|
            s = s.substr(1, s.length() - 2);
 | 
						|
    }
 | 
						|
}
 | 
						|
#if 0
 | 
						|
boost::any DMLPackageProcessor::tokenizeData( execplan::CalpontSystemCatalog::SCN txnID,
 | 
						|
        execplan::CalpontSystemCatalog::ColType colType,
 | 
						|
        const std::string& data, DMLResult& result, bool isNULL )
 | 
						|
{
 | 
						|
    SUMMARY_INFO("DMLPackageProcessor::tokenizeData");
 | 
						|
 | 
						|
    bool retval = true;
 | 
						|
    boost::any value;
 | 
						|
 | 
						|
    if (isNULL)
 | 
						|
    {
 | 
						|
        WriteEngine::Token nullToken;
 | 
						|
        value = nullToken;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        if ( data.length() > (unsigned int)colType.colWidth )
 | 
						|
        {
 | 
						|
            retval = false;
 | 
						|
            // build the logging message
 | 
						|
            logging::Message::Args args;
 | 
						|
            logging::Message message(6);
 | 
						|
            args.add("Insert value is too large for colum ");
 | 
						|
            message.format( args );
 | 
						|
 | 
						|
            result.result = INSERT_ERROR;
 | 
						|
            result.message = message;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            //Tokenize the data value
 | 
						|
            WriteEngine::DctnryStruct dictStruct;
 | 
						|
            dictStruct.dctnryOid = colType.ddn.dictOID;
 | 
						|
            //cout << "Dictionary OIDs: " << colType.ddn.treeOID << " " << colType.ddn.listOID << endl;
 | 
						|
            WriteEngine::DctnryTuple  dictTuple;
 | 
						|
            dictTuple.sigValue = data.c_str();
 | 
						|
            dictTuple.sigSize = data.length();
 | 
						|
            int error = NO_ERROR;
 | 
						|
 | 
						|
            if ( NO_ERROR != (error = fWriteEngine.tokenize( txnID, dictStruct, dictTuple)) )
 | 
						|
            {
 | 
						|
                retval = false;
 | 
						|
                //cout << "Error code from WE: " << error << endl;
 | 
						|
                // build the logging message
 | 
						|
                logging::Message::Args args;
 | 
						|
                logging::Message message(1);
 | 
						|
                args.add("Tokenization failed on: ");
 | 
						|
                args.add(data);
 | 
						|
                args.add("error number: ");
 | 
						|
                args.add( error );
 | 
						|
                message.format( args );
 | 
						|
 | 
						|
                result.result = TOKEN_ERROR;
 | 
						|
                result.message = message;
 | 
						|
            }
 | 
						|
 | 
						|
            WriteEngine::Token aToken = dictTuple.token;
 | 
						|
            value = aToken;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return value;
 | 
						|
}
 | 
						|
#endif
 | 
						|
void DMLPackageProcessor::getColumnsForTable(uint32_t sessionID, std::string schema,
 | 
						|
        std::string table, dmlpackage::ColumnList& colList)
 | 
						|
{
 | 
						|
 | 
						|
    CalpontSystemCatalog::TableName tableName;
 | 
						|
    tableName.schema = schema;
 | 
						|
    tableName.table = table;
 | 
						|
 | 
						|
    boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog( sessionID );
 | 
						|
    CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(tableName, true);
 | 
						|
 | 
						|
    CalpontSystemCatalog::RIDList::const_iterator rid_iterator = ridList.begin();
 | 
						|
 | 
						|
    while (rid_iterator != ridList.end())
 | 
						|
    {
 | 
						|
        CalpontSystemCatalog::ROPair roPair = *rid_iterator;
 | 
						|
        DMLColumn* columnPtr = new DMLColumn();
 | 
						|
        CalpontSystemCatalog::TableColName tblColName = systemCatalogPtr->colName( roPair.objnum );
 | 
						|
        columnPtr->set_Name(tblColName.column);
 | 
						|
 | 
						|
        colList.push_back(columnPtr);
 | 
						|
 | 
						|
        ++rid_iterator;
 | 
						|
    }
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
char* DMLPackageProcessor::strlower(char* in)
 | 
						|
{
 | 
						|
    char* p = in;
 | 
						|
 | 
						|
    if (p)
 | 
						|
    {
 | 
						|
        while (*p)
 | 
						|
        {
 | 
						|
            *p = tolower(*p);
 | 
						|
            p++;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return in;
 | 
						|
}
 | 
						|
 | 
						|
void DMLPackageProcessor::convertRidToColumn(uint64_t& rid, unsigned& dbRoot, unsigned& partition,
 | 
						|
        unsigned& segment, unsigned filesPerColumnPartition, unsigned  extentsPerSegmentFile, unsigned extentRows,
 | 
						|
        unsigned startDBRoot, unsigned dbrootCnt, const unsigned startPartitionNum)
 | 
						|
{
 | 
						|
    partition = rid / (filesPerColumnPartition * extentsPerSegmentFile * extentRows);
 | 
						|
 | 
						|
    segment = (((rid % ( filesPerColumnPartition * extentsPerSegmentFile * extentRows)) /
 | 
						|
                extentRows)) % filesPerColumnPartition;
 | 
						|
 | 
						|
    dbRoot = ((startDBRoot - 1 + segment) % dbrootCnt) + 1;
 | 
						|
 | 
						|
    //Calculate the relative rid for this segment file
 | 
						|
    uint64_t relRidInPartition = rid - ((uint64_t)partition * (uint64_t)filesPerColumnPartition *
 | 
						|
                                        (uint64_t)extentsPerSegmentFile * (uint64_t)extentRows);
 | 
						|
    idbassert(relRidInPartition <= (uint64_t)filesPerColumnPartition * (uint64_t)extentsPerSegmentFile *
 | 
						|
              (uint64_t)extentRows);
 | 
						|
    uint32_t numExtentsInThisPart = relRidInPartition / extentRows;
 | 
						|
    unsigned numExtentsInThisSegPart = numExtentsInThisPart / filesPerColumnPartition;
 | 
						|
    uint64_t relRidInThisExtent = relRidInPartition - numExtentsInThisPart * extentRows;
 | 
						|
    rid = relRidInThisExtent +  numExtentsInThisSegPart * extentRows;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
string DMLPackageProcessor::projectTableErrCodeToMsg(uint32_t ec)
 | 
						|
{
 | 
						|
    if (ec < 1000) // pre IDB error code
 | 
						|
    {
 | 
						|
        ErrorCodes ecObj;
 | 
						|
        string errMsg("Statement failed.");
 | 
						|
        errMsg += ecObj.errorString(ec).substr(150); // substr removes ErrorCodes::fPreamble
 | 
						|
        return errMsg;
 | 
						|
    }
 | 
						|
 | 
						|
    // IDB error
 | 
						|
    return IDBErrorInfo::instance()->errorMsg(ec);
 | 
						|
}
 | 
						|
 | 
						|
bool DMLPackageProcessor::validateVarbinaryVal( std::string& inStr)
 | 
						|
{
 | 
						|
    bool invalid = false;
 | 
						|
 | 
						|
    for (unsigned i = 0; i < inStr.length(); i++)
 | 
						|
    {
 | 
						|
        if (!isxdigit(inStr[i]))
 | 
						|
        {
 | 
						|
            invalid = true;
 | 
						|
            break;
 | 
						|
        }
 | 
						|
 | 
						|
    }
 | 
						|
 | 
						|
    return invalid;
 | 
						|
}
 | 
						|
 | 
						|
int DMLPackageProcessor::commitTransaction(uint64_t uniqueId, BRM::TxnID txnID)
 | 
						|
{
 | 
						|
    int rc = fDbrm->vbCommit(txnID.id);
 | 
						|
    return rc;
 | 
						|
}
 | 
						|
 | 
						|
int DMLPackageProcessor::rollBackTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
 | 
						|
        std::string& errorMsg)
 | 
						|
{
 | 
						|
    std::vector<BRM::LBID_t> lbidList;
 | 
						|
    std::vector<BRM::LBIDRange> lbidRangeList;
 | 
						|
    BRM::LBIDRange   range;
 | 
						|
    int rc = 0;
 | 
						|
    //Check BRM status before processing.
 | 
						|
    rc = fDbrm->isReadWrite();
 | 
						|
 | 
						|
    if (rc != 0 )
 | 
						|
    {
 | 
						|
        std::string brmMsg;
 | 
						|
        errorMsg = "Can't read DBRM isReadWrite [ ";
 | 
						|
        BRM::errString(rc, brmMsg);
 | 
						|
        errorMsg += brmMsg;
 | 
						|
        errorMsg += "]";
 | 
						|
        return rc;
 | 
						|
    }
 | 
						|
 | 
						|
    ByteStream bytestream;
 | 
						|
    fWEClient->addQueue(uniqueId);
 | 
						|
    //cout << "adding to queue with id " << uniqueId << endl;
 | 
						|
    bytestream << (ByteStream::byte) WE_SVR_ROLLBACK_BLOCKS;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << sessionID;
 | 
						|
    bytestream << (uint32_t)txnID.id;
 | 
						|
    uint32_t msgRecived = 0;
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
        fWEClient->write_to_all(bytestream);
 | 
						|
        boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
        bsIn.reset(new ByteStream());
 | 
						|
        ByteStream::byte tmp8;
 | 
						|
 | 
						|
        while (1)
 | 
						|
        {
 | 
						|
            if (msgRecived == fWEClient->getPmCount())
 | 
						|
                break;
 | 
						|
 | 
						|
            fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
            if ( bsIn->length() == 0 ) //read error
 | 
						|
            {
 | 
						|
                rc = NETWORK_ERROR;
 | 
						|
                errorMsg = "Network error reading WEClient";
 | 
						|
                fWEClient->removeQueue(uniqueId);
 | 
						|
                //cout << "erroring out remove queue id " << uniqueId << endl;
 | 
						|
                break;
 | 
						|
            }
 | 
						|
            else
 | 
						|
            {
 | 
						|
                *bsIn >> tmp8;
 | 
						|
                rc = tmp8;
 | 
						|
 | 
						|
                if (rc != 0)
 | 
						|
                {
 | 
						|
                    char szrc[20];
 | 
						|
                    *bsIn >> errorMsg;
 | 
						|
                    errorMsg += " (WriteEngine returns error ";
 | 
						|
                    sprintf(szrc, "%d", rc);
 | 
						|
                    errorMsg += szrc;
 | 
						|
                    errorMsg += ")";
 | 
						|
                    fWEClient->removeQueue(uniqueId);
 | 
						|
                    cout << "erroring out remove queue id " << uniqueId << endl;
 | 
						|
                    break;
 | 
						|
                }
 | 
						|
                else
 | 
						|
                    msgRecived++;
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
    catch (std::exception& e)
 | 
						|
    {
 | 
						|
        rc = NETWORK_ERROR;
 | 
						|
        errorMsg = "Network error occured when rolling back blocks";
 | 
						|
        errorMsg += e.what();
 | 
						|
        fWEClient->removeQueue(uniqueId);
 | 
						|
        cout << "erroring out remove queue id " << uniqueId << endl;
 | 
						|
        //delete fWEClient;
 | 
						|
        return rc;
 | 
						|
    }
 | 
						|
    catch ( ... )
 | 
						|
    {
 | 
						|
        rc = NETWORK_ERROR;
 | 
						|
        errorMsg = "Unknown exception caught while rolling back transaction.";
 | 
						|
        fWEClient->removeQueue(uniqueId);
 | 
						|
        cout << "erroring out remove queue id " << uniqueId << endl;
 | 
						|
        //delete fWEClient;
 | 
						|
        return rc;
 | 
						|
    }
 | 
						|
 | 
						|
    if (rc != 0)
 | 
						|
    {
 | 
						|
        //delete fWEClient;
 | 
						|
        return rc;
 | 
						|
    }
 | 
						|
 | 
						|
    fWEClient->removeQueue(uniqueId);
 | 
						|
    //delete fWEClient;
 | 
						|
//	cout << "success. remove queue id " << uniqueId << endl;
 | 
						|
    rc = fDbrm->getUncommittedLBIDs(txnID.id, lbidList);
 | 
						|
 | 
						|
    if (rc != 0 )
 | 
						|
    {
 | 
						|
        std::string brmMsg;
 | 
						|
        errorMsg = "DBRM getUncommittedLBIDs [ ";
 | 
						|
        BRM::errString(rc, brmMsg);
 | 
						|
        errorMsg += brmMsg;
 | 
						|
        errorMsg += "]";
 | 
						|
        return rc;
 | 
						|
    }
 | 
						|
 | 
						|
    for (size_t i = 0; i < lbidList.size(); i++)
 | 
						|
    {
 | 
						|
        range.start = lbidList[i];
 | 
						|
        range.size = 1;
 | 
						|
        lbidRangeList.push_back(range);
 | 
						|
    }
 | 
						|
 | 
						|
    rc =  fDbrm->vbRollback(txnID.id, lbidRangeList);
 | 
						|
 | 
						|
    if (rc != 0 )
 | 
						|
    {
 | 
						|
        std::string brmMsg;
 | 
						|
        errorMsg = "DBRM vbRollback [ ";
 | 
						|
        BRM::errString(rc, brmMsg);
 | 
						|
        errorMsg += brmMsg;
 | 
						|
        errorMsg += "]";
 | 
						|
        return rc;
 | 
						|
    }
 | 
						|
 | 
						|
    return rc;
 | 
						|
}
 | 
						|
 | 
						|
int DMLPackageProcessor::commitBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID, const uint32_t tableOid, std::string& errorMsg)
 | 
						|
{
 | 
						|
    //collect hwm info from all pms and set them here. remove table metadata if all successful
 | 
						|
    ByteStream bytestream;
 | 
						|
    fWEClient->addQueue(uniqueId);
 | 
						|
    bytestream << (ByteStream::byte)WE_SVR_COMMIT_BATCH_AUTO_ON;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << (uint32_t) txnID.id;
 | 
						|
    bytestream << tableOid;
 | 
						|
    bytestream << fSessionID;
 | 
						|
 | 
						|
    uint32_t msgRecived = 0;
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
    boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
    bsIn.reset(new ByteStream());
 | 
						|
    int rc = 0;
 | 
						|
    ByteStream::byte tmp8;
 | 
						|
    typedef std::vector<BRM::BulkSetHWMArg> BulkSetHWMArgs;
 | 
						|
    std::vector<BulkSetHWMArgs> hwmArgsAllPms;
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
        if (msgRecived == fWEClient->getPmCount())
 | 
						|
            break;
 | 
						|
 | 
						|
        fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
        if ( bsIn->length() == 0 ) //read error
 | 
						|
        {
 | 
						|
            rc = NETWORK_ERROR;
 | 
						|
            fWEClient->removeQueue(uniqueId);
 | 
						|
            break;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            *bsIn >> tmp8;
 | 
						|
            rc = tmp8;
 | 
						|
 | 
						|
            if (rc != 0)
 | 
						|
            {
 | 
						|
                *bsIn >> errorMsg;
 | 
						|
                fWEClient->removeQueue(uniqueId);
 | 
						|
                break;
 | 
						|
            }
 | 
						|
            else
 | 
						|
            {
 | 
						|
                //get hwm info
 | 
						|
                *bsIn >> errorMsg;
 | 
						|
                BulkSetHWMArgs setHWMArgs;
 | 
						|
                //cout << "received from WES bytestream length = " <<  bsIn->length() << endl;
 | 
						|
                deserializeInlineVector(*(bsIn.get()), setHWMArgs);
 | 
						|
                //cout << "get hwm info from WES size " << setHWMArgs.size() << endl;
 | 
						|
                hwmArgsAllPms.push_back(setHWMArgs);
 | 
						|
                msgRecived++;
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    if (rc != 0)
 | 
						|
        return rc;
 | 
						|
 | 
						|
    //set hwm
 | 
						|
    std::vector<BRM::BulkSetHWMArg> allHwm;
 | 
						|
    BulkSetHWMArgs::const_iterator itor;
 | 
						|
 | 
						|
    //cout << "total hwmArgsAllPms size " << hwmArgsAllPms.size() << endl;
 | 
						|
    for (unsigned i = 0; i  < fWEClient->getPmCount(); i++)
 | 
						|
    {
 | 
						|
        itor = hwmArgsAllPms[i].begin();
 | 
						|
 | 
						|
        while (itor != hwmArgsAllPms[i].end())
 | 
						|
        {
 | 
						|
            allHwm.push_back(*itor);
 | 
						|
            //cout << "received hwm info: " <<  itor->oid << ":" << itor->hwm << endl;
 | 
						|
            itor++;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    //set CP data before hwm.
 | 
						|
 | 
						|
    //cout << "setting hwm allHwm size " << allHwm.size() << endl;
 | 
						|
    vector<BRM::LBID_t> lbidList;
 | 
						|
 | 
						|
    if (idbdatafile::IDBPolicy::useHdfs())
 | 
						|
    {
 | 
						|
        BRM::LBID_t startLbid;
 | 
						|
 | 
						|
        for ( unsigned i = 0; i < allHwm.size(); i++)
 | 
						|
        {
 | 
						|
            rc = fDbrm->lookupLocalStartLbid(allHwm[i].oid, allHwm[i].partNum, allHwm[i].segNum, allHwm[i].hwm, startLbid);
 | 
						|
            lbidList.push_back(startLbid);
 | 
						|
        }
 | 
						|
    }
 | 
						|
    else
 | 
						|
        fDbrm->getUncommittedExtentLBIDs(static_cast<BRM::VER_t>(txnID.id), lbidList);
 | 
						|
 | 
						|
    vector<BRM::LBID_t>::const_iterator iter = lbidList.begin();
 | 
						|
    vector<BRM::LBID_t>::const_iterator end = lbidList.end();
 | 
						|
    BRM::CPInfoList_t cpInfos;
 | 
						|
    BRM::CPInfo aInfo;
 | 
						|
 | 
						|
    while (iter != end)
 | 
						|
    {
 | 
						|
        aInfo.firstLbid = *iter;
 | 
						|
        aInfo.max = numeric_limits<int64_t>::min(); // Not used
 | 
						|
        aInfo.min = numeric_limits<int64_t>::max(); // Not used
 | 
						|
        aInfo.seqNum = -1;
 | 
						|
        cpInfos.push_back(aInfo);
 | 
						|
        ++iter;
 | 
						|
    }
 | 
						|
 | 
						|
    std::vector<BRM::CPInfoMerge>  mergeCPDataArgs;
 | 
						|
    rc = fDbrm->bulkSetHWMAndCP(allHwm, cpInfos, mergeCPDataArgs, txnID.id);
 | 
						|
    fDbrm->takeSnapshot();
 | 
						|
    //Set tablelock to rollforward remove meta files
 | 
						|
 | 
						|
    if (rc != 0)
 | 
						|
        return rc;
 | 
						|
 | 
						|
    bool stateChanged = true;
 | 
						|
    TablelockData* tablelockData = TablelockData::makeTablelockData(fSessionID);
 | 
						|
    uint64_t tablelockId = tablelockData->getTablelockId(tableOid);
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
        stateChanged = fDbrm->changeState(tablelockId, BRM::CLEANUP);
 | 
						|
    }
 | 
						|
    catch (std::exception&)
 | 
						|
    {
 | 
						|
        errorMsg = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
 | 
						|
        stateChanged = false;
 | 
						|
    }
 | 
						|
 | 
						|
    if (!stateChanged)
 | 
						|
        return rc;
 | 
						|
 | 
						|
    bytestream.restart();
 | 
						|
    //@Bug 4517 Remove meta data failure doesn't stop tablelock releasing.
 | 
						|
    bytestream << (ByteStream::byte)WE_SVR_BATCH_AUTOON_REMOVE_META;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << tableOid;
 | 
						|
    msgRecived = 0;
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
        if (msgRecived == fWEClient->getPmCount())
 | 
						|
            break;
 | 
						|
 | 
						|
        fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
        if ( bsIn->length() == 0 ) //read error
 | 
						|
        {
 | 
						|
            fWEClient->removeQueue(uniqueId);
 | 
						|
            break;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            *bsIn >> tmp8;
 | 
						|
            msgRecived++;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return rc;
 | 
						|
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
int DMLPackageProcessor::rollBackBatchAutoOnTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
 | 
						|
        const uint32_t tableOid, std::string& errorMsg)
 | 
						|
{
 | 
						|
    //Bulkrollback, rollback blocks, vbrollback, change state, remove meta file
 | 
						|
    //cout << "In rollBackBatchAutoOnTransaction" << endl;
 | 
						|
    std::vector<BRM::TableLockInfo> tableLocks;
 | 
						|
    tableLocks = fDbrm->getAllTableLocks();
 | 
						|
    //cout << " Got all tablelocks" << endl;
 | 
						|
    unsigned idx = 0;
 | 
						|
    string ownerName ("DMLProc batchinsert");
 | 
						|
    uint64_t tableLockId = 0;
 | 
						|
    int rc = 0;
 | 
						|
 | 
						|
    for (; idx < tableLocks.size(); idx++)
 | 
						|
    {
 | 
						|
        if ((tableLocks[idx].ownerName == ownerName) && (tableLocks[idx].tableOID == tableOid))
 | 
						|
        {
 | 
						|
            tableLockId = tableLocks[idx].id;
 | 
						|
            break;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    if ((tableLockId == 0) || (tableOid == 0))
 | 
						|
    {
 | 
						|
        // table is not locked by DMLProc. Could happen if we failed to get lock
 | 
						|
        // while inserting. Not an error during rollback, but we don't
 | 
						|
        // want to do anything.
 | 
						|
        return rc;
 | 
						|
    }
 | 
						|
 | 
						|
    //cout << "sending to WES" << endl;
 | 
						|
    ByteStream bytestream;
 | 
						|
    fWEClient->addQueue(uniqueId);
 | 
						|
    //cout << "adding queue id " << uniqueId << endl;
 | 
						|
    bytestream << (ByteStream::byte) WE_SVR_ROLLBACK_BATCH_AUTO_ON;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << sessionID;
 | 
						|
    bytestream << tableLockId;
 | 
						|
    bytestream << tableOid;
 | 
						|
    uint32_t msgRecived = 0;
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
    boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
    bsIn.reset(new ByteStream());
 | 
						|
    ByteStream::byte tmp8;
 | 
						|
 | 
						|
    //cout << "waiting for reply from WES" << endl;
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
        if (msgRecived == fWEClient->getPmCount())
 | 
						|
            break;
 | 
						|
 | 
						|
        fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
        if ( bsIn->length() == 0 ) //read error
 | 
						|
        {
 | 
						|
            rc = NETWORK_ERROR;
 | 
						|
            fWEClient->removeQueue(uniqueId);
 | 
						|
            //cout << "erroring out remove queue id " << uniqueId << endl;
 | 
						|
            break;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            *bsIn >> tmp8;
 | 
						|
            rc = tmp8;
 | 
						|
 | 
						|
            if (rc != 0)
 | 
						|
            {
 | 
						|
                *bsIn >> errorMsg;
 | 
						|
                fWEClient->removeQueue(uniqueId);
 | 
						|
                //cout << "erroring out remove queue id " << uniqueId << endl;
 | 
						|
                break;
 | 
						|
            }
 | 
						|
            else
 | 
						|
                msgRecived++;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    if (rc == 0) //change table lock state
 | 
						|
    {
 | 
						|
        bool stateChanged = true;
 | 
						|
 | 
						|
        //cout << "changing tablelock state" << endl;
 | 
						|
        try
 | 
						|
        {
 | 
						|
            stateChanged = fDbrm->changeState(tableLockId, BRM::CLEANUP);
 | 
						|
        }
 | 
						|
        catch (std::exception&)
 | 
						|
        {
 | 
						|
            errorMsg = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
 | 
						|
            stateChanged = false;
 | 
						|
        }
 | 
						|
 | 
						|
        if (!stateChanged)
 | 
						|
        {
 | 
						|
            rc = 1;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    if ( rc != 0 )
 | 
						|
        return rc;
 | 
						|
 | 
						|
    bytestream.restart();
 | 
						|
    bytestream << (ByteStream::byte)WE_SVR_BATCH_AUTOON_REMOVE_META;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << tableOid;
 | 
						|
    msgRecived = 0;
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
        if (msgRecived == fWEClient->getPmCount())
 | 
						|
            break;
 | 
						|
 | 
						|
        fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
        if ( bsIn->length() == 0 ) //read error
 | 
						|
        {
 | 
						|
            fWEClient->removeQueue(uniqueId);
 | 
						|
            //cout << "erroring out remove queue id " << uniqueId << endl;
 | 
						|
            break;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            *bsIn >> tmp8;
 | 
						|
            msgRecived++;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    fWEClient->removeQueue(uniqueId);
 | 
						|
    return rc;
 | 
						|
}
 | 
						|
 | 
						|
int DMLPackageProcessor::commitBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID, const uint32_t tableOid, std::string& errorMsg)
 | 
						|
{
 | 
						|
    std::vector<BRM::TableLockInfo> tableLocks;
 | 
						|
    tableLocks = fDbrm->getAllTableLocks();
 | 
						|
    //cout << " Got all tablelocks" << endl;
 | 
						|
    unsigned idx = 0;
 | 
						|
    string ownerName ("DMLProc batchinsert");
 | 
						|
    uint64_t tableLockId = 0;
 | 
						|
    int rc = 0;
 | 
						|
    boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
    bsIn.reset(new ByteStream());
 | 
						|
    ByteStream::byte tmp8;
 | 
						|
 | 
						|
    for (; idx < tableLocks.size(); idx++)
 | 
						|
    {
 | 
						|
        if ((tableLocks[idx].ownerName == ownerName) && (tableLocks[idx].tableOID == tableOid))
 | 
						|
        {
 | 
						|
            tableLockId = tableLocks[idx].id;
 | 
						|
            break;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    if ((tableLockId == 0) || (tableOid == 0))
 | 
						|
    {
 | 
						|
        // table is not locked by DMLProc. Could happen if we failed to get lock
 | 
						|
        // while inserting. Not an error during rollback, but we don't
 | 
						|
        // want to do anything.
 | 
						|
        return rc;
 | 
						|
    }
 | 
						|
 | 
						|
    bool stateChanged = true;
 | 
						|
 | 
						|
    //cout << "changing tablelock state" << endl;
 | 
						|
    try
 | 
						|
    {
 | 
						|
        stateChanged = fDbrm->changeState(tableLockId, BRM::CLEANUP);
 | 
						|
    }
 | 
						|
    catch (std::exception&)
 | 
						|
    {
 | 
						|
        errorMsg = IDBErrorInfo::instance()->errorMsg(ERR_HARD_FAILURE);
 | 
						|
        stateChanged = false;
 | 
						|
    }
 | 
						|
 | 
						|
    if (!stateChanged)
 | 
						|
    {
 | 
						|
        rc = 1;
 | 
						|
    }
 | 
						|
 | 
						|
    if ( rc != 0 )
 | 
						|
        return rc;
 | 
						|
 | 
						|
    ByteStream bytestream;
 | 
						|
    fWEClient->addQueue(uniqueId);
 | 
						|
    bytestream << (ByteStream::byte)WE_SVR_BATCH_AUTOON_REMOVE_META;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << tableOid;
 | 
						|
    uint32_t msgRecived = 0;
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
        if (msgRecived == fWEClient->getPmCount())
 | 
						|
            break;
 | 
						|
 | 
						|
        fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
        if ( bsIn->length() == 0 ) //read error
 | 
						|
        {
 | 
						|
            fWEClient->removeQueue(uniqueId);
 | 
						|
            break;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            *bsIn >> tmp8;
 | 
						|
            msgRecived++;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    fWEClient->removeQueue(uniqueId);
 | 
						|
    return rc;
 | 
						|
}
 | 
						|
 | 
						|
int DMLPackageProcessor::rollBackBatchAutoOffTransaction(uint64_t uniqueId, BRM::TxnID txnID, uint32_t sessionID,
 | 
						|
        const uint32_t tableOid, std::string& errorMsg)
 | 
						|
{
 | 
						|
    ByteStream bytestream;
 | 
						|
    fWEClient->addQueue(uniqueId);
 | 
						|
    bytestream << (ByteStream::byte) WE_SVR_ROLLBACK_BATCH_AUTO_OFF;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << sessionID;
 | 
						|
    bytestream << (uint32_t)txnID.id;
 | 
						|
    bytestream << tableOid;
 | 
						|
    uint32_t msgRecived = 0;
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
    boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
    bsIn.reset(new ByteStream());
 | 
						|
    int rc = 0;
 | 
						|
    ByteStream::byte tmp8;
 | 
						|
 | 
						|
    while (1)
 | 
						|
    {
 | 
						|
        if (msgRecived == fWEClient->getPmCount())
 | 
						|
            break;
 | 
						|
 | 
						|
        fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
        if ( bsIn->length() == 0 ) //read error
 | 
						|
        {
 | 
						|
            rc = NETWORK_ERROR;
 | 
						|
            fWEClient->removeQueue(uniqueId);
 | 
						|
            break;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            *bsIn >> tmp8;
 | 
						|
            rc = tmp8;
 | 
						|
 | 
						|
            if (rc != 0)
 | 
						|
            {
 | 
						|
                *bsIn >> errorMsg;
 | 
						|
                fWEClient->removeQueue(uniqueId);
 | 
						|
                break;
 | 
						|
            }
 | 
						|
            else
 | 
						|
                msgRecived++;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return rc;
 | 
						|
}
 | 
						|
 | 
						|
int DMLPackageProcessor::flushDataFiles (int rcIn, std::map<FID, FID>& columnOids, uint64_t uniqueId, BRM::TxnID txnID, uint32_t tableOid)
 | 
						|
{
 | 
						|
//cout <<"in flushDataFiles" << endl;
 | 
						|
    ByteStream bytestream;
 | 
						|
    bytestream << (ByteStream::byte) WE_SVR_FLUSH_FILES;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << (uint32_t) rcIn;
 | 
						|
    bytestream << (uint32_t)txnID.id;
 | 
						|
    bytestream << tableOid;
 | 
						|
    uint32_t msgRecived = 0;
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
    boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
    bsIn.reset(new ByteStream());
 | 
						|
    int rc = 0;
 | 
						|
    ByteStream::byte tmp8;
 | 
						|
    std::string errorMsg;
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
        while (1)
 | 
						|
        {
 | 
						|
            if (msgRecived == fWEClient->getPmCount())
 | 
						|
                break;
 | 
						|
 | 
						|
            fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
            if ( bsIn->length() == 0 ) //read error
 | 
						|
            {
 | 
						|
                rc = NETWORK_ERROR;
 | 
						|
                break;
 | 
						|
            }
 | 
						|
            else
 | 
						|
            {
 | 
						|
                *bsIn >> tmp8;
 | 
						|
                rc = tmp8;
 | 
						|
 | 
						|
                if (rc != 0)
 | 
						|
                {
 | 
						|
                    *bsIn >> errorMsg;
 | 
						|
                    break;
 | 
						|
                }
 | 
						|
                else
 | 
						|
                    msgRecived++;
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
    catch (std::exception&)
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    return rc;
 | 
						|
}
 | 
						|
 | 
						|
int DMLPackageProcessor::endTransaction (uint64_t uniqueId, BRM::TxnID txnID, bool success)
 | 
						|
{
 | 
						|
//cout <<"in flushDataFiles" << endl;
 | 
						|
    ByteStream bytestream;
 | 
						|
    bytestream << (ByteStream::byte) WE_END_TRANSACTION;
 | 
						|
    bytestream << uniqueId;
 | 
						|
    bytestream << (uint32_t)txnID.id;
 | 
						|
    bytestream << (ByteStream::byte)success;
 | 
						|
    uint32_t msgRecived = 0;
 | 
						|
    fWEClient->write_to_all(bytestream);
 | 
						|
    boost::shared_ptr<messageqcpp::ByteStream> bsIn;
 | 
						|
    bsIn.reset(new ByteStream());
 | 
						|
    int rc = 0;
 | 
						|
    ByteStream::byte tmp8;
 | 
						|
    std::string errorMsg;
 | 
						|
 | 
						|
    try
 | 
						|
    {
 | 
						|
        while (1)
 | 
						|
        {
 | 
						|
            if (msgRecived == fWEClient->getPmCount())
 | 
						|
                break;
 | 
						|
 | 
						|
            fWEClient->read(uniqueId, bsIn);
 | 
						|
 | 
						|
            if ( bsIn->length() == 0 ) //read error
 | 
						|
            {
 | 
						|
                rc = NETWORK_ERROR;
 | 
						|
                break;
 | 
						|
            }
 | 
						|
            else
 | 
						|
            {
 | 
						|
                *bsIn >> tmp8;
 | 
						|
                rc = tmp8;
 | 
						|
 | 
						|
                if (rc != 0)
 | 
						|
                {
 | 
						|
                    *bsIn >> errorMsg;
 | 
						|
                    break;
 | 
						|
                }
 | 
						|
                else
 | 
						|
                    msgRecived++;
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
    catch (std::exception&)
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    return rc;
 | 
						|
}
 | 
						|
}
 | 
						|
// vim:ts=4 sw=4:
 |