mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-20 09:07:44 +03:00
417 lines
8.6 KiB
C++
417 lines
8.6 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
/*******************************************************************************
|
|
* $Id$
|
|
*
|
|
*******************************************************************************/
|
|
|
|
/*
|
|
* we_splclient.h
|
|
*
|
|
* Created on: Oct 20, 2011
|
|
* Author: bpaul
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "threadsafequeue.h"
|
|
#include "resourcemanager.h"
|
|
|
|
#include "we_messages.h"
|
|
#include "calpontsystemcatalog.h"
|
|
|
|
namespace WriteEngine
|
|
{
|
|
class WESplClient; // forward decleration
|
|
|
|
// Structure for holding the Out of Range data from the BRMReport
|
|
// This class is also used by we_sdhandler to hold the agregate info.
|
|
class WEColOORInfo // Column Out-Of-Range Info
|
|
{
|
|
public:
|
|
WEColOORInfo() : fColNum(0), fColType(execplan::CalpontSystemCatalog::INT), fNoOfOORs(0)
|
|
{
|
|
}
|
|
~WEColOORInfo()
|
|
{
|
|
}
|
|
|
|
public:
|
|
int fColNum;
|
|
execplan::CalpontSystemCatalog::ColDataType fColType;
|
|
std::string fColName;
|
|
int fNoOfOORs;
|
|
};
|
|
typedef std::vector<WEColOORInfo> WEColOorVec;
|
|
|
|
//------------------------------------------------------------------------------
|
|
class WESdHandlerException : public std::exception
|
|
{
|
|
public:
|
|
std::string fWhat;
|
|
WESdHandlerException(std::string& What) throw()
|
|
{
|
|
fWhat = What;
|
|
}
|
|
virtual ~WESdHandlerException() throw()
|
|
{
|
|
}
|
|
virtual const char* what() const throw()
|
|
{
|
|
return fWhat.c_str();
|
|
}
|
|
};
|
|
//------------------------------------------------------------------------------
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
class WESplClientRunner
|
|
{
|
|
public:
|
|
WESplClientRunner(WESplClient& Sc) : fOwner(Sc)
|
|
{ /* ctor */
|
|
}
|
|
virtual ~WESplClientRunner()
|
|
{ /* dtor */
|
|
}
|
|
void operator()();
|
|
|
|
public:
|
|
WESplClient& fOwner;
|
|
};
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
class WESplClient
|
|
{
|
|
public:
|
|
WESplClient(WESDHandler& Sdh, int PmId);
|
|
virtual ~WESplClient();
|
|
|
|
void setup();
|
|
void startClientThread();
|
|
void sendAndRecv();
|
|
void send();
|
|
void recv();
|
|
void write(const messageqcpp::ByteStream& Msg);
|
|
void read(messageqcpp::SBS& Sbs);
|
|
void add2SendQueue(const messageqcpp::SBS& Sbs);
|
|
void clearSendQueue();
|
|
int getSendQSize();
|
|
|
|
void printStats();
|
|
void onConnect();
|
|
void onDisconnect();
|
|
|
|
unsigned int getRowTx() const
|
|
{
|
|
return fRowTx;
|
|
}
|
|
uint32_t getBytesRcv() const
|
|
{
|
|
return fBytesRcv;
|
|
}
|
|
uint32_t getBytesTx()
|
|
{
|
|
boost::mutex::scoped_lock aLock(fTxMutex);
|
|
return fBytesTx;
|
|
}
|
|
boost::thread* getFpThread() const
|
|
{
|
|
return fpThread;
|
|
}
|
|
time_t getLastInTime()
|
|
{
|
|
boost::mutex::scoped_lock aLock(fLastInMutex);
|
|
return (fLastInTime > 0) ? fLastInTime : fStartTime; // BUG 4309
|
|
}
|
|
time_t getStartTime() const
|
|
{
|
|
return fStartTime;
|
|
}
|
|
time_t getElapsedTime()
|
|
{
|
|
return (getLastInTime() - getStartTime());
|
|
}
|
|
bool isCpiStarted() const
|
|
{
|
|
return fCpiStarted;
|
|
}
|
|
bool isCpiPassed() const
|
|
{
|
|
return fCpiPassed;
|
|
}
|
|
bool isCpiFailed() const
|
|
{
|
|
return fCpiFailed;
|
|
}
|
|
bool isBrmRptRcvd() const
|
|
{
|
|
return fBrmRptRcvd;
|
|
}
|
|
int getRollbackRslt() const
|
|
{
|
|
return fRollbackRslt;
|
|
}
|
|
int getCleanupRslt() const
|
|
{
|
|
return fCleanupRslt;
|
|
}
|
|
bool getSendFlag() const
|
|
{
|
|
return fSend;
|
|
}
|
|
unsigned int getPmId() const
|
|
{
|
|
return fPmId;
|
|
}
|
|
unsigned int getDbRootCnt() const
|
|
{
|
|
return fDbrCnt;
|
|
}
|
|
unsigned int getDbRootVar()
|
|
{
|
|
boost::mutex::scoped_lock aLock(fDataRqstMutex);
|
|
return fDbrVar;
|
|
}
|
|
int getDataRqstCount()
|
|
{
|
|
boost::mutex::scoped_lock aLock(fDataRqstMutex);
|
|
return fDataRqstCnt;
|
|
}
|
|
long getRdSecTo() const
|
|
{
|
|
return fRdSecTo;
|
|
}
|
|
bool isConnected() const
|
|
{
|
|
return fConnected;
|
|
}
|
|
bool isContinue() const
|
|
{
|
|
return fContinue;
|
|
}
|
|
const std::string& getServer() const
|
|
{
|
|
return fServer;
|
|
}
|
|
const std::string& getIpAddress() const
|
|
{
|
|
return fIpAddress;
|
|
}
|
|
void setBytesRcv(uint32_t BytesRcv)
|
|
{
|
|
fBytesRcv = BytesRcv;
|
|
}
|
|
void setBytesTx(uint32_t BytesTx)
|
|
{
|
|
boost::mutex::scoped_lock aLock(fTxMutex);
|
|
fBytesTx = BytesTx;
|
|
aLock.unlock();
|
|
}
|
|
void updateBytesTx(uint32_t fBytes)
|
|
{
|
|
boost::mutex::scoped_lock aLock(fTxMutex);
|
|
fBytesTx += fBytes;
|
|
aLock.unlock();
|
|
}
|
|
void setConnected(bool Connected)
|
|
{
|
|
fConnected = Connected;
|
|
}
|
|
void setContinue(bool Continue)
|
|
{
|
|
fContinue = Continue;
|
|
}
|
|
void setFpThread(boost::thread* pThread)
|
|
{
|
|
fpThread = pThread;
|
|
}
|
|
void setLastInTime(time_t LastInTime)
|
|
{
|
|
fLastInTime = LastInTime;
|
|
}
|
|
void setStartTime(time_t StartTime)
|
|
{
|
|
boost::mutex::scoped_lock aLock(fLastInMutex);
|
|
fStartTime = StartTime;
|
|
aLock.lock();
|
|
}
|
|
void setSendFlag(bool Send)
|
|
{
|
|
fSend = Send;
|
|
}
|
|
void setCpiStarted(bool Start)
|
|
{
|
|
fCpiStarted = Start;
|
|
}
|
|
void setCpiPassed(bool Pass)
|
|
{
|
|
setLastInTime(time(0));
|
|
fCpiPassed = Pass;
|
|
}
|
|
void setCpiFailed(bool Fail)
|
|
{
|
|
setLastInTime(time(0));
|
|
fCpiFailed = Fail;
|
|
fRowsUploadInfo.fRowsRead = 0;
|
|
fRowsUploadInfo.fRowsInserted = 0;
|
|
}
|
|
void setBrmRptRcvd(bool Rcvd)
|
|
{
|
|
fBrmRptRcvd = Rcvd;
|
|
}
|
|
void setRollbackRslt(int Rslt)
|
|
{
|
|
fRollbackRslt = Rslt;
|
|
}
|
|
void setCleanupRslt(int Rslt)
|
|
{
|
|
fCleanupRslt = Rslt;
|
|
}
|
|
void setPmId(unsigned int PmId)
|
|
{
|
|
fPmId = PmId;
|
|
}
|
|
void setDbRootCnt(unsigned int DbrCnt)
|
|
{
|
|
fDbrCnt = DbrCnt;
|
|
}
|
|
void resetDbRootVar()
|
|
{
|
|
boost::mutex::scoped_lock aLock(fDataRqstMutex);
|
|
fDbrVar = fDbrCnt;
|
|
aLock.unlock();
|
|
}
|
|
void decDbRootVar()
|
|
{
|
|
boost::mutex::scoped_lock aLock(fDataRqstMutex);
|
|
|
|
if (fDbrVar > 0)
|
|
--fDbrVar;
|
|
|
|
aLock.unlock();
|
|
}
|
|
void setRdSecTo(long RdSecTo)
|
|
{
|
|
fRdSecTo = RdSecTo;
|
|
}
|
|
void setDataRqstCount(int DataRqstCnt)
|
|
{
|
|
boost::mutex::scoped_lock aLock(fDataRqstMutex);
|
|
fDataRqstCnt = DataRqstCnt;
|
|
aLock.unlock();
|
|
}
|
|
void decDataRqstCount()
|
|
{
|
|
boost::mutex::scoped_lock aLock(fDataRqstMutex);
|
|
|
|
if (fDataRqstCnt > 0)
|
|
--fDataRqstCnt;
|
|
|
|
aLock.unlock();
|
|
}
|
|
void incDataRqstCount()
|
|
{
|
|
boost::mutex::scoped_lock aLock(fDataRqstMutex);
|
|
++fDataRqstCnt;
|
|
aLock.unlock();
|
|
}
|
|
void setServer(const std::string& Server)
|
|
{
|
|
fServer = Server;
|
|
}
|
|
void setIpAddress(const std::string& IpAddr)
|
|
{
|
|
fIpAddress = IpAddr;
|
|
}
|
|
void updateRowTx(unsigned int aCnt)
|
|
{
|
|
fRowTx += aCnt;
|
|
}
|
|
void resetRowTx()
|
|
{
|
|
fRowTx = 0;
|
|
}
|
|
|
|
private:
|
|
bool fContinue;
|
|
bool fConnected;
|
|
unsigned int fPmId;
|
|
unsigned int fDbrCnt;
|
|
unsigned int fDbrVar; // Var to keep track next PM to send.
|
|
int fDataRqstCnt; // Data request count
|
|
long fRdSecTo; // read timeout sec
|
|
unsigned int fRowTx; // No. Of Rows Transmitted
|
|
uint32_t fBytesTx;
|
|
uint32_t fBytesRcv;
|
|
time_t fLastInTime;
|
|
time_t fStartTime;
|
|
bool fSend;
|
|
bool fCpiStarted;
|
|
bool fCpiPassed;
|
|
bool fCpiFailed;
|
|
bool fBrmRptRcvd;
|
|
int fRollbackRslt;
|
|
int fCleanupRslt;
|
|
|
|
boost::mutex fTxMutex; // mutex for TxBytes
|
|
boost::mutex fDataRqstMutex;
|
|
boost::mutex fWriteMutex;
|
|
boost::mutex fSentQMutex;
|
|
boost::mutex fLastInMutex;
|
|
typedef std::queue<messageqcpp::SBS> WESendQueue;
|
|
WESendQueue fSendQueue;
|
|
|
|
std::string fServer;
|
|
std::string fIpAddress;
|
|
boost::shared_ptr<messageqcpp::MessageQueueClient> fClnt;
|
|
boost::thread* fpThread;
|
|
WESDHandler& fOwner;
|
|
|
|
class WERowsUploadInfo
|
|
{
|
|
public:
|
|
WERowsUploadInfo() : fRowsRead(0), fRowsInserted(0)
|
|
{
|
|
}
|
|
~WERowsUploadInfo()
|
|
{
|
|
}
|
|
|
|
public:
|
|
int64_t fRowsRead;
|
|
int64_t fRowsInserted;
|
|
};
|
|
WERowsUploadInfo fRowsUploadInfo;
|
|
WEColOorVec fColOorVec;
|
|
std::string fBadDataFile;
|
|
std::string fErrInfoFile;
|
|
|
|
void setRowsUploadInfo(int64_t RowsRead, int64_t RowsInserted);
|
|
void add2ColOutOfRangeInfo(int ColNum, execplan::CalpontSystemCatalog::ColDataType ColType,
|
|
std::string& ColName, int NoOfOors);
|
|
void setBadDataFile(const std::string& BadDataFile);
|
|
void setErrInfoFile(const std::string& ErrInfoFile);
|
|
|
|
friend class WESDHandler;
|
|
};
|
|
//------------------------------------------------------------------------------
|
|
|
|
} /* namespace WriteEngine */
|