You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-31 18:30:33 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			302 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			302 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| /***********************************************************************
 | |
|  *   $Id: querystats.cpp 4028 2013-08-02 18:49:00Z zzhu $
 | |
|  *
 | |
|  *
 | |
|  ***********************************************************************/
 | |
| 
 | |
| #define PREFER_MY_CONFIG_H
 | |
| #include <my_config.h>
 | |
| 
 | |
| #include <string>
 | |
| using namespace std;
 | |
| 
 | |
| #include <boost/scoped_array.hpp>
 | |
| 
 | |
| #include "bytestream.h"
 | |
| using namespace messageqcpp;
 | |
| 
 | |
| #include "resourcemanager.h"
 | |
| using namespace joblist;
 | |
| 
 | |
| #include "errorids.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "libmysql_client.h"
 | |
| 
 | |
| #include "querystats.h"
 | |
| 
 | |
| namespace querystats
 | |
| {
 | |
| const string SCHEMA = "infinidb_querystats";
 | |
| 
 | |
| QueryStats::QueryStats()
 | |
| {
 | |
|   reset();
 | |
| }
 | |
| 
 | |
| void QueryStats::reset()
 | |
| {
 | |
|   fMaxMemPct = 0;
 | |
|   fNumFiles = 0;
 | |
|   fFileBytes = 0;
 | |
|   fPhyIO = 0;
 | |
|   fCacheIO = 0;
 | |
|   fMsgRcvCnt = 0;
 | |
|   fCPBlocksSkipped = 0;
 | |
|   fMsgBytesIn = 0;
 | |
|   fMsgBytesOut = 0;
 | |
|   fRows = 0;
 | |
|   fStartTime = 0;
 | |
|   fEndTime = 0;
 | |
|   fErrorNo = 0;
 | |
|   fBlocksChanged = 0;
 | |
|   fSessionID = (uint64_t)-1;
 | |
|   fStartTimeStr.clear();
 | |
|   fEndTimeStr.clear();
 | |
|   fQueryType.clear();
 | |
|   fQuery.clear();
 | |
|   fHost.clear();
 | |
|   fUser.clear();
 | |
|   fPriority.clear();
 | |
| }
 | |
| 
 | |
| void QueryStats::serialize(ByteStream& b)
 | |
| {
 | |
|   b << (uint64_t)fSessionID;
 | |
|   b << (uint64_t)fMaxMemPct;
 | |
|   b << (uint64_t)fNumFiles;
 | |
|   b << (uint64_t)fFileBytes;
 | |
|   b << (uint64_t)fPhyIO;
 | |
|   b << (uint64_t)fCacheIO;
 | |
|   b << (uint64_t)fMsgRcvCnt;
 | |
|   b << (uint64_t)fCPBlocksSkipped;
 | |
|   b << (uint64_t)fMsgBytesIn;
 | |
|   b << (uint64_t)fMsgBytesOut;
 | |
|   b << (uint64_t)fRows;
 | |
|   b << fStartTimeStr;
 | |
|   b << fEndTimeStr;
 | |
|   b << (uint64_t)fErrorNo;
 | |
|   b << (uint64_t)fBlocksChanged;
 | |
|   b << fQuery;
 | |
|   b << fQueryType;
 | |
|   b << fHost;
 | |
|   b << fUser;
 | |
|   b << fPriority;
 | |
| }
 | |
| 
 | |
| // unserialize new stats and keep the stats set in this.
 | |
| void QueryStats::unserialize(ByteStream& b)
 | |
| {
 | |
|   uint64_t temp = 0;
 | |
|   string str;
 | |
|   b >> (uint64_t&)temp;
 | |
|   fSessionID = (fSessionID == (uint64_t)-1 ? temp : fSessionID);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fMaxMemPct = (fMaxMemPct == 0 ? temp : fMaxMemPct);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fNumFiles = (fNumFiles == 0 ? temp : fNumFiles);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fFileBytes = (fFileBytes == 0 ? temp : fFileBytes);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fPhyIO = (fPhyIO == 0 ? temp : fPhyIO);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fCacheIO = (fCacheIO == 0 ? temp : fCacheIO);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fMsgRcvCnt = (fMsgRcvCnt == 0 ? temp : fMsgRcvCnt);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fCPBlocksSkipped = (fCPBlocksSkipped == 0 ? temp : fCPBlocksSkipped);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fMsgBytesIn = (fMsgBytesIn == 0 ? temp : fMsgBytesIn);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fMsgBytesOut = (fMsgBytesOut == 0 ? temp : fMsgBytesOut);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fRows = (fRows == 0 ? temp : fRows);
 | |
| 
 | |
|   b >> str;
 | |
|   fStartTimeStr = (fStartTimeStr.empty() ? str : fStartTimeStr);
 | |
|   b >> str;
 | |
|   fEndTimeStr = (fEndTimeStr.empty() ? str : fEndTimeStr);
 | |
| 
 | |
|   b >> (uint64_t&)temp;
 | |
|   fErrorNo = (fErrorNo == 0 ? temp : fErrorNo);
 | |
|   b >> (uint64_t&)temp;
 | |
|   fBlocksChanged = (fBlocksChanged == 0 ? temp : fBlocksChanged);
 | |
| 
 | |
|   b >> str;
 | |
|   fQuery = (fQuery.empty() ? str : fQuery);
 | |
|   b >> str;
 | |
|   fQueryType = (fQueryType.empty() ? str : fQueryType);
 | |
|   b >> str;
 | |
|   fHost = (fHost.empty() ? str : fHost);
 | |
|   b >> str;
 | |
|   fUser = (fUser.empty() ? str : fUser);
 | |
|   b >> str;
 | |
|   fPriority = (fPriority.empty() ? str : fPriority);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * QueryStats table definition
 | |
|    create table QueryStats
 | |
|    (
 | |
|      queryID           bigint primary key auto_increment,
 | |
|      sessionID         bigint,
 | |
|      queryType         char(25),
 | |
|      query             blob,
 | |
|      startTime         timestamp,
 | |
|      endTime           timestamp,
 | |
|      rows              bigint,
 | |
|      errno             int,
 | |
|      phyIO             bigint,
 | |
|      cacheIO           bigint,
 | |
|      blocksTouched     bigint,
 | |
|      CPBlocksSkipped   bigint,
 | |
|      msgInUM           bigint,
 | |
|      msgOutUm          bigint,
 | |
|      maxMemPct         int,    // not populated for now
 | |
|      blocksChanged     bigint, // for dml
 | |
|      numTempFiles      bigint, // not populated for now
 | |
|      tempFileSpace     bigint  // not populated for now
 | |
|    )
 | |
|  */
 | |
| void QueryStats::insert()
 | |
| {
 | |
|   ResourceManager* rm = ResourceManager::instance();
 | |
| 
 | |
|   // check if query stats is enabled in Columnstore.xml
 | |
|   if (!rm->queryStatsEnabled())
 | |
|     return;
 | |
| 
 | |
|   // get configure for every query to allow only changing of connect info
 | |
|   int ret;
 | |
|   string host, user, pwd;
 | |
|   uint32_t port;
 | |
| 
 | |
|   if (rm->getMysqldInfo(host, user, pwd, port) == false)
 | |
|     throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_CROSS_ENGINE_CONFIG), ERR_CROSS_ENGINE_CONFIG);
 | |
| 
 | |
|   // insert stats to querystats table
 | |
|   utils::LibMySQL mysql;
 | |
| 
 | |
|   ret = mysql.init(host.c_str(), port, user.c_str(), pwd.c_str(), SCHEMA.c_str());
 | |
| 
 | |
|   if (ret != 0)
 | |
|     mysql.handleMySqlError(mysql.getError().c_str(), ret);
 | |
| 
 | |
|   // escape quote characters
 | |
|   boost::scoped_array<char> query(new char[fQuery.length() * 2 + 1]);
 | |
|   mysql_real_escape_string(mysql.getMySqlCon(), query.get(), fQuery.c_str(), fQuery.length());
 | |
| 
 | |
|   ostringstream insert;
 | |
|   insert << "insert into querystats values (0, ";
 | |
|   insert << fSessionID << ", ";
 | |
|   insert << "'" << fHost << "', ";
 | |
|   insert << "'" << fUser << "', ";
 | |
|   insert << "'" << fPriority << "', ";
 | |
|   insert << "'" << fQueryType << "', ";
 | |
|   insert << "'" << query.get() << "', ";
 | |
|   insert << "'" << fStartTimeStr << "', ";
 | |
|   insert << "'" << fEndTimeStr << "', ";
 | |
|   insert << fRows << ", ";
 | |
|   insert << fErrorNo << ", ";
 | |
|   insert << fPhyIO << ", ";
 | |
|   insert << fCacheIO << ", ";
 | |
|   insert << fMsgRcvCnt << ", ";
 | |
|   insert << fCPBlocksSkipped << ", ";
 | |
|   insert << fMsgBytesIn << ", ";
 | |
|   insert << fMsgBytesOut << ", ";
 | |
|   insert << fMaxMemPct << ", ";
 | |
|   insert << fBlocksChanged << ", ";
 | |
|   insert << fNumFiles << ", ";
 | |
|   insert << fFileBytes << ")";  // the last 2 fields are not populated yet
 | |
| 
 | |
|   ret = mysql.run(insert.str().c_str(), false);
 | |
| 
 | |
|   if (ret != 0)
 | |
|     mysql.handleMySqlError(mysql.getError().c_str(), ret);
 | |
| }
 | |
| 
 | |
| uint32_t QueryStats::userPriority(string _host, const string _user)
 | |
| {
 | |
|   // priority has been set already
 | |
|   if (!fPriority.empty())
 | |
|     return fPriorityLevel;
 | |
| 
 | |
|   bool runningInExeMgr = false;
 | |
|   ResourceManager rm(runningInExeMgr);
 | |
|   fPriorityLevel = DEFAULT_USER_PRIORITY_LEVEL;
 | |
|   fPriority = DEFAULT_USER_PRIORITY;
 | |
| 
 | |
|   // check if query stats is enabled in Columnstore.xml
 | |
|   if (!rm.userPriorityEnabled())
 | |
|   {
 | |
|     fPriority = DEFAULT_USER_PRIORITY;
 | |
|     fPriorityLevel = DEFAULT_USER_PRIORITY_LEVEL;
 | |
|     return fPriorityLevel;
 | |
|   }
 | |
| 
 | |
|   string host, user, pwd;
 | |
|   uint32_t port;
 | |
| 
 | |
|   // get configure for every query to allow only changing of connect info
 | |
|   if (rm.getMysqldInfo(host, user, pwd, port) == false)
 | |
|     throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_CROSS_ENGINE_CONFIG), ERR_CROSS_ENGINE_CONFIG);
 | |
| 
 | |
|   // get user priority
 | |
|   utils::LibMySQL mysql;
 | |
| 
 | |
|   int ret;
 | |
|   ret = mysql.init(host.c_str(), port, user.c_str(), pwd.c_str(), SCHEMA.c_str());
 | |
| 
 | |
|   if (ret != 0)
 | |
|     mysql.handleMySqlError(mysql.getError().c_str(), ret);
 | |
| 
 | |
|   // get the part of host string befor ':' if there is.
 | |
|   size_t pos = _host.find(':', 0);
 | |
| 
 | |
|   if (pos != string::npos)
 | |
|     _host = _host.substr(0, pos);
 | |
| 
 | |
|   ostringstream query;
 | |
|   query << "select a.priority, priority_level from user_priority a, priority b where \
 | |
| 	          upper(case when INSTR(host, ':') = 0 \
 | |
| 	          then host \
 | |
| 	          else SUBSTR(host, 1, INSTR(host, ':')-1 ) \
 | |
| 	          end)=upper('"
 | |
|         << _host << "') and upper(user)=upper('" << _user << "') and upper(a.priority) = upper(b.priority)";
 | |
| 
 | |
|   ret = mysql.run(query.str().c_str());
 | |
| 
 | |
|   if (ret != 0)
 | |
|     mysql.handleMySqlError(mysql.getError().c_str(), ret);
 | |
| 
 | |
|   char** rowIn;
 | |
|   rowIn = mysql.nextRow();
 | |
| 
 | |
|   if (rowIn)
 | |
|   {
 | |
|     fPriority = rowIn[0];
 | |
|     fPriorityLevel = atoi(rowIn[1]);
 | |
|   }
 | |
| 
 | |
|   return fPriorityLevel;
 | |
| }
 | |
| 
 | |
| }  // namespace querystats
 |