You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-11-03 17:13:17 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			412 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			412 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/* Copyright (C) 2014 InfiniDB, Inc.
 | 
						|
 | 
						|
   This program is free software; you can redistribute it and/or
 | 
						|
   modify it under the terms of the GNU General Public License
 | 
						|
   as published by the Free Software Foundation; version 2 of
 | 
						|
   the License.
 | 
						|
 | 
						|
   This program is distributed in the hope that it will be useful,
 | 
						|
   but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
   GNU General Public License for more details.
 | 
						|
 | 
						|
   You should have received a copy of the GNU General Public License
 | 
						|
   along with this program; if not, write to the Free Software
 | 
						|
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | 
						|
   MA 02110-1301, USA. */
 | 
						|
 | 
						|
/*****************************************************************************
 | 
						|
 * $Id: we_getfilesizes.cpp 4450 2013-01-21 14:13:24Z rdempsey $
 | 
						|
 *
 | 
						|
 ****************************************************************************/
 | 
						|
#include "we_getfilesizes.h"

#include <cerrno>
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <vector>
using namespace std;

#include "calpontsystemcatalog.h"
using namespace execplan;

#include "threadpool.h"
using namespace threadpool;

#include "bytestream.h"
using namespace messageqcpp;
#include "we_fileop.h"

#include "idbcompress.h"
using namespace compress;

#include "IDBFileSystem.h"
#include "IDBPolicy.h"
using namespace idbdatafile;
 | 
						|
 | 
						|
namespace WriteEngine
 | 
						|
{
 | 
						|
struct FileInfo
 | 
						|
{
 | 
						|
  uint32_t partition;      /** @brief Partition for a file*/
 | 
						|
  uint16_t segment;        /** @brief Segment for a file */
 | 
						|
  uint16_t dbRoot;         /** @brief DbRoot for a file */
 | 
						|
  std::string segFileName; /** @brief seg file path */
 | 
						|
  double fileSize;         /** @brief seg file size in giga bytes */
 | 
						|
  void serialize(messageqcpp::ByteStream& bs)
 | 
						|
  {
 | 
						|
    bs << partition;
 | 
						|
    bs << segment;
 | 
						|
    bs << dbRoot;
 | 
						|
    bs << segFileName;
 | 
						|
    bs << (*(uint64_t*)(&fileSize));
 | 
						|
  }
 | 
						|
};
 | 
						|
typedef std::vector<FileInfo> Files;
 | 
						|
typedef std::map<uint32_t, Files> columnMap;
 | 
						|
typedef std::map<int, columnMap*> allColumnMap;
 | 
						|
allColumnMap wholeMap;
 | 
						|
boost::mutex columnMapLock;
 | 
						|
ActiveThreadCounter* activeThreadCounter;
 | 
						|
 | 
						|
/** @brief Read from a file until the request is satisfied or the file stops producing data.
 *
 * Keeps calling read() for the outstanding remainder. Stops as soon as a call
 * returns zero or a negative value, or once a call delivers everything that
 * was still outstanding.
 *
 * @param pFile    file to read from
 * @param buffer   destination buffer, at least bytesReq bytes long
 * @param bytesReq number of bytes wanted
 * @return number of bytes actually stored in buffer (may be less than bytesReq)
 */
size_t readFillBuffer(idbdatafile::IDBDataFile* pFile, char* buffer, size_t bytesReq)
{
  size_t filled = 0;
  size_t outstanding = bytesReq;

  for (;;)
  {
    const ssize_t got = pFile->read(buffer + filled, outstanding);

    // Zero or negative: nothing more could be read.
    if (got <= 0)
      break;

    filled += got;

    // The whole remainder arrived in this call.
    if ((size_t)got == outstanding)
      break;

    outstanding -= (size_t)got;
  }

  return filled;
}
 | 
						|
 | 
						|
/** @brief Determine how many bytes of a compressed segment file are in use.
 *
 * Reads the two compression headers and returns the end position
 * (offset + length) of the last entry in the chunk pointer list.
 *
 * @param fileName path of the compressed segment file
 * @return end offset of the last chunk, or 0 when the pointer list is empty
 * @throws std::runtime_error if the file cannot be opened, a header cannot be
 *         read completely, the header size is invalid, or the pointer list
 *         cannot be extracted. IDBPolicy calls may throw as well.
 */
static off64_t getCompressedDataSize(string& fileName)
{
  // Some IDBPolicy functions can throw exceptions, caller will catch it
  IDBPolicy::configIDBPolicy();
  const IDBDataFile::Types fileType = IDBPolicy::useHdfs() ? IDBDataFile::HDFS : IDBDataFile::BUFFERED;

  // The handle is owned by a smart pointer so that every throw below releases
  // the file; previously it was only released on the success path.
  std::unique_ptr<IDBDataFile> pFile(IDBDataFile::open(fileType, fileName.c_str(), "r", 0));

  if (!pFile)
  {
    std::ostringstream oss;
    oss << "Cannot open file " << fileName << " for read.";
    throw std::runtime_error(oss.str());
  }

  //--------------------------------------------------------------------------
  // Read headers and extract compression pointers
  //--------------------------------------------------------------------------
  char hdr1[CompressInterface::HDR_BUF_LEN];
  size_t nBytes = readFillBuffer(pFile.get(), hdr1, CompressInterface::HDR_BUF_LEN);

  if (nBytes != CompressInterface::HDR_BUF_LEN)
  {
    std::ostringstream oss;
    oss << "Error reading first header from file " << fileName;
    throw std::runtime_error(oss.str());
  }

  const int64_t ptrSecSize = compress::CompressInterface::getHdrSize(hdr1) - CompressInterface::HDR_BUF_LEN;

  // A corrupt first header can yield a zero or negative size; reject it before
  // it is used as an allocation length.
  if (ptrSecSize <= 0)
  {
    std::ostringstream oss;
    oss << "Invalid header size in file " << fileName;
    throw std::runtime_error(oss.str());
  }

  // Vector storage is released automatically on the throw paths below.
  std::vector<char> hdr2(ptrSecSize);
  nBytes = readFillBuffer(pFile.get(), hdr2.data(), hdr2.size());

  if ((int64_t)nBytes != ptrSecSize)
  {
    std::ostringstream oss;
    oss << "Error reading second header from file " << fileName;
    throw std::runtime_error(oss.str());
  }

  CompChunkPtrList chunkPtrs;
  const int rc = compress::CompressInterface::getPtrList(hdr2.data(), ptrSecSize, chunkPtrs);

  if (rc != 0)
  {
    std::ostringstream oss;
    oss << "Error decompressing second header from file " << fileName;
    throw std::runtime_error(oss.str());
  }

  // No chunks recorded: nothing to index, report zero data bytes.
  if (chunkPtrs.empty())
    return 0;

  // last header's offset + length will be the data bytes
  return chunkPtrs.back().first + chunkPtrs.back().second;
}
 | 
						|
struct ColumnThread
 | 
						|
{
 | 
						|
  ColumnThread(uint32_t oid, int32_t compressionType, bool reportRealUse, int key)
 | 
						|
   : fOid(oid), fCompressionType(compressionType), fReportRealUse(reportRealUse), fKey(key)
 | 
						|
  {
 | 
						|
  }
 | 
						|
  void operator()()
 | 
						|
  {
 | 
						|
    Config config;
 | 
						|
    config.initConfigCache();
 | 
						|
    std::vector<uint16_t> rootList;
 | 
						|
    config.getRootIdList(rootList);
 | 
						|
    FileOp fileOp;
 | 
						|
    Files aFiles;
 | 
						|
 | 
						|
    // This function relies on IDBPolicy being initialized by
 | 
						|
    // IDBPolicy::init().  This is done when WriteEngineServer main() calls
 | 
						|
    // IDBPolicy::configIDBPolicy();
 | 
						|
    IDBDataFile::Types fileType;
 | 
						|
    bool bUsingHdfs = IDBPolicy::useHdfs();
 | 
						|
 | 
						|
    if (bUsingHdfs)
 | 
						|
      fileType = IDBDataFile::HDFS;
 | 
						|
    else if (IDBPolicy::useCloud())
 | 
						|
      fileType = IDBDataFile::CLOUD;
 | 
						|
    else
 | 
						|
      fileType = IDBDataFile::UNBUFFERED;
 | 
						|
 | 
						|
    IDBFileSystem& fs = IDBFileSystem::getFs(fileType);
 | 
						|
 | 
						|
    for (uint32_t i = 0; i < rootList.size(); i++)
 | 
						|
    {
 | 
						|
      std::vector<struct BRM::EMEntry> entries;
 | 
						|
      (void)BRMWrapper::getInstance()->getExtents_dbroot(fOid, entries, rootList[i]);
 | 
						|
      std::vector<struct BRM::EMEntry>::const_iterator iter = entries.begin();
 | 
						|
 | 
						|
      while (iter != entries.end())  // organize extents into files
 | 
						|
      {
 | 
						|
        // Find the size of this file
 | 
						|
        // string fileName;
 | 
						|
        char fileName[200];
 | 
						|
        (void)fileOp.getFileName(fOid, fileName, rootList[i], entries[0].partitionNum, entries[0].segmentNum);
 | 
						|
        string aFile(fileName);  // convert between char* and string
 | 
						|
        off64_t fileSize = 0;
 | 
						|
 | 
						|
        if (fReportRealUse && (fCompressionType > 0))
 | 
						|
        {
 | 
						|
          try
 | 
						|
          {
 | 
						|
            fileSize = getCompressedDataSize(aFile);
 | 
						|
          }
 | 
						|
          catch (std::exception& ex)
 | 
						|
          {
 | 
						|
            cerr << ex.what();
 | 
						|
          }
 | 
						|
        }
 | 
						|
        else
 | 
						|
          fileSize = fs.size(fileName);
 | 
						|
 | 
						|
        if (fileSize > 0)  // File exists, add to list
 | 
						|
        {
 | 
						|
          FileInfo aFileInfo;
 | 
						|
          aFileInfo.partition = entries[0].partitionNum;
 | 
						|
          aFileInfo.segment = entries[0].segmentNum;
 | 
						|
          aFileInfo.dbRoot = rootList[i];
 | 
						|
          aFileInfo.segFileName = aFile;
 | 
						|
          aFileInfo.fileSize = (double)fileSize / (1024 * 1024 * 1024);
 | 
						|
          aFiles.push_back(aFileInfo);
 | 
						|
          // cout.precision(15);
 | 
						|
          // cout << "The file " << aFileInfo.segFileName << " has size " << fixed << aFileInfo.fileSize <<
 | 
						|
          // "GB" << endl;
 | 
						|
        }
 | 
						|
 | 
						|
        // erase the entries from this dbroot.
 | 
						|
        std::vector<struct BRM::EMEntry> entriesTrimed;
 | 
						|
 | 
						|
        for (uint32_t m = 0; m < entries.size(); m++)
 | 
						|
        {
 | 
						|
          if ((entries[0].partitionNum != entries[m].partitionNum) ||
 | 
						|
              (entries[0].segmentNum != entries[m].segmentNum))
 | 
						|
            entriesTrimed.push_back(entries[m]);
 | 
						|
        }
 | 
						|
 | 
						|
        entriesTrimed.swap(entries);
 | 
						|
        iter = entries.begin();
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    boost::mutex::scoped_lock lk(columnMapLock);
 | 
						|
    // cout << "Current size of columnsMap is " << columnsMap.size() << endl;
 | 
						|
    allColumnMap::iterator colMapiter = wholeMap.find(fKey);
 | 
						|
 | 
						|
    if (colMapiter != wholeMap.end())
 | 
						|
    {
 | 
						|
      (colMapiter->second)->insert(make_pair(fOid, aFiles));
 | 
						|
      activeThreadCounter->decr();
 | 
						|
      // cout << "Added to columnsMap aFiles with size " << aFiles.size() << " for oid " << fOid << endl;
 | 
						|
    }
 | 
						|
  }
 | 
						|
  uint32_t fOid;
 | 
						|
  int32_t fCompressionType;
 | 
						|
  bool fReportRealUse;
 | 
						|
  int fKey;
 | 
						|
};
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Get file size from file name in bytestream object
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
int WE_GetFileSizes::processFileName(messageqcpp::ByteStream& bs, std::string& errMsg, int /*key*/)
 | 
						|
{
 | 
						|
  uint8_t rc = 0;
 | 
						|
  off_t fileSize = 0;
 | 
						|
  off_t compressedFileSize = 0;
 | 
						|
  errMsg.clear();
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    std::string fileName;
 | 
						|
 | 
						|
    bs >> fileName;
 | 
						|
    fileSize = IDBPolicy::size(fileName.c_str());
 | 
						|
    compressedFileSize = IDBPolicy::compressedSize(fileName.c_str());
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    errMsg = ex.what();
 | 
						|
    rc = 1;
 | 
						|
  }
 | 
						|
 | 
						|
  bs.reset();
 | 
						|
  bs << fileSize;
 | 
						|
  bs << compressedFileSize;
 | 
						|
  return rc;
 | 
						|
}
 | 
						|
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
// Process a table size based on input from the
 | 
						|
// bytestream object.
 | 
						|
//------------------------------------------------------------------------------
 | 
						|
int WE_GetFileSizes::processTable(messageqcpp::ByteStream& bs, std::string& errMsg, int key)
 | 
						|
{
 | 
						|
  uint8_t rc = 0;
 | 
						|
  errMsg.clear();
 | 
						|
 | 
						|
  try
 | 
						|
  {
 | 
						|
    std::string aTableName;
 | 
						|
    std::string schemaName;
 | 
						|
    bool reportRealUse = false;
 | 
						|
    ByteStream::byte tmp8;
 | 
						|
    bs >> schemaName;
 | 
						|
    // cout << "schema: "<< schemaName << endl;
 | 
						|
 | 
						|
    bs >> aTableName;
 | 
						|
    // cout << "tableName: " << aTableName << endl;
 | 
						|
    bs >> tmp8;
 | 
						|
    reportRealUse = (tmp8 != 0);
 | 
						|
 | 
						|
    // get column oids
 | 
						|
    boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
 | 
						|
        CalpontSystemCatalog::makeCalpontSystemCatalog(0);
 | 
						|
    CalpontSystemCatalog::TableName tableName;
 | 
						|
    tableName.schema = schemaName;
 | 
						|
    tableName.table = aTableName;
 | 
						|
    CalpontSystemCatalog::RIDList columnList = systemCatalogPtr->columnRIDs(tableName);
 | 
						|
    CalpontSystemCatalog::ColType colType;
 | 
						|
    CalpontSystemCatalog::DictOIDList dictOidList = systemCatalogPtr->dictOIDs(tableName);
 | 
						|
    int serverThreads = 20;
 | 
						|
    int serverQueueSize = serverThreads * 100;
 | 
						|
    threadpool::ThreadPool tp(serverThreads, serverQueueSize);
 | 
						|
    int totalSize = columnList.size() + dictOidList.size();
 | 
						|
    activeThreadCounter = new ActiveThreadCounter(totalSize);
 | 
						|
 | 
						|
    columnMap* columnsMap = new columnMap();
 | 
						|
    {
 | 
						|
      boost::mutex::scoped_lock lk(columnMapLock);
 | 
						|
      wholeMap[key] = columnsMap;
 | 
						|
    }
 | 
						|
 | 
						|
    for (uint32_t i = 0; i < columnList.size(); i++)
 | 
						|
    {
 | 
						|
      colType = systemCatalogPtr->colType(columnList[i].objnum);
 | 
						|
      tp.invoke(ColumnThread(columnList[i].objnum, colType.compressionType, reportRealUse, key));
 | 
						|
 | 
						|
      if (colType.ddn.dictOID > 0)
 | 
						|
        tp.invoke(ColumnThread(colType.ddn.dictOID, colType.compressionType, reportRealUse, key));
 | 
						|
    }
 | 
						|
 | 
						|
    /*	for (uint32_t i=0; i < dictOidList.size(); i++)
 | 
						|
            {
 | 
						|
                    tp.invoke(ColumnThread(dictOidList[i].dictOID));
 | 
						|
            } */
 | 
						|
    // check whether all threads finish
 | 
						|
    int sleepTime = 100;  // sleep 100 milliseconds between checks
 | 
						|
    struct timespec rm_ts;
 | 
						|
 | 
						|
    rm_ts.tv_sec = sleepTime / 1000;
 | 
						|
    rm_ts.tv_nsec = sleepTime % 1000 * 1000000;
 | 
						|
    uint32_t currentActiveThreads = 10;
 | 
						|
 | 
						|
    while (currentActiveThreads > 0)
 | 
						|
    {
 | 
						|
      struct timespec abs_ts;
 | 
						|
 | 
						|
      do
 | 
						|
      {
 | 
						|
        abs_ts.tv_sec = rm_ts.tv_sec;
 | 
						|
        abs_ts.tv_nsec = rm_ts.tv_nsec;
 | 
						|
      } while (nanosleep(&abs_ts, &rm_ts) < 0);
 | 
						|
 | 
						|
      currentActiveThreads = activeThreadCounter->cur();
 | 
						|
    }
 | 
						|
  }
 | 
						|
  catch (std::exception& ex)
 | 
						|
  {
 | 
						|
    // cout << "WE_GetFileSizes got exception-" << ex.what() <<
 | 
						|
    //	std::endl;
 | 
						|
    errMsg = ex.what();
 | 
						|
    rc = 1;
 | 
						|
  }
 | 
						|
 | 
						|
  // Build the message to send to the caller
 | 
						|
  bs.reset();
 | 
						|
  boost::mutex::scoped_lock lk(columnMapLock);
 | 
						|
  allColumnMap::iterator colMapiter = wholeMap.find(key);
 | 
						|
 | 
						|
  if (colMapiter != wholeMap.end())
 | 
						|
  {
 | 
						|
    columnMap::iterator iter = colMapiter->second->begin();
 | 
						|
    uint64_t size;
 | 
						|
    Files::iterator it;
 | 
						|
 | 
						|
    while (iter != colMapiter->second->end())
 | 
						|
    {
 | 
						|
      bs << iter->first;
 | 
						|
      // cout << "processTable::coloid = " << iter->first << endl;
 | 
						|
 | 
						|
      size = iter->second.size();
 | 
						|
      bs << size;
 | 
						|
 | 
						|
      for (it = iter->second.begin(); it != iter->second.end(); it++)
 | 
						|
        it->serialize(bs);
 | 
						|
 | 
						|
      // cout << "length now is " << bs.length() << endl;
 | 
						|
      iter++;
 | 
						|
    }
 | 
						|
 | 
						|
    wholeMap.erase(colMapiter);
 | 
						|
  }
 | 
						|
 | 
						|
  return rc;
 | 
						|
}
 | 
						|
 | 
						|
}  // namespace WriteEngine
 |