You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			606 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			606 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2021 MariaDB Corporation
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| #include <iostream>
 | |
| #include <atomic>
 | |
| #include <boost/filesystem.hpp>
 | |
| 
 | |
| #include "IDBPolicy.h"
 | |
| #include "brmtypes.h"
 | |
| #include "hasher.h"
 | |
| #include "messagequeue.h"
 | |
| #include "configcpp.h"
 | |
| #include "statistics.h"
 | |
| 
 | |
| using namespace idbdatafile;
 | |
| using namespace logging;
 | |
| using namespace std;
 | |
| namespace statistics
 | |
| {
 | |
| StatisticsManager* StatisticsManager::instance()
 | |
| {
 | |
|   static StatisticsManager* sm = new StatisticsManager();
 | |
|   return sm;
 | |
| }
 | |
| 
 | |
| void StatisticsManager::collectSample(const rowgroup::RowGroup& rowGroup)
 | |
| {
 | |
|   std::lock_guard<std::mutex> lock(mut);
 | |
|   const auto rowCount = rowGroup.getRowCount();
 | |
|   const auto columnCount = rowGroup.getColumnCount();
 | |
|   if (!rowCount || !columnCount)
 | |
|     return;
 | |
| 
 | |
|   const auto& oids = rowGroup.getOIDs();
 | |
|   for (const auto oid : oids)
 | |
|   {
 | |
|     // Initialize a column data with 0.
 | |
|     if (!columnGroups.count(oid))
 | |
|       columnGroups[oid] = std::vector<uint64_t>(maxSampleSize, 0);
 | |
|   }
 | |
| 
 | |
|   // Initialize a first row from the given `rowGroup`.
 | |
|   rowgroup::Row r;
 | |
|   rowGroup.initRow(&r);
 | |
|   rowGroup.getRow(0, &r);
 | |
| 
 | |
|   // Generate a uniform distribution.
 | |
|   for (uint32_t i = 0; i < rowCount; ++i)
 | |
|   {
 | |
|     if (currentSampleSize < maxSampleSize)
 | |
|     {
 | |
|       for (uint32_t j = 0; j < columnCount; ++j)
 | |
|       {
 | |
|         if (!r.isNullValue(j))
 | |
|           columnGroups[oids[j]][currentSampleSize] = r.getIntField(j);
 | |
|       }
 | |
|       ++currentSampleSize;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       const uint32_t index = uniformDistribution(gen32);
 | |
|       if (index < maxSampleSize)
 | |
|       {
 | |
|         for (uint32_t j = 0; j < columnCount; ++j)
 | |
|           if (!r.isNullValue(j))
 | |
|             columnGroups[oids[j]][index] = r.getIntField(j);
 | |
|       }
 | |
|     }
 | |
|     r.nextRow();
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StatisticsManager::analyzeSample(bool traceOn)
 | |
| {
 | |
|   if (traceOn)
 | |
|     std::cout << "Sample size: " << currentSampleSize << std::endl;
 | |
| 
 | |
|   // PK_FK statistics.
 | |
|   for (const auto& [oid, sample] : columnGroups)
 | |
|     keyTypes[oid] = KeyType::PK;
 | |
| 
 | |
|   for (const auto& [oid, sample] : columnGroups)
 | |
|   {
 | |
|     std::unordered_set<uint32_t> columnsCache;
 | |
|     std::unordered_map<uint64_t, uint32_t> columnMCV;
 | |
|     for (uint32_t i = 0; i < currentSampleSize; ++i)
 | |
|     {
 | |
|       const auto value = sample[i];
 | |
|       // PK_FK statistics.
 | |
|       if (columnsCache.count(value) && keyTypes[oid] == KeyType::PK)
 | |
|         keyTypes[oid] = KeyType::FK;
 | |
|       else
 | |
|         columnsCache.insert(value);
 | |
| 
 | |
|       // MCV statistics.
 | |
|       if (columnMCV.count(value))
 | |
|         columnMCV[value]++;
 | |
|       else
 | |
|         columnMCV.insert({value, 1});
 | |
|     }
 | |
| 
 | |
|     // MCV statistics.
 | |
|     std::vector<pair<uint64_t, uint32_t>> mcvList(columnMCV.begin(), columnMCV.end());
 | |
|     std::sort(mcvList.begin(), mcvList.end(),
 | |
|               [](const std::pair<uint64_t, uint32_t>& a, const std::pair<uint64_t, uint32_t>& b) {
 | |
|                 return a.second > b.second;
 | |
|               });
 | |
| 
 | |
|     // 200 buckets as Microsoft does.
 | |
|     const auto mcvSize = std::min(columnMCV.size(), static_cast<uint64_t>(200));
 | |
|     mcv[oid] = std::unordered_map<uint64_t, uint32_t>(mcvList.begin(), mcvList.begin() + mcvSize);
 | |
|   }
 | |
| 
 | |
|   if (traceOn)
 | |
|     output();
 | |
| 
 | |
|   // Clear sample.
 | |
|   columnGroups.clear();
 | |
|   currentSampleSize = 0;
 | |
| }
 | |
| 
 | |
| void StatisticsManager::output()
 | |
| {
 | |
|   std::cout << "Columns count: " << keyTypes.size() << std::endl;
 | |
| 
 | |
|   std::cout << "Statistics type [PK_FK]:  " << std::endl;
 | |
|   for (const auto& p : keyTypes)
 | |
|   {
 | |
|     std::cout << "[OID: " << p.first << ": ";
 | |
|     if (static_cast<uint32_t>(p.second) == 0)
 | |
|       std::cout << "PK] ";
 | |
|     else
 | |
|       std::cout << "FK] ";
 | |
|   }
 | |
| 
 | |
|   std::cout << "\nStatistics type [MCV]: " << std::endl;
 | |
|   for (const auto& [oid, columnMCV] : mcv)
 | |
|   {
 | |
|     std::cout << "[OID: " << oid << std::endl;
 | |
|     for (const auto& [value, count] : columnMCV)
 | |
|       std::cout << value << ": " << count << ", ";
 | |
|     cout << "]" << endl;
 | |
|   }
 | |
| }
 | |
| 
 | |
| // Someday it will be a virtual method, based on statistics type we processing.
 | |
| std::unique_ptr<char[]> StatisticsManager::convertStatsToDataStream(uint64_t& dataStreamSize)
 | |
| {
 | |
|   // Number of pairs.
 | |
|   uint64_t count = keyTypes.size();
 | |
|   // count, [[uid, keyType], ... ]
 | |
|   dataStreamSize = sizeof(uint64_t) + count * (sizeof(uint32_t) + sizeof(KeyType));
 | |
| 
 | |
|   // Count the size of the MCV.
 | |
|   for (const auto& [oid, mcvColumn] : mcv)
 | |
|   {
 | |
|     // [oid, list size, list [value, count]]
 | |
|     dataStreamSize +=
 | |
|         (sizeof(uint32_t) + sizeof(uint32_t) + ((sizeof(uint64_t) + sizeof(uint32_t)) * mcvColumn.size()));
 | |
|   }
 | |
| 
 | |
|   // Allocate memory for data stream.
 | |
|   std::unique_ptr<char[]> dataStreamSmartPtr(new char[dataStreamSize]);
 | |
|   auto* dataStream = dataStreamSmartPtr.get();
 | |
|   // Initialize the data stream.
 | |
|   uint64_t offset = 0;
 | |
|   std::memcpy(dataStream, reinterpret_cast<char*>(&count), sizeof(uint64_t));
 | |
|   offset += sizeof(uint64_t);
 | |
| 
 | |
|   // For each pair [oid, key type].
 | |
|   for (const auto& p : keyTypes)
 | |
|   {
 | |
|     uint32_t oid = p.first;
 | |
|     std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&oid), sizeof(uint32_t));
 | |
|     offset += sizeof(uint32_t);
 | |
|     KeyType keyType = p.second;
 | |
|     std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&keyType), sizeof(KeyType));
 | |
|     offset += sizeof(KeyType);
 | |
|   }
 | |
| 
 | |
|   // For each [oid, list size, list [value, count]].
 | |
|   for (const auto& p : mcv)
 | |
|   {
 | |
|     // [oid]
 | |
|     uint32_t oid = p.first;
 | |
|     std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&oid), sizeof(uint32_t));
 | |
|     offset += sizeof(uint32_t);
 | |
| 
 | |
|     // [list size]
 | |
|     const auto& mcvColumn = p.second;
 | |
|     uint32_t size = mcvColumn.size();
 | |
|     std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&size), sizeof(uint32_t));
 | |
|     offset += sizeof(uint32_t);
 | |
| 
 | |
|     // [list [value, count]]
 | |
|     for (const auto& mcvPair : mcvColumn)
 | |
|     {
 | |
|       uint64_t value = mcvPair.first;
 | |
|       std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&value), sizeof(uint64_t));
 | |
|       offset += sizeof(uint64_t);
 | |
|       uint32_t count = mcvPair.second;
 | |
|       std::memcpy(&dataStream[offset], reinterpret_cast<char*>(&count), sizeof(uint32_t));
 | |
|       offset += sizeof(uint32_t);
 | |
|     }
 | |
|   }
 | |
|   return dataStreamSmartPtr;
 | |
| }
 | |
| 
 | |
| void StatisticsManager::convertStatsFromDataStream(std::unique_ptr<char[]> dataStreamSmartPtr)
 | |
| {
 | |
|   auto* dataStream = dataStreamSmartPtr.get();
 | |
|   uint64_t count = 0;
 | |
|   std::memcpy(reinterpret_cast<char*>(&count), dataStream, sizeof(uint64_t));
 | |
|   uint64_t offset = sizeof(uint64_t);
 | |
| 
 | |
|   // For each pair.
 | |
|   for (uint64_t i = 0; i < count; ++i)
 | |
|   {
 | |
|     uint32_t oid;
 | |
|     KeyType keyType;
 | |
|     std::memcpy(reinterpret_cast<char*>(&oid), &dataStream[offset], sizeof(uint32_t));
 | |
|     offset += sizeof(uint32_t);
 | |
|     std::memcpy(reinterpret_cast<char*>(&keyType), &dataStream[offset], sizeof(KeyType));
 | |
|     offset += sizeof(KeyType);
 | |
|     keyTypes[oid] = keyType;
 | |
|   }
 | |
| 
 | |
|   for (uint64_t i = 0; i < count; ++i)
 | |
|   {
 | |
|     uint32_t oid;
 | |
|     std::memcpy(reinterpret_cast<char*>(&oid), &dataStream[offset], sizeof(uint32_t));
 | |
|     offset += sizeof(uint32_t);
 | |
| 
 | |
|     uint32_t mcvSize;
 | |
|     std::memcpy(reinterpret_cast<char*>(&mcvSize), &dataStream[offset], sizeof(uint32_t));
 | |
|     offset += sizeof(uint32_t);
 | |
| 
 | |
|     std::unordered_map<uint64_t, uint32_t> columnMCV;
 | |
|     for (uint32_t j = 0; j < mcvSize; ++j)
 | |
|     {
 | |
|       uint64_t value;
 | |
|       std::memcpy(reinterpret_cast<char*>(&value), &dataStream[offset], sizeof(uint64_t));
 | |
|       offset += sizeof(uint64_t);
 | |
|       uint32_t count;
 | |
|       std::memcpy(reinterpret_cast<char*>(&count), &dataStream[offset], sizeof(uint32_t));
 | |
|       offset += sizeof(uint32_t);
 | |
|       columnMCV[value] = count;
 | |
|     }
 | |
|     mcv[oid] = std::move(columnMCV);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StatisticsManager::saveToFile()
 | |
| {
 | |
|   std::lock_guard<std::mutex> lock(mut);
 | |
| 
 | |
|   const char* fileName = statsFile.c_str();
 | |
|   std::unique_ptr<IDBDataFile> out(
 | |
|       IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "wb", 1));
 | |
| 
 | |
|   if (!out)
 | |
|   {
 | |
|     BRM::log_errno("StatisticsManager::saveToFile(): open");
 | |
|     throw ios_base::failure("StatisticsManager::saveToFile(): open failed.");
 | |
|   }
 | |
| 
 | |
|   // Compute hash.
 | |
|   uint64_t dataStreamSize = 0;
 | |
|   std::unique_ptr<char[]> dataStreamSmartPtr = convertStatsToDataStream(dataStreamSize);
 | |
|   utils::Hasher128 hasher;
 | |
| 
 | |
|   // Prepare a statistics file header.
 | |
|   const uint32_t headerSize = sizeof(StatisticsFileHeader);
 | |
|   StatisticsFileHeader fileHeader;
 | |
|   std::memset(&fileHeader, 0, headerSize);
 | |
|   fileHeader.version = version;
 | |
|   fileHeader.epoch = epoch;
 | |
|   fileHeader.dataSize = dataStreamSize;
 | |
|   // Compute hash from the data.
 | |
|   fileHeader.dataHash = hasher(dataStreamSmartPtr.get(), dataStreamSize);
 | |
| 
 | |
|   // Write statistics file header.
 | |
|   uint64_t size = out->write(reinterpret_cast<char*>(&fileHeader), headerSize);
 | |
|   if (size != headerSize)
 | |
|   {
 | |
|     auto rc = IDBPolicy::remove(fileName);
 | |
|     if (rc == -1)
 | |
|       std::cerr << "Cannot remove file " << fileName << std::endl;
 | |
| 
 | |
|     throw ios_base::failure("StatisticsManager::saveToFile(): write failed. ");
 | |
|   }
 | |
| 
 | |
|   // Write data.
 | |
|   size = out->write(dataStreamSmartPtr.get(), dataStreamSize);
 | |
|   if (size != dataStreamSize)
 | |
|   {
 | |
|     auto rc = IDBPolicy::remove(fileName);
 | |
|     if (rc == -1)
 | |
|       std::cerr << "Cannot remove file " << fileName << std::endl;
 | |
| 
 | |
|     throw ios_base::failure("StatisticsManager::saveToFile(): write failed. ");
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StatisticsManager::loadFromFile()
 | |
| {
 | |
|   std::lock_guard<std::mutex> lock(mut);
 | |
|   // Check that stats file does exist.
 | |
|   if (!boost::filesystem::exists(statsFile))
 | |
|     return;
 | |
| 
 | |
|   const char* fileName = statsFile.c_str();
 | |
|   std::unique_ptr<IDBDataFile> in(
 | |
|       IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "rb", 1));
 | |
| 
 | |
|   if (!in)
 | |
|   {
 | |
|     BRM::log_errno("StatisticsManager::loadFromFile(): open");
 | |
|     throw ios_base::failure("StatisticsManager::loadFromFile(): open failed. Check the error log.");
 | |
|   }
 | |
| 
 | |
|   // Read the file header.
 | |
|   StatisticsFileHeader fileHeader;
 | |
|   const uint32_t headerSize = sizeof(StatisticsFileHeader);
 | |
|   int64_t size = in->read(reinterpret_cast<char*>(&fileHeader), headerSize);
 | |
|   if (size != headerSize)
 | |
|     throw ios_base::failure("StatisticsManager::loadFromFile(): read failed. ");
 | |
| 
 | |
|   // Initialize fields from the file header.
 | |
|   version = fileHeader.version;
 | |
|   epoch = fileHeader.epoch;
 | |
|   const auto dataHash = fileHeader.dataHash;
 | |
|   const auto dataStreamSize = fileHeader.dataSize;
 | |
| 
 | |
|   // Allocate the memory for the file data.
 | |
|   std::unique_ptr<char[]> dataStreamSmartPtr(new char[dataStreamSize]);
 | |
|   auto* dataStream = dataStreamSmartPtr.get();
 | |
| 
 | |
|   // Read the data.
 | |
|   uint64_t dataOffset = 0;
 | |
|   auto sizeToRead = dataStreamSize;
 | |
|   size = in->read(dataStream, sizeToRead);
 | |
|   sizeToRead -= size;
 | |
|   dataOffset += size;
 | |
| 
 | |
|   while (sizeToRead > 0)
 | |
|   {
 | |
|     size = in->read(dataStream + dataOffset, sizeToRead);
 | |
|     if (size < 0)
 | |
|       throw ios_base::failure("StatisticsManager::loadFromFile(): read failed. ");
 | |
| 
 | |
|     sizeToRead -= size;
 | |
|     dataOffset += size;
 | |
|   }
 | |
| 
 | |
|   utils::Hasher128 hasher;
 | |
|   auto computedDataHash = hasher(dataStream, dataStreamSize);
 | |
|   if (dataHash != computedDataHash)
 | |
|     throw ios_base::failure("StatisticsManager::loadFromFile(): invalid file hash. ");
 | |
| 
 | |
|   convertStatsFromDataStream(std::move(dataStreamSmartPtr));
 | |
| }
 | |
| 
 | |
| uint64_t StatisticsManager::computeHashFromStats()
 | |
| {
 | |
|   utils::Hasher128 hasher;
 | |
|   uint64_t dataStreamSize = 0;
 | |
|   std::unique_ptr<char[]> dataStreamSmartPtr = convertStatsToDataStream(dataStreamSize);
 | |
|   return hasher(dataStreamSmartPtr.get(), dataStreamSize);
 | |
| }
 | |
| 
 | |
| void StatisticsManager::serialize(messageqcpp::ByteStream& bs)
 | |
| {
 | |
|   uint64_t count = keyTypes.size();
 | |
|   bs << version;
 | |
|   bs << epoch;
 | |
|   bs << count;
 | |
| 
 | |
|   // PK_FK
 | |
|   for (const auto& keyType : keyTypes)
 | |
|   {
 | |
|     bs << keyType.first;
 | |
|     bs << (uint32_t)keyType.second;
 | |
|   }
 | |
| 
 | |
|   // MCV
 | |
|   for (const auto& p : mcv)
 | |
|   {
 | |
|     bs << p.first;
 | |
|     const auto& mcvColumn = p.second;
 | |
|     bs << static_cast<uint32_t>(mcvColumn.size());
 | |
|     for (const auto& mcvPair : mcvColumn)
 | |
|     {
 | |
|       bs << mcvPair.first;
 | |
|       bs << mcvPair.second;
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StatisticsManager::unserialize(messageqcpp::ByteStream& bs)
 | |
| {
 | |
|   uint64_t count;
 | |
|   bs >> version;
 | |
|   bs >> epoch;
 | |
|   bs >> count;
 | |
| 
 | |
|   // PK_FK
 | |
|   for (uint32_t i = 0; i < count; ++i)
 | |
|   {
 | |
|     uint32_t oid, keyType;
 | |
|     bs >> oid;
 | |
|     bs >> keyType;
 | |
|     keyTypes[oid] = static_cast<KeyType>(keyType);
 | |
|   }
 | |
| 
 | |
|   // MCV
 | |
|   for (uint32_t i = 0; i < count; ++i)
 | |
|   {
 | |
|     uint32_t oid, mcvSize;
 | |
|     bs >> oid;
 | |
|     bs >> mcvSize;
 | |
|     std::unordered_map<uint64_t, uint32_t> mcvColumn;
 | |
| 
 | |
|     for (uint32_t j = 0; j < mcvSize; ++j)
 | |
|     {
 | |
|       uint64_t value;
 | |
|       uint32_t count;
 | |
|       bs >> value;
 | |
|       bs >> count;
 | |
|       mcvColumn[value] = count;
 | |
|     }
 | |
| 
 | |
|     mcv[oid] = std::move(mcvColumn);
 | |
|   }
 | |
| }
 | |
| 
 | |
| bool StatisticsManager::hasKey(uint32_t oid)
 | |
| {
 | |
|   return keyTypes.count(oid) > 0 ? true : false;
 | |
| }
 | |
| 
 | |
| KeyType StatisticsManager::getKeyType(uint32_t oid)
 | |
| {
 | |
|   return keyTypes[oid];
 | |
| }
 | |
| 
 | |
| StatisticsDistributor* StatisticsDistributor::instance()
 | |
| {
 | |
|   static StatisticsDistributor* sd = new StatisticsDistributor();
 | |
|   return sd;
 | |
| }
 | |
| 
 | |
| void StatisticsDistributor::distributeStatistics()
 | |
| {
 | |
|   countClients();
 | |
|   {
 | |
|     std::lock_guard<std::mutex> lock(mut);
 | |
|     // No clients.
 | |
|     if (clientsCount == 0)
 | |
|       return;
 | |
| 
 | |
| #ifdef DEBUG_STATISTICS
 | |
|     std::cout << "Distribute statistics from ExeMgr(Server) to ExeMgr(Clients) " << std::endl;
 | |
| #endif
 | |
| 
 | |
|     messageqcpp::ByteStream msg, statsHash, statsBs;
 | |
|     // Current hash.
 | |
|     statsHash << statistics::StatisticsManager::instance()->computeHashFromStats();
 | |
|     // Statistics.
 | |
|     statistics::StatisticsManager::instance()->serialize(statsBs);
 | |
| 
 | |
|     for (uint32_t i = 0; i < clientsCount; ++i)
 | |
|     {
 | |
|       try
 | |
|       {
 | |
|         messageqcpp::ByteStream::quadbyte qb = ANALYZE_TABLE_REC_STATS;
 | |
|         msg << qb;
 | |
| 
 | |
|         auto exeMgrID = "ExeMgr" + std::to_string(i + 2);
 | |
|         // Create a client.
 | |
|         std::unique_ptr<messageqcpp::MessageQueueClient> exemgrClient(
 | |
|             new messageqcpp::MessageQueueClient(exeMgrID));
 | |
| 
 | |
| #ifdef DEBUG_STATISTICS
 | |
|         std::cout << "Try to connect to " << exeMgrID << std::endl;
 | |
| #endif
 | |
|         // Try to connect to the client.
 | |
|         if (!exemgrClient->connect())
 | |
|         {
 | |
|           msg.restart();
 | |
| #ifdef DEBUG_STATISTICS
 | |
|           std::cout << "Unable to connect to " << exeMgrID << std::endl;
 | |
| #endif
 | |
|           continue;
 | |
|         }
 | |
| 
 | |
| #ifdef DEBUG_STATISTICS
 | |
|         std::cout << "Write flag ANALYZE_TABLE_REC_STATS from ExeMgr(Server) to ExeMgr(Clients) "
 | |
|                   << std::endl;
 | |
| #endif
 | |
|         // Write a flag to client ExeMgr.
 | |
|         exemgrClient->write(msg);
 | |
| 
 | |
| #ifdef DEBUG_STATISTICS
 | |
|         std::cout << "Write statistics hash from ExeMgr(Server) to ExeMgr(Clients) " << std::endl;
 | |
| #endif
 | |
|         // Write a hash of the stats.
 | |
|         exemgrClient->write(statsHash);
 | |
| 
 | |
|         // Read the state from Client.
 | |
|         msg.restart();
 | |
|         msg = exemgrClient->read();
 | |
|         msg >> qb;
 | |
| 
 | |
|         // Do not need a stats.
 | |
|         if (qb == ANALYZE_TABLE_SUCCESS)
 | |
|         {
 | |
|           msg.restart();
 | |
|           continue;
 | |
|         }
 | |
| 
 | |
| #ifdef DEBUG_STATISTICS
 | |
|         std::cout << "Write statistics bytestream from ExeMgr(Server) to ExeMgr(Clients) " << std::endl;
 | |
| #endif
 | |
|         // Write a statistics to client ExeMgr.
 | |
|         exemgrClient->write(statsBs);
 | |
| 
 | |
|         // Read the flag back from the client ExeMgr.
 | |
|         msg.restart();
 | |
|         msg = exemgrClient->read();
 | |
| 
 | |
|         if (msg.length() == 0)
 | |
|           throw runtime_error("Lost conection to ExeMgr.");
 | |
| #ifdef DEBUG_STATISTICS
 | |
|         std::cout << "Read flag on ExeMgr(Server) from ExeMgr(Client) " << std::endl;
 | |
| #endif
 | |
|         msg.restart();
 | |
|       }
 | |
|       catch (std::exception& e)
 | |
|       {
 | |
|         msg.restart();
 | |
|         std::cerr << "distributeStatistics() failed with error: " << e.what() << std::endl;
 | |
|       }
 | |
|       catch (...)
 | |
|       {
 | |
|         msg.restart();
 | |
|         std::cerr << "distributeStatistics() failed with unknown error." << std::endl;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void StatisticsDistributor::countClients()
 | |
| {
 | |
| #ifdef DEBUG_STATISTICS
 | |
|   std::cout << "count clients to distribute statistics " << std::endl;
 | |
| #endif
 | |
|   auto* config = config::Config::makeConfig();
 | |
|   // Starting from the ExeMgr2, since the Server starts on the ExeMgr1.
 | |
|   std::atomic<uint32_t> exeMgrNumber(2);
 | |
| 
 | |
|   try
 | |
|   {
 | |
|     while (true)
 | |
|     {
 | |
|       auto exeMgrID = "ExeMgr" + std::to_string(exeMgrNumber);
 | |
|       auto exeMgrIP = config->getConfig(exeMgrID, "IPAddr");
 | |
|       if (exeMgrIP == "")
 | |
|         break;
 | |
| #ifdef DEBUG_STATISTICS
 | |
|       std::cout << "Client: " << exeMgrID << std::endl;
 | |
| #endif
 | |
|       ++exeMgrNumber;
 | |
|     }
 | |
|   }
 | |
|   catch (std::exception& e)
 | |
|   {
 | |
|     std::cerr << "countClients() failed with error: " << e.what() << std::endl;
 | |
|   }
 | |
|   catch (...)
 | |
|   {
 | |
|     std::cerr << "countClients() failed with unknown error: ";
 | |
|   }
 | |
| 
 | |
|   clientsCount = exeMgrNumber - 2;
 | |
| #ifdef DEBUG_STATISTICS
 | |
|   std::cout << "Number of clients: " << clientsCount << std::endl;
 | |
| #endif
 | |
| }
 | |
| 
 | |
| }  // namespace statistics
 |