/* Copyright (C) 2014 InfiniDB, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /*********************************************************************** * $Id: querystats.cpp 4028 2013-08-02 18:49:00Z zzhu $ * * ***********************************************************************/ #define PREFER_MY_CONFIG_H #include #include using namespace std; #include #include "bytestream.h" using namespace messageqcpp; #include "resourcemanager.h" using namespace joblist; #include "errorids.h" using namespace logging; #include "libmysql_client.h" #include "querystats.h" namespace querystats { const string SCHEMA = "infinidb_querystats"; QueryStats::QueryStats() { reset(); } void QueryStats::reset() { fMaxMemPct = 0; fNumFiles = 0; fFileBytes = 0; fPhyIO = 0; fCacheIO = 0; fMsgRcvCnt = 0; fCPBlocksSkipped = 0; fMsgBytesIn = 0; fMsgBytesOut = 0; fRows = 0; fStartTime = 0; fEndTime = 0; fErrorNo = 0; fBlocksChanged = 0; fSessionID = (uint64_t)-1; fStartTimeStr.clear(); fEndTimeStr.clear(); fQueryType.clear(); fQuery.clear(); fHost.clear(); fUser.clear(); fPriority.clear(); } void QueryStats::serialize(ByteStream& b) { b << (uint64_t)fSessionID; b << (uint64_t)fMaxMemPct; b << (uint64_t)fNumFiles; b << (uint64_t)fFileBytes; b << (uint64_t)fPhyIO; b << (uint64_t)fCacheIO; b << (uint64_t)fMsgRcvCnt; b << (uint64_t)fCPBlocksSkipped; b << (uint64_t)fMsgBytesIn; b << (uint64_t)fMsgBytesOut; b << (uint64_t)fRows; b << fStartTimeStr; b << fEndTimeStr; b << (uint64_t)fErrorNo; b << (uint64_t)fBlocksChanged; b << fQuery; b << fQueryType; b << fHost; b << fUser; b << fPriority; } // unserialize new stats and keep the stats set in this. void QueryStats::unserialize(ByteStream& b) { uint64_t temp = 0; string str; b >> (uint64_t&)temp; fSessionID = (fSessionID == (uint64_t)-1 ? temp : fSessionID); b >> (uint64_t&)temp; fMaxMemPct = (fMaxMemPct == 0 ? temp : fMaxMemPct); b >> (uint64_t&)temp; fNumFiles = (fNumFiles == 0 ? temp : fNumFiles); b >> (uint64_t&)temp; fFileBytes = (fFileBytes == 0 ? temp : fFileBytes); b >> (uint64_t&)temp; fPhyIO = (fPhyIO == 0 ? temp : fPhyIO); b >> (uint64_t&)temp; fCacheIO = (fCacheIO == 0 ? temp : fCacheIO); b >> (uint64_t&)temp; fMsgRcvCnt = (fMsgRcvCnt == 0 ? temp : fMsgRcvCnt); b >> (uint64_t&)temp; fCPBlocksSkipped = (fCPBlocksSkipped == 0 ? temp : fCPBlocksSkipped); b >> (uint64_t&)temp; fMsgBytesIn = (fMsgBytesIn == 0 ? temp : fMsgBytesIn); b >> (uint64_t&)temp; fMsgBytesOut = (fMsgBytesOut == 0 ? temp : fMsgBytesOut); b >> (uint64_t&)temp; fRows = (fRows == 0 ? temp : fRows); b >> str; fStartTimeStr = (fStartTimeStr.empty() ? str : fStartTimeStr); b >> str; fEndTimeStr = (fEndTimeStr.empty() ? str : fEndTimeStr); b >> (uint64_t&)temp; fErrorNo = (fErrorNo == 0 ? temp : fErrorNo); b >> (uint64_t&)temp; fBlocksChanged = (fBlocksChanged == 0 ? temp : fBlocksChanged); b >> str; fQuery = (fQuery.empty() ? str : fQuery); b >> str; fQueryType = (fQueryType.empty() ? str : fQueryType); b >> str; fHost = (fHost.empty() ? str : fHost); b >> str; fUser = (fUser.empty() ? str : fUser); b >> str; fPriority = (fPriority.empty() ? str : fPriority); } /** * QueryStats table definition create table QueryStats ( queryID bigint primary key auto_increment, sessionID bigint, queryType char(25), query blob, startTime timestamp, endTime timestamp, rows bigint, errno int, phyIO bigint, cacheIO bigint, blocksTouched bigint, CPBlocksSkipped bigint, msgInUM bigint, msgOutUm bigint, maxMemPct int, // not populated for now blocksChanged bigint, // for dml numTempFiles bigint, // not populated for now tempFileSpace bigint // not populated for now ) */ void QueryStats::insert() { ResourceManager* rm = ResourceManager::instance(); // check if query stats is enabled in Columnstore.xml if (!rm->queryStatsEnabled()) return; // get configure for every query to allow only changing of connect info int ret; string host, user, pwd; uint32_t port; if (rm->getMysqldInfo(host, user, pwd, port) == false) throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_CROSS_ENGINE_CONFIG), ERR_CROSS_ENGINE_CONFIG); // insert stats to querystats table utils::LibMySQL mysql; ret = mysql.init(host.c_str(), port, user.c_str(), pwd.c_str(), SCHEMA.c_str()); if (ret != 0) mysql.handleMySqlError(mysql.getError().c_str(), ret); // escape quote characters boost::scoped_array query(new char[fQuery.length() * 2 + 1]); mysql_real_escape_string(mysql.getMySqlCon(), query.get(), fQuery.c_str(), fQuery.length()); ostringstream insert; insert << "insert into querystats values (0, "; insert << fSessionID << ", "; insert << "'" << fHost << "', "; insert << "'" << fUser << "', "; insert << "'" << fPriority << "', "; insert << "'" << fQueryType << "', "; insert << "'" << query.get() << "', "; insert << "'" << fStartTimeStr << "', "; insert << "'" << fEndTimeStr << "', "; insert << fRows << ", "; insert << fErrorNo << ", "; insert << fPhyIO << ", "; insert << fCacheIO << ", "; insert << fMsgRcvCnt << ", "; insert << fCPBlocksSkipped << ", "; insert << fMsgBytesIn << ", "; insert << fMsgBytesOut << ", "; insert << fMaxMemPct << ", "; insert << fBlocksChanged << ", "; insert << fNumFiles << ", "; insert << fFileBytes << ")"; // the last 2 fields are not populated yet ret = mysql.run(insert.str().c_str(), false); if (ret != 0) mysql.handleMySqlError(mysql.getError().c_str(), ret); } uint32_t QueryStats::userPriority(string _host, const string _user) { // priority has been set already if (!fPriority.empty()) return fPriorityLevel; ResourceManager rm; fPriorityLevel = DEFAULT_USER_PRIORITY_LEVEL; fPriority = DEFAULT_USER_PRIORITY; // check if query stats is enabled in Columnstore.xml if (!rm.userPriorityEnabled()) { fPriority = DEFAULT_USER_PRIORITY; fPriorityLevel = DEFAULT_USER_PRIORITY_LEVEL; return fPriorityLevel; } string host, user, pwd; uint32_t port; // get configure for every query to allow only changing of connect info if (rm.getMysqldInfo(host, user, pwd, port) == false) throw IDBExcept(IDBErrorInfo::instance()->errorMsg(ERR_CROSS_ENGINE_CONFIG), ERR_CROSS_ENGINE_CONFIG); // get user priority utils::LibMySQL mysql; int ret; ret = mysql.init(host.c_str(), port, user.c_str(), pwd.c_str(), SCHEMA.c_str()); if (ret != 0) mysql.handleMySqlError(mysql.getError().c_str(), ret); // get the part of host string befor ':' if there is. size_t pos = _host.find(':', 0); if (pos != string::npos) _host = _host.substr(0, pos); ostringstream query; query << "select a.priority, priority_level from user_priority a, priority b where \ upper(case when INSTR(host, ':') = 0 \ then host \ else SUBSTR(host, 1, INSTR(host, ':')-1 ) \ end)=upper('" << _host << "') and upper(user)=upper('" << _user << "') and upper(a.priority) = upper(b.priority)"; ret = mysql.run(query.str().c_str()); if (ret != 0) mysql.handleMySqlError(mysql.getError().c_str(), ret); char** rowIn; rowIn = mysql.nextRow(); if (rowIn) { fPriority = rowIn[0]; fPriorityLevel = atoi(rowIn[1]); } return fPriorityLevel; } } // namespace querystats