You've already forked mariadb-columnstore-engine
							
							
				mirror of
				https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
				synced 2025-10-30 07:25:34 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			2670 lines
		
	
	
		
			90 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			2670 lines
		
	
	
		
			90 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Copyright (C) 2014 InfiniDB, Inc.
 | |
| 
 | |
|    This program is free software; you can redistribute it and/or
 | |
|    modify it under the terms of the GNU General Public License
 | |
|    as published by the Free Software Foundation; version 2 of
 | |
|    the License.
 | |
| 
 | |
|    This program is distributed in the hope that it will be useful,
 | |
|    but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
|    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
|    GNU General Public License for more details.
 | |
| 
 | |
|    You should have received a copy of the GNU General Public License
 | |
|    along with this program; if not, write to the Free Software
 | |
|    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 | |
|    MA 02110-1301, USA. */
 | |
| 
 | |
| //  $Id: we_fileop.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
 | |
| 
 | |
| #include "mcsconfig.h"
 | |
| #include <unistd.h>
 | |
| #include <stdio.h>
 | |
| #include <string.h>
 | |
| #include <errno.h>
 | |
| #include <sstream>
 | |
| #include <iostream>
 | |
| #include <memory>
 | |
| #include <string>
 | |
| #include <stdexcept>
 | |
| #if defined(__FreeBSD__)
 | |
| #include <sys/param.h>
 | |
| #include <sys/mount.h>
 | |
| #else
 | |
| #include <sys/vfs.h>
 | |
| #endif
 | |
| #include <boost/filesystem/operations.hpp>
 | |
| #include <boost/filesystem/path.hpp>
 | |
| #include <boost/scoped_array.hpp>
 | |
| using namespace std;
 | |
| 
 | |
| #include "we_fileop.h"
 | |
| #include "we_convertor.h"
 | |
| #include "we_log.h"
 | |
| #include "we_config.h"
 | |
| #include "we_stats.h"
 | |
| #include "we_simplesyslog.h"
 | |
| 
 | |
| #include "idbcompress.h"
 | |
| using namespace compress;
 | |
| 
 | |
| #include "messagelog.h"
 | |
| using namespace logging;
 | |
| 
 | |
| #include "IDBDataFile.h"
 | |
| #include "IDBFileSystem.h"
 | |
| #include "IDBPolicy.h"
 | |
| using namespace idbdatafile;
 | |
| 
 | |
| namespace WriteEngine
 | |
| {
 | |
// Class-wide mutexes serializing filesystem operations that are racy across
// threads (DBRoot creation, mkdir, and per-DBRoot extent addition).
/*static*/ boost::mutex FileOp::m_createDbRootMutexes;
/*static*/ boost::mutex FileOp::m_mkdirMutex;
// Keyed by DBRoot number; guards concurrent extent adds on the same DBRoot.
/*static*/ std::map<int, boost::mutex> FileOp::m_DbRootAddExtentMutexes;
// in 1 call to fwrite(), during initialization

// StopWatch timer;
 | |
| 
 | |
| /**
 | |
|  * Constructor
 | |
|  */
 | |
| FileOp::FileOp(bool doAlloc) : m_compressionType(0), m_transId((TxnID)INVALID_NUM), m_buffer(0)
 | |
| {
 | |
|   if (doAlloc)
 | |
|   {
 | |
|     m_buffer = new char[DEFAULT_BUFSIZ];
 | |
|     memset(m_buffer, '\0', DEFAULT_BUFSIZ);
 | |
|   }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Default Destructor
 | |
|  */
 | |
| FileOp::~FileOp()
 | |
| {
 | |
|   if (m_buffer)
 | |
|   {
 | |
|     delete[] m_buffer;
 | |
|   }
 | |
| 
 | |
|   m_buffer = 0;
 | |
| }
 | |
| 
 | |
/***********************************************************
 * DESCRIPTION:
 *    Close a file
 * PARAMETERS:
 *    pFile - file handle
 * RETURN:
 *    none
 ***********************************************************/
void FileOp::closeFile(IDBDataFile* pFile) const
{
  // IDBDataFile closes its underlying handle in its destructor (RAII),
  // so deleting the object both closes and frees the file.
  delete pFile;
}
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Create directory
 | |
|  *    Function uses mutex lock to prevent thread contention trying to create
 | |
|  *    2 subdirectories in the same directory at the same time.
 | |
|  * PARAMETERS:
 | |
|  *    dirName - directory name
 | |
|  *    mode - create mode
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success, otherwise if fail
 | |
|  ***********************************************************/
 | |
| int FileOp::createDir(const char* dirName, mode_t /*mode*/) const
 | |
| {
 | |
|   boost::mutex::scoped_lock lk(m_mkdirMutex);
 | |
|   int rc = IDBPolicy::mkdir(dirName);
 | |
| 
 | |
|   if (rc != 0)
 | |
|   {
 | |
|     int errRc = errno;
 | |
| 
 | |
|     if (errRc == EEXIST)
 | |
|       return NO_ERROR;  // ignore "File exists" error
 | |
| 
 | |
|     if (getLogger())
 | |
|     {
 | |
|       std::ostringstream oss;
 | |
|       std::string errnoMsg;
 | |
|       Convertor::mapErrnoToString(errRc, errnoMsg);
 | |
|       oss << "Error creating directory " << dirName << "; err-" << errRc << "; " << errnoMsg;
 | |
|       getLogger()->logMsg(oss.str(), ERR_DIR_CREATE, MSGLVL_ERROR);
 | |
|     }
 | |
| 
 | |
|     return ERR_DIR_CREATE;
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Create the "first" segment file for a column with a fixed file size
 | |
|  *    Note: the file is created in binary mode
 | |
|  * PARAMETERS:
 | |
|  *    fileName - file name with complete path
 | |
|  *    numOfBlock - the total number of blocks to be initialized (written out)
 | |
|  *    compressionType - Compression Type
 | |
|  *    emptyVal - empty value used to initialize column values
 | |
|  *    width    - width of column in bytes
 | |
|  *    dbRoot   - DBRoot of column file we are creating
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_FILE_EXIST if file exists
 | |
|  *    ERR_FILE_CREATE if can not create the file
 | |
|  ***********************************************************/
 | |
| int FileOp::createFile(const char* fileName, int numOfBlock, const uint8_t* emptyVal, int width,
 | |
|                        execplan::CalpontSystemCatalog::ColDataType colDataType, uint16_t dbRoot,
 | |
|                        BRM::LBID_t startLbid)
 | |
| {
 | |
|   IDBDataFile* pFile = IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "w+b",
 | |
|                                          IDBDataFile::USE_VBUF, width);
 | |
|   int rc = 0;
 | |
| 
 | |
|   if (pFile != NULL)
 | |
|   {
 | |
|     // Initialize the contents of the extent.
 | |
|     if (m_compressionType)
 | |
|     {
 | |
|       rc = initAbbrevCompColumnExtent(pFile, dbRoot, numOfBlock, emptyVal, width, startLbid, colDataType);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       rc = initColumnExtent(pFile, dbRoot, numOfBlock, emptyVal, width, colDataType,
 | |
|                             true,   // new file
 | |
|                             false,  // don't expand; add new extent
 | |
|                             true);  // add abbreviated extent
 | |
|     }
 | |
| 
 | |
|     closeFile(pFile);
 | |
|   }
 | |
|   else
 | |
|     return ERR_FILE_CREATE;
 | |
| 
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Create the "first" segment file for a column with a fixed file size
 | |
|  *    Note: the file is created in binary mode
 | |
|  * PARAMETERS:
 | |
|  *    fid      - OID of the column file to be created
 | |
|  *    allocSize (out) - number of blocks allocated to the first extent
 | |
|  *    dbRoot   - DBRoot where file is to be located
 | |
|  *    partition- Starting partition number for segment file path
 | |
|  *    compressionType - Compression type
 | |
|  *    colDataType - the column data type
 | |
|  *    emptyVal - designated "empty" value for this OID
 | |
|  *    width    - width of column in bytes
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_FILE_EXIST if file exists
 | |
|  *    ERR_FILE_CREATE if can not create the file
 | |
|  ***********************************************************/
 | |
| int FileOp::createFile(FID fid, int& allocSize, uint16_t dbRoot, uint32_t partition,
 | |
|                        execplan::CalpontSystemCatalog::ColDataType colDataType, const uint8_t* emptyVal,
 | |
|                        int width)
 | |
| {
 | |
|   // std::cout << "Creating file oid: " << fid <<
 | |
|   //    "; compress: " << m_compressionType << std::endl;
 | |
|   char fileName[FILE_NAME_SIZE];
 | |
|   int rc;
 | |
| 
 | |
|   uint16_t segment = 0;  // should always be 0 when starting a new column
 | |
|   RETURN_ON_ERROR((rc = oid2FileName(fid, fileName, true, dbRoot, partition, segment)));
 | |
| 
 | |
|   //@Bug 3196
 | |
|   if (exists(fileName))
 | |
|     return ERR_FILE_EXIST;
 | |
| 
 | |
|   // allocatColExtent() treats dbRoot and partition as in/out
 | |
|   // arguments, so we need to pass in a non-const variable.
 | |
|   uint16_t dbRootx = dbRoot;
 | |
|   uint32_t partitionx = partition;
 | |
| 
 | |
|   // Since we are creating a new column OID, we know partition
 | |
|   // and segment are 0, so we ignore their output values.
 | |
|   // timer.start( "allocateColExtent" );
 | |
| 
 | |
|   BRM::LBID_t startLbid;
 | |
|   uint32_t startBlock;
 | |
|   RETURN_ON_ERROR(BRMWrapper::getInstance()->allocateColExtentExactFile((OID)fid, (uint32_t)width, dbRootx,
 | |
|                                                                         partitionx, segment, colDataType,
 | |
|                                                                         startLbid, allocSize, startBlock));
 | |
| 
 | |
|   // We allocate a full extent from BRM, but only write an abbreviated 256K
 | |
|   // rows to disk for 1st extent, to conserve disk usage for small tables.
 | |
|   // One exception here is if we have rolled off partition 0, and we are
 | |
|   // adding a column to an existing table, then we are adding a column
 | |
|   // whose first partition is not 0.  In this case, we know we are not
 | |
|   // dealing with a small table, so we init a full extent for 1st extent.
 | |
|   int totalSize = 0;
 | |
| 
 | |
|   if (partition == 0)
 | |
|     totalSize = (INITIAL_EXTENT_ROWS_TO_DISK / BYTE_PER_BLOCK) * width;
 | |
|   else
 | |
|     totalSize = allocSize;  // full extent if starting partition > 0
 | |
| 
 | |
|   // Note we can't pass full file name to isDiskSpaceAvail() because the
 | |
|   // file does not exist yet, but passing DBRoot directory should suffice.
 | |
|   if (!isDiskSpaceAvail(Config::getDBRootByNum(dbRoot), totalSize))
 | |
|   {
 | |
|     return ERR_FILE_DISK_SPACE;
 | |
|   }
 | |
| 
 | |
|   // timer.stop( "allocateColExtent" );
 | |
| 
 | |
|   return createFile(fileName, totalSize, emptyVal, width, colDataType, dbRoot, startLbid);
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Delete a file
 | |
|  * PARAMETERS:
 | |
|  *    fileName - file name with complete path
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_FILE_NOT_EXIST if file does not exist
 | |
|  *    ERR_FILE_DELETE if can not delete a file
 | |
|  ***********************************************************/
 | |
| int FileOp::deleteFile(const char* fileName) const
 | |
| {
 | |
|   if (!exists(fileName))
 | |
|     return ERR_FILE_NOT_EXIST;
 | |
| 
 | |
|   return (IDBPolicy::remove(fileName) == -1) ? ERR_FILE_DELETE : NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Deletes all the segment or dictionary store files associated with the
 | |
|  *    specified fid.
 | |
|  * PARAMETERS:
 | |
|  *    fid - OID of the column being deleted.
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_DM_CONVERT_OID if error occurs converting OID to file name
 | |
|  ***********************************************************/
 | |
| int FileOp::deleteFile(FID fid) const
 | |
| {
 | |
|   char tempFileName[FILE_NAME_SIZE];
 | |
|   char oidDirName[FILE_NAME_SIZE];
 | |
|   char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
 | |
| 
 | |
|   RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, 0, 0)));
 | |
|   sprintf(oidDirName, "%s/%s/%s/%s", dbDir[0], dbDir[1], dbDir[2], dbDir[3]);
 | |
|   // std::cout << "Deleting files for OID " << fid <<
 | |
|   //             "; dirpath: " << oidDirName << std::endl;
 | |
|   // need check return code.
 | |
|   RETURN_ON_ERROR(BRMWrapper::getInstance()->deleteOid(fid));
 | |
| 
 | |
|   std::vector<std::string> dbRootPathList;
 | |
|   Config::getDBRootPathList(dbRootPathList);
 | |
| 
 | |
|   int rc;
 | |
| 
 | |
|   for (unsigned i = 0; i < dbRootPathList.size(); i++)
 | |
|   {
 | |
|     char rootOidDirName[FILE_NAME_SIZE];
 | |
|     rc = snprintf(rootOidDirName, FILE_NAME_SIZE, "%s/%s", dbRootPathList[i].c_str(), oidDirName);
 | |
| 
 | |
|     if (rc == FILE_NAME_SIZE || IDBPolicy::remove(rootOidDirName) != 0)
 | |
|     {
 | |
|       ostringstream oss;
 | |
|       oss << "Unable to remove " << rootOidDirName;
 | |
|       throw std::runtime_error(oss.str());
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Deletes all the segment or dictionary store files associated with the
 | |
|  *    specified fid.
 | |
|  * PARAMETERS:
 | |
|  *    fid - OIDs of the column/dictionary being deleted.
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_DM_CONVERT_OID if error occurs converting OID to file name
 | |
|  ***********************************************************/
 | |
| int FileOp::deleteFiles(const std::vector<int32_t>& fids) const
 | |
| {
 | |
|   char tempFileName[FILE_NAME_SIZE];
 | |
|   char oidDirName[FILE_NAME_SIZE];
 | |
|   char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
 | |
|   std::vector<std::string> dbRootPathList;
 | |
|   Config::getDBRootPathList(dbRootPathList);
 | |
|   int rc;
 | |
| 
 | |
|   for (unsigned n = 0; n < fids.size(); n++)
 | |
|   {
 | |
|     RETURN_ON_ERROR((Convertor::oid2FileName(fids[n], tempFileName, dbDir, 0, 0)));
 | |
|     sprintf(oidDirName, "%s/%s/%s/%s", dbDir[0], dbDir[1], dbDir[2], dbDir[3]);
 | |
|     // std::cout << "Deleting files for OID " << fid <<
 | |
|     //             "; dirpath: " << oidDirName << std::endl;
 | |
| 
 | |
|     for (unsigned i = 0; i < dbRootPathList.size(); i++)
 | |
|     {
 | |
|       char rootOidDirName[FILE_NAME_SIZE];
 | |
|       rc = snprintf(rootOidDirName, FILE_NAME_SIZE, "%s/%s", dbRootPathList[i].c_str(), oidDirName);
 | |
| 
 | |
|       if (rc == FILE_NAME_SIZE || IDBPolicy::remove(rootOidDirName) != 0)
 | |
|       {
 | |
|         ostringstream oss;
 | |
|         oss << "Unable to remove " << rootOidDirName;
 | |
|         throw std::runtime_error(oss.str());
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Deletes all the segment or dictionary store files associated with the
 | |
|  *    specified fid and partition.
 | |
|  * PARAMETERS:
 | |
|  *    fids - OIDs of the column/dictionary being deleted.
 | |
|  *    partition - the partition number
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_DM_CONVERT_OID if error occurs converting OID to file name
 | |
|  ***********************************************************/
 | |
| int FileOp::deletePartitions(const std::vector<OID>& /*fids*/,
 | |
|                              const std::vector<BRM::PartitionInfo>& partitions) const
 | |
| {
 | |
|   char tempFileName[FILE_NAME_SIZE];
 | |
|   char oidDirName[FILE_NAME_SIZE];
 | |
|   char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
 | |
|   char rootOidDirName[FILE_NAME_SIZE];
 | |
|   char partitionDirName[FILE_NAME_SIZE];
 | |
|   int rcd, rcp;
 | |
| 
 | |
|   for (uint32_t i = 0; i < partitions.size(); i++)
 | |
|   {
 | |
|     RETURN_ON_ERROR((Convertor::oid2FileName(partitions[i].oid, tempFileName, dbDir, partitions[i].lp.pp,
 | |
|                                              partitions[i].lp.seg)));
 | |
|     sprintf(oidDirName, "%s/%s/%s/%s/%s", dbDir[0], dbDir[1], dbDir[2], dbDir[3], dbDir[4]);
 | |
|     // config expects dbroot starting from 0
 | |
|     std::string rt(Config::getDBRootByNum(partitions[i].lp.dbroot));
 | |
|     rcd = snprintf(rootOidDirName, FILE_NAME_SIZE, "%s/%s", rt.c_str(), tempFileName);
 | |
|     rcp = snprintf(partitionDirName, FILE_NAME_SIZE, "%s/%s", rt.c_str(), oidDirName);
 | |
| 
 | |
|     if (rcd == FILE_NAME_SIZE || rcp == FILE_NAME_SIZE || IDBPolicy::remove(rootOidDirName) != 0)
 | |
|     {
 | |
|       ostringstream oss;
 | |
|       oss << "Unable to remove " << rootOidDirName;
 | |
|       throw std::runtime_error(oss.str());
 | |
|     }
 | |
| 
 | |
|     list<string> dircontents;
 | |
| 
 | |
|     if (IDBPolicy::listDirectory(partitionDirName, dircontents) == 0)
 | |
|     {
 | |
|       // the directory exists, now check if empty
 | |
|       if (dircontents.size() == 0)
 | |
|       {
 | |
|         // empty directory
 | |
|         if (IDBPolicy::remove(partitionDirName) != 0)
 | |
|         {
 | |
|           ostringstream oss;
 | |
|           oss << "Unable to remove " << rootOidDirName;
 | |
|           throw std::runtime_error(oss.str());
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Deletes the specified db segment file.
 | |
|  * PARAMETERS:
 | |
|  *    fid       - column OID of file to be deleted.
 | |
|  *    dbRoot    - DBRoot associated with segment file
 | |
|  *    partition - partition number of associated segment file
 | |
|  *    segment   - segment number of associated segment file
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  ***********************************************************/
 | |
| int FileOp::deleteFile(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment) const
 | |
| {
 | |
|   char fileName[FILE_NAME_SIZE];
 | |
| 
 | |
|   RETURN_ON_ERROR(getFileName(fid, fileName, dbRoot, partition, segment));
 | |
| 
 | |
|   return (deleteFile(fileName));
 | |
| }
 | |
| 
 | |
/***********************************************************
 * DESCRIPTION:
 *    Check whether a file exists or not
 * PARAMETERS:
 *    fileName - file name with complete path
 * RETURN:
 *    true if exists, false otherwise
 ***********************************************************/
bool FileOp::exists(const char* fileName) const
{
  // Delegate to IDBPolicy so the check works for both local and
  // plugin-backed (e.g. cloud) filesystems.
  return IDBPolicy::exists(fileName);
}
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Check whether a file exists or not
 | |
|  * PARAMETERS:
 | |
|  *    fid       - OID of file to be checked
 | |
|  *    dbRoot    - DBRoot associated with segment file
 | |
|  *    partition - partition number of associated segment file
 | |
|  *    segment   - segment number of associated segment file
 | |
|  * RETURN:
 | |
|  *    true if exists, false otherwise
 | |
|  ***********************************************************/
 | |
| bool FileOp::exists(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment) const
 | |
| {
 | |
|   char fileName[FILE_NAME_SIZE];
 | |
| 
 | |
|   if (getFileName(fid, fileName, dbRoot, partition, segment) != NO_ERROR)
 | |
|     return false;
 | |
| 
 | |
|   return exists(fileName);
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Check whether an OID directory exists or not
 | |
|  * PARAMETERS:
 | |
|  *    fid - column or dictionary store OID
 | |
|  * RETURN:
 | |
|  *    true if exists, false otherwise
 | |
|  ***********************************************************/
 | |
| bool FileOp::existsOIDDir(FID fid) const
 | |
| {
 | |
|   char fileName[FILE_NAME_SIZE];
 | |
| 
 | |
|   if (oid2DirName(fid, fileName) != NO_ERROR)
 | |
|   {
 | |
|     return false;
 | |
|   }
 | |
|   return exists(fileName);
 | |
| }
 | |
| 
 | |
/***********************************************************
 * DESCRIPTION:
 *    Adds an extent to the specified column OID and DBRoot.
 *    Function uses ExtentMap to add the extent and determine which
 *    specific column segment file the extent is to be added to. If
 *    the applicable column segment file does not exist, it is created.
 *    If this is the very first file for the specified DBRoot, then the
 *    partition and segment number must be specified, else the selected
 *    partition and segment numbers are returned. This method tries to
 *    optimize full extents creation skiping disk space
 *    preallocation(if activated).
 * PARAMETERS:
 *    oid       - OID of the column to be extended
 *    emptyVal  - Empty value to be used for oid
 *    width     - Width of the column (in bytes)
 *    hwm       - The HWM (or fbo) of the column segment file where the new
 *                extent begins.
 *    startLbid - The starting LBID for the new extent
 *    allocSize - Number of blocks allocated to the extent.
 *    dbRoot    - The DBRoot of the file with the new extent.
 *    partition - The partition number of the file with the new extent.
 *    segment   - The segment number of the file with the new extent.
 *    segFile   - The name of the relevant column segment file.
 *    pFile     - IDBDataFile ptr to the file where the extent is added.
 *    newFile   - Indicates if the extent was added to a new or existing file
 *    hdrs      - Contents of the headers if file is compressed.
 * RETURN:
 *    NO_ERROR if success
 *    else the applicable error code is returned
 ***********************************************************/
int FileOp::extendFile(OID oid, const uint8_t* emptyVal, int width,
                       execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm,
                       BRM::LBID_t startLbid, int allocSize, uint16_t dbRoot, uint32_t partition,
                       uint16_t segment, std::string& segFile, IDBDataFile*& pFile, bool& newFile, char* hdrs)
{
  int rc = NO_ERROR;
  // Reset the output parameters up front so callers never see stale values
  // on an early error return.
  pFile = 0;
  segFile.clear();
  newFile = false;
  char fileName[FILE_NAME_SIZE];

  // If starting hwm or fbo is 0 then this is the first extent of a new file,
  // else we are adding an extent to an existing segment file
  if (hwm > 0)  // db segment file should exist
  {
    RETURN_ON_ERROR(oid2FileName(oid, fileName, false, dbRoot, partition, segment));
    segFile = fileName;

    if (!exists(fileName))
    {
      ostringstream oss;
      oss << "oid: " << oid << " with path " << segFile;
      logging::Message::Args args;
      args.add("File not found ");
      args.add(oss.str());
      args.add("");
      args.add("");
      SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001);
      return ERR_FILE_NOT_EXIST;
    }

    pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b");  // old file

    if (pFile == 0)
    {
      ostringstream oss;
      oss << "oid: " << oid << " with path " << segFile;
      logging::Message::Args args;
      args.add("Error opening file ");
      args.add(oss.str());
      args.add("");
      args.add("");
      SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001);
      return ERR_FILE_OPEN;
    }

    if (isDebug(DEBUG_1) && getLogger())
    {
      std::ostringstream oss;
      oss << "Opening existing column file (extendFile)" << ": OID-" << oid << "; DBRoot-" << dbRoot
          << "; part-" << partition << "; seg-" << segment << "; LBID-" << startLbid << "; hwm-" << hwm
          << "; file-" << segFile;
      getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
    }

    // @bug 5349: check that new extent's fbo is not past current EOF
    if (m_compressionType)
    {
      // Compressed file: read both header blocks to find how many chunks
      // are physically present, then locate the chunk containing the block
      // just before the new extent's starting block (hwm - 1).
      char hdrsIn[compress::CompressInterface::HDR_BUF_LEN * 2];
      RETURN_ON_ERROR(readHeaders(pFile, hdrsIn));

      std::unique_ptr<compress::CompressInterface> compressor(
          compress::getCompressInterfaceByType(compress::CompressInterface::getCompressionType(hdrsIn)));

      unsigned int ptrCount = compress::CompressInterface::getPtrCount(hdrsIn);
      unsigned int chunkIndex = 0;
      unsigned int blockOffsetWithinChunk = 0;
      compressor->locateBlock((hwm - 1), chunkIndex, blockOffsetWithinChunk);

      // std::ostringstream oss1;
      // oss1 << "Extending compressed column file"<<
      //   ": OID-"    << oid       <<
      //   "; LBID-"   << startLbid <<
      //   "; fbo-"    << hwm       <<
      //   "; file-"   << segFile   <<
      //   "; chkidx-" << chunkIndex<<
      //   "; numPtrs-"<< ptrCount;
      // getLogger()->logMsg( oss1.str(), MSGLVL_INFO2 );

      // chunkIndex >= ptrCount means the new extent would start past the
      // chunks currently on disk, i.e. the previous extent is only
      // partially written (abbreviated) and must be padded out first.
      if (chunkIndex >= ptrCount)
      {
        ostringstream oss;
        oss << "oid: " << oid << " with path " << segFile << "; new extent fbo " << hwm
            << "; number of "
               "compressed chunks "
            << ptrCount << "; chunkIndex " << chunkIndex;
        logging::Message::Args args;
        args.add("compressed");
        args.add(oss.str());
        SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0103);

        // Expand the partial extent to full with emptyVal
        // Since fillCompColumnExtentEmptyChunks() messes with the
        // file on disk, we need to close it and reopen after or
        // the cache isn't updated.
        if ((pFile))
          closeFile(pFile);

        pFile = NULL;
        string failedTask;  // For return error message, if any.
        rc = FileOp::fillCompColumnExtentEmptyChunks(oid, width, emptyVal, dbRoot, partition, segment,
                                                     colDataType, hwm, segFile, failedTask);

        if (rc != NO_ERROR)
        {
          if (getLogger())
          {
            std::ostringstream oss;
            oss << "FileOp::extendFile: error padding partial compressed extent for " << "column OID-" << oid
                << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment << "; hwm-" << hwm
                << " " << failedTask.c_str();
            getLogger()->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
          }

          return rc;
        }

        pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b");  // modified file
      }

      // Get the latest file header for the caller. If a partial extent was filled out,
      // this will be different than when we first read the headers.
      if (hdrs)
      {
        RETURN_ON_ERROR(readHeaders(pFile, hdrs));
      }
    }
    else
    {
      // Uncompressed file: compare the physical file size against the size
      // implied by the new extent's starting block.
      long long fileSize;
      RETURN_ON_ERROR(getFileSize(pFile, fileSize));
      long long calculatedFileSize = ((long long)hwm) * BYTE_PER_BLOCK;

      // std::ostringstream oss2;
      // oss2 << "Extending uncompressed column file"<<
      //   ": OID-"    << oid       <<
      //   "; LBID-"   << startLbid <<
      //   "; fbo-"    << hwm       <<
      //   "; file-"   << segFile   <<
      //   "; filesize-"<<fileSize;
      // getLogger()->logMsg( oss2.str(), MSGLVL_INFO2 );

      if (calculatedFileSize > fileSize)
      {
        ostringstream oss;
        oss << "oid: " << oid << " with path " << segFile << "; new extent fbo " << hwm
            << "; file size (bytes) " << fileSize;
        logging::Message::Args args;
        args.add("uncompressed");
        args.add(oss.str());
        SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0103);
        // Expand the partial extent to full with emptyVal
        // This generally won't ever happen, as uncompressed files
        // are created with full extents.
        rc = FileOp::expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, width, colDataType);

        if (rc != NO_ERROR)
        {
          if (getLogger())
          {
            std::ostringstream oss;
            oss << "FileOp::extendFile: error padding partial uncompressed extent for " << "column OID-"
                << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment << "; hwm-"
                << hwm;
            getLogger()->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
          }

          return rc;
        }
      }
    }
  }
  else  // db segment file should not exist
  {
    RETURN_ON_ERROR(oid2FileName(oid, fileName, true, dbRoot, partition, segment));
    segFile = fileName;

    // if obsolete file exists, "w+b" will truncate and write over
    pFile = openFile(fileName, "w+b");  // new file
    if (pFile == 0)
      return ERR_FILE_CREATE;

    {
      // We presume the path will contain /
      std::string filePath(fileName);
      if (chownDataPath(filePath))
        return ERR_FILE_CHOWN;
    }

    newFile = true;

    if (isDebug(DEBUG_1) && getLogger())
    {
      std::ostringstream oss;
      oss << "Opening new column file" << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition
          << "; seg-" << segment << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile;
      getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
    }

    // For a new compressed file, initialize the caller's header buffer now;
    // the headers are written out later by the caller.
    if ((m_compressionType) && (hdrs))
    {
      compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType);
      compress::CompressInterface::setLBIDByIndex(hdrs, startLbid, 0);
    }
  }

  if (!isDiskSpaceAvail(segFile, allocSize))
  {
    return ERR_FILE_DISK_SPACE;
  }

  // We set to EOF just before we start adding the blocks for the new extent.
  // At one time, I considered changing this to seek to the HWM block, but
  // with compressed files, this is murky; do I find and seek to the chunk
  // containing the HWM block?  So I left as-is for now, seeking to EOF.
  rc = setFileOffset(pFile, 0, SEEK_END);

  if (rc != NO_ERROR)
    return rc;

  // Initialize the contents of the extent.
  // MCOL-498 optimize full extent creation.
  rc = initColumnExtent(pFile, dbRoot, allocSize, emptyVal, width, colDataType,
                        newFile,  // new or existing file
                        false,    // don't expand; new extent
                        false,    // add full (not abbreviated) extent
                        true,     // try to optimize extent creation
                        startLbid);

  return rc;
}
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Add an extent to the exact segment file specified by
 | |
|  *    the designated OID, DBRoot, partition, and segment.
 | |
|  * PARAMETERS:
 | |
|  *    oid       - OID of the column to be extended
 | |
|  *    emptyVal  - Empty value to be used for oid
 | |
|  *    width     - Width of the column (in bytes)
 | |
|  *    allocSize - Number of blocks allocated to the extent.
 | |
|  *    dbRoot    - The DBRoot of the file with the new extent.
 | |
|  *    partition - The partition number of the file with the new extent.
 | |
|  *    segment   - The segment number of the file with the new extent.
 | |
|  *    segFile   - The name of the relevant column segment file.
 | |
|  *    startLbid - The starting LBID for the new extent
 | |
|  *    newFile   - Indicates if the extent was added to a new or existing file
 | |
|  *    hdrs      - Contents of the headers if file is compressed.
 | |
|  * RETURN:
 | |
|  *    none
 | |
|  ***********************************************************/
 | |
| int FileOp::addExtentExactFile(OID oid, const uint8_t* emptyVal, int width, int& allocSize, uint16_t dbRoot,
 | |
|                                uint32_t partition, uint16_t segment,
 | |
|                                execplan::CalpontSystemCatalog::ColDataType colDataType, std::string& segFile,
 | |
|                                BRM::LBID_t& startLbid, bool& newFile, char* hdrs)
 | |
| {
 | |
|   int rc = NO_ERROR;
 | |
|   IDBDataFile* pFile = 0;
 | |
|   segFile.clear();
 | |
|   newFile = false;
 | |
|   HWM hwm;
 | |
| 
 | |
|   // Allocate the new extent in the ExtentMap
 | |
|   RETURN_ON_ERROR(BRMWrapper::getInstance()->allocateColExtentExactFile(
 | |
|       oid, width, dbRoot, partition, segment, colDataType, startLbid, allocSize, hwm));
 | |
| 
 | |
|   // Determine the existence of the "next" segment file, and either open
 | |
|   // or create the segment file accordingly.
 | |
|   if (exists(oid, dbRoot, partition, segment))
 | |
|   {
 | |
|     pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b");  // old file
 | |
| 
 | |
|     if (pFile == 0)
 | |
|     {
 | |
|       ostringstream oss;
 | |
|       oss << "oid: " << oid << " with path " << segFile;
 | |
|       logging::Message::Args args;
 | |
|       args.add("Error opening file ");
 | |
|       args.add(oss.str());
 | |
|       args.add("");
 | |
|       args.add("");
 | |
|       SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001);
 | |
|       return ERR_FILE_OPEN;
 | |
|     }
 | |
| 
 | |
|     if (isDebug(DEBUG_1) && getLogger())
 | |
|     {
 | |
|       std::ostringstream oss;
 | |
|       oss << "Opening existing column file" << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-"
 | |
|           << partition << "; seg-" << segment << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-"
 | |
|           << segFile;
 | |
|       getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
 | |
|     }
 | |
| 
 | |
|     if ((m_compressionType) && (hdrs))
 | |
|     {
 | |
|       rc = readHeaders(pFile, hdrs);
 | |
| 
 | |
|       if (rc != NO_ERROR)
 | |
|         return rc;
 | |
|     }
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     char fileName[FILE_NAME_SIZE];
 | |
|     RETURN_ON_ERROR(oid2FileName(oid, fileName, true, dbRoot, partition, segment));
 | |
|     segFile = fileName;
 | |
| 
 | |
|     pFile = openFile(fileName, "w+b");  // new file
 | |
| 
 | |
|     if (pFile == 0)
 | |
|       return ERR_FILE_CREATE;
 | |
| 
 | |
|     newFile = true;
 | |
| 
 | |
|     if (isDebug(DEBUG_1) && getLogger())
 | |
|     {
 | |
|       std::ostringstream oss;
 | |
|       oss << "Opening new column file" << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition
 | |
|           << "; seg-" << segment << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile;
 | |
|       getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
 | |
|     }
 | |
| 
 | |
|     if ((m_compressionType) && (hdrs))
 | |
|     {
 | |
|       compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType);
 | |
|       compress::CompressInterface::setLBIDByIndex(hdrs, startLbid, 0);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (!isDiskSpaceAvail(segFile, allocSize))
 | |
|   {
 | |
|     return ERR_FILE_DISK_SPACE;
 | |
|   }
 | |
| 
 | |
|   // We set to EOF just before we start adding the blocks for the new extent.
 | |
|   // At one time, I considered changing this to seek to the HWM block, but
 | |
|   // with compressed files, this is murky; do I find and seek to the chunk
 | |
|   // containing the HWM block?  So I left as-is for now, seeking to EOF.
 | |
|   rc = setFileOffset(pFile, 0, SEEK_END);
 | |
| 
 | |
|   if (rc != NO_ERROR)
 | |
|     return rc;
 | |
| 
 | |
|   // Initialize the contents of the extent.
 | |
|   // CS doesn't optimize file operations to have a valid
 | |
|   // segment files with empty magics
 | |
|   rc = initColumnExtent(pFile, dbRoot, allocSize, emptyVal, width, colDataType,
 | |
|                         newFile,  // new or existing file
 | |
|                         false,    // don't expand; new extent
 | |
|                         false,    // add full (not abbreviated) extent
 | |
|                         startLbid);
 | |
| 
 | |
|   closeFile(pFile);
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Write out (initialize) an extent in a column file.
 | |
|  *    A mutex is used for each DBRoot, to prevent contention between
 | |
|  *    threads, because if multiple threads are creating extents on
 | |
|  *    the same DBRoot at the same time, the extents can become
 | |
|  *    fragmented.  It is best to only create one extent at a time
 | |
|  *    on each DBRoot.
 | |
|  *    This function can be used to initialize an entirely new extent, or
 | |
|  *    to finish initializing an extent that has already been started.
 | |
|  *    nBlocks controls how many 8192-byte blocks are to be written out.
 | |
|  *    If bOptExtension is set then method first checks config for
 | |
|  *    DBRootX.Prealloc. If it is disabled then it skips disk space
 | |
|  *    preallocation.
 | |
|  * PARAMETERS:
 | |
|  *    pFile   (in) - IDBDataFile* of column segment file to be written to
 | |
|  *    dbRoot  (in) - DBRoot of pFile
 | |
|  *    nBlocks (in) - number of blocks to be written for an extent
 | |
|  *    emptyVal(in) - empty value to be used for column data values
 | |
|  *    width   (in) - width of the applicable column
 | |
|  *    bNewFile(in) - are we adding an extent to a new file, in which case
 | |
|  *                   headers will be included "if" it is a compressed file.
 | |
|  *    bExpandExtent (in) - Expand existing extent, or initialize a new one
 | |
|  *    bAbbrevExtent(in) - if creating new extent, is it an abbreviated extent
 | |
|  *    bOptExtension(in) - skip full extent preallocation.
 | |
|  * RETURN:
 | |
|  *    returns ERR_FILE_WRITE if an error occurs,
 | |
|  *    else returns NO_ERROR.
 | |
|  ***********************************************************/
 | |
| int FileOp::initColumnExtent(IDBDataFile* pFile, uint16_t dbRoot, int nBlocks, const uint8_t* emptyVal,
 | |
|                              int width, execplan::CalpontSystemCatalog::ColDataType colDataType,
 | |
|                              bool bNewFile, bool bExpandExtent, bool bAbbrevExtent, bool bOptExtension,
 | |
|                              int64_t lbid)
 | |
| {
 | |
|   if ((bNewFile) && (m_compressionType))
 | |
|   {
 | |
|     char hdrs[CompressInterface::HDR_BUF_LEN * 2];
 | |
|     compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType);
 | |
|     compress::CompressInterface::setLBIDByIndex(hdrs, lbid, 0);
 | |
|     if (bAbbrevExtent)
 | |
|       compress::CompressInterface::setBlockCount(hdrs, nBlocks);
 | |
| 
 | |
|     RETURN_ON_ERROR(writeHeaders(pFile, hdrs));
 | |
|   }
 | |
| 
 | |
|   // @bug5769 Don't initialize extents or truncate db files on HDFS
 | |
|   if (idbdatafile::IDBPolicy::useHdfs())
 | |
|   {
 | |
|     //@Bug 3219. update the compression header after the extent is expanded.
 | |
|     if ((!bNewFile) && (m_compressionType) && (bExpandExtent))
 | |
|     {
 | |
|       updateColumnExtent(pFile, nBlocks, lbid);
 | |
|     }
 | |
| 
 | |
|     // @bug 2378. Synchronize here to avoid write buffer pile up too much,
 | |
|     // which could cause controllernode to timeout later when it needs to
 | |
|     // save a snapshot.
 | |
|     pFile->flush();
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     // Create vector of mutexes used to serialize extent access per DBRoot
 | |
|     initDbRootExtentMutexes();
 | |
| 
 | |
|     // MCOL-498 Skip the huge preallocations if the option is set
 | |
|     // for the dbroot. This check is skiped for abbreviated extent.
 | |
|     // IMO it is better to check bool then to call a function.
 | |
|     if (bOptExtension)
 | |
|     {
 | |
|       bOptExtension = (idbdatafile::IDBPolicy::PreallocSpaceDisabled(dbRoot)) ? bOptExtension : false;
 | |
|     }
 | |
|     // Reduce number of blocks allocated for abbreviated extents thus
 | |
|     // CS writes less when creates a new table. This couldn't be zero
 | |
|     // b/c Snappy compressed file format doesn't tolerate empty files.
 | |
|     int realNBlocks = (bOptExtension && nBlocks <= MAX_INITIAL_EXTENT_BLOCKS_TO_DISK) ? 3 : nBlocks;
 | |
| 
 | |
|     // Determine the number of blocks in each call to fwrite(), and the
 | |
|     // number of fwrite() calls to make, based on this.  In other words,
 | |
|     // we put a cap on the "writeSize" so that we don't allocate and write
 | |
|     // an entire extent at once for the 64M row extents.  If we are
 | |
|     // expanding an abbreviated 64M extent, we may not have an even
 | |
|     // multiple of MAX_NBLOCKS to write; remWriteSize is the number of
 | |
|     // blocks above and beyond loopCount*MAX_NBLOCKS.
 | |
|     int writeSize = realNBlocks * BYTE_PER_BLOCK;  // 1M and 8M row extent size
 | |
|     int loopCount = 1;
 | |
|     int remWriteSize = 0;
 | |
| 
 | |
|     if (realNBlocks > MAX_NBLOCKS)  // 64M row extent size
 | |
|     {
 | |
|       writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK;
 | |
|       loopCount = realNBlocks / MAX_NBLOCKS;
 | |
|       remWriteSize = realNBlocks - (loopCount * MAX_NBLOCKS);
 | |
|     }
 | |
| 
 | |
|     // Allocate a buffer, initialize it, and use it to create the extent
 | |
|     idbassert(dbRoot > 0);
 | |
| #ifdef PROFILE
 | |
| 
 | |
|     if (bExpandExtent)
 | |
|       Stats::startParseEvent(WE_STATS_WAIT_TO_EXPAND_COL_EXTENT);
 | |
|     else
 | |
|       Stats::startParseEvent(WE_STATS_WAIT_TO_CREATE_COL_EXTENT);
 | |
| 
 | |
| #endif
 | |
|     boost::mutex::scoped_lock lk(m_DbRootAddExtentMutexes[dbRoot]);
 | |
| #ifdef PROFILE
 | |
| 
 | |
|     if (bExpandExtent)
 | |
|       Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_COL_EXTENT);
 | |
|     else
 | |
|       Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_COL_EXTENT);
 | |
| #endif
 | |
|     // Skip space preallocation if configured so
 | |
|     // fallback to sequential write otherwise.
 | |
|     // Couldn't avoid preallocation for full extents,
 | |
|     // e.g. ADD COLUMN DDL b/c CS has to fill the file
 | |
|     // with empty magics.
 | |
|     if (!bOptExtension || !m_compressionType)
 | |
|     {
 | |
| #ifdef PROFILE
 | |
|       Stats::startParseEvent(WE_STATS_INIT_COL_EXTENT);
 | |
| #endif
 | |
|       // Allocate buffer, store it in scoped_array to insure it's deletion.
 | |
|       // Create scope {...} to manage deletion of writeBuf.
 | |
|       {
 | |
|         unsigned char* writeBuf = new unsigned char[writeSize];
 | |
|         boost::scoped_array<unsigned char> writeBufPtr(writeBuf);
 | |
| 
 | |
|         setEmptyBuf(writeBuf, writeSize, emptyVal, width);
 | |
| 
 | |
| #ifdef PROFILE
 | |
|         Stats::stopParseEvent(WE_STATS_INIT_COL_EXTENT);
 | |
| 
 | |
|         if (bExpandExtent)
 | |
|           Stats::startParseEvent(WE_STATS_EXPAND_COL_EXTENT);
 | |
|         else
 | |
|           Stats::startParseEvent(WE_STATS_CREATE_COL_EXTENT);
 | |
| 
 | |
| #endif
 | |
| 
 | |
|         // std::ostringstream oss;
 | |
|         // oss << "initColExtent: width-" << width <<
 | |
|         //"; loopCount-" << loopCount <<
 | |
|         //"; writeSize-" << writeSize;
 | |
|         // std::cout << oss.str() << std::endl;
 | |
|         if (remWriteSize > 0)
 | |
|         {
 | |
|           if (pFile->write(writeBuf, remWriteSize) != remWriteSize)
 | |
|           {
 | |
|             return ERR_FILE_WRITE;
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         for (int j = 0; j < loopCount; j++)
 | |
|         {
 | |
|           if (pFile->write(writeBuf, writeSize) != writeSize)
 | |
|           {
 | |
|             return ERR_FILE_WRITE;
 | |
|           }
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       //@Bug 3219. update the compression header after the extent is expanded.
 | |
|       if ((!bNewFile) && (m_compressionType) && (bExpandExtent))
 | |
|       {
 | |
|         updateColumnExtent(pFile, nBlocks, lbid);
 | |
|       }
 | |
| 
 | |
|       // @bug 2378. Synchronize here to avoid write buffer pile up too much,
 | |
|       // which could cause controllernode to timeout later when it needs to
 | |
|       // save a snapshot.
 | |
|       pFile->flush();
 | |
| 
 | |
| #ifdef PROFILE
 | |
|       if (bExpandExtent)
 | |
|         Stats::stopParseEvent(WE_STATS_EXPAND_COL_EXTENT);
 | |
|       else
 | |
|         Stats::stopParseEvent(WE_STATS_CREATE_COL_EXTENT);
 | |
| #endif
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Write (initialize) an abbreviated compressed extent in a column file.
 | |
|  *    nBlocks controls how many 8192-byte blocks are to be written out.
 | |
|  * PARAMETERS:
 | |
|  *    pFile   (in) - IDBDataFile* of column segment file to be written to
 | |
|  *    dbRoot  (in) - DBRoot of pFile
 | |
|  *    nBlocks (in) - number of blocks to be written for an extent
 | |
|  *    emptyVal(in) - empty value to be used for column data values
 | |
|  *    width   (in) - width of the applicable column
 | |
|  * RETURN:
 | |
|  *    returns ERR_FILE_WRITE or ERR_FILE_SEEK if an error occurs,
 | |
|  *    else returns NO_ERROR.
 | |
|  ***********************************************************/
 | |
| int FileOp::initAbbrevCompColumnExtent(IDBDataFile* pFile, uint16_t dbRoot, int nBlocks,
 | |
|                                        const uint8_t* emptyVal, int width, BRM::LBID_t startLBID,
 | |
|                                        execplan::CalpontSystemCatalog::ColDataType colDataType)
 | |
| {
 | |
|   // Reserve disk space for optimized abbreviated extent
 | |
|   int rc = initColumnExtent(pFile, dbRoot, nBlocks, emptyVal, width, colDataType,
 | |
|                             true,   // new file
 | |
|                             false,  // don't expand; add new extent
 | |
|                             true,   // add abbreviated extent
 | |
|                             true,   // optimize the initial extent
 | |
|                             startLBID);
 | |
|   if (rc != NO_ERROR)
 | |
|   {
 | |
|     return rc;
 | |
|   }
 | |
| 
 | |
| #ifdef PROFILE
 | |
|   Stats::startParseEvent(WE_STATS_COMPRESS_COL_INIT_ABBREV_EXT);
 | |
| #endif
 | |
| 
 | |
|   char hdrs[CompressInterface::HDR_BUF_LEN * 2];
 | |
|   rc = writeInitialCompColumnChunk(pFile, nBlocks, INITIAL_EXTENT_ROWS_TO_DISK, emptyVal, width, startLBID,
 | |
|                                    colDataType, hdrs);
 | |
| 
 | |
|   if (rc != NO_ERROR)
 | |
|   {
 | |
|     return rc;
 | |
|   }
 | |
| 
 | |
| #ifdef PROFILE
 | |
|   Stats::stopParseEvent(WE_STATS_COMPRESS_COL_INIT_ABBREV_EXT);
 | |
| #endif
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Write (initialize) the first extent in a compressed db file.
 | |
|  * PARAMETERS:
 | |
|  *    pFile    - IDBDataFile* of column segment file to be written to
 | |
|  *    nBlocksAllocated - number of blocks allocated to the extent; should be
 | |
|  *        enough blocks for a full extent, unless it's the abbreviated extent
 | |
|  *    nRows    - number of rows to initialize, or write out to the file
 | |
|  *    emptyVal - empty value to be used for column data values
 | |
|  *    width    - width of the applicable column (in bytes)
 | |
|  *    hdrs     - (in/out) chunk pointer headers
 | |
|  * RETURN:
 | |
|  *    returns NO_ERROR on success.
 | |
|  ***********************************************************/
 | |
| int FileOp::writeInitialCompColumnChunk(IDBDataFile* pFile, int nBlocksAllocated, int nRows,
 | |
|                                         const uint8_t* emptyVal, int width, BRM::LBID_t startLBID,
 | |
|                                         execplan::CalpontSystemCatalog::ColDataType colDataType, char* hdrs)
 | |
| {
 | |
|   const size_t INPUT_BUFFER_SIZE = nRows * width;
 | |
|   char* toBeCompressedInput = new char[INPUT_BUFFER_SIZE];
 | |
|   unsigned int userPaddingBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
 | |
|   // Compress an initialized abbreviated extent
 | |
|   // Initially m_compressionType == 0, but this function is used under
 | |
|   // condtion where m_compressionType > 0.
 | |
|   std::unique_ptr<CompressInterface> compressor(
 | |
|       compress::getCompressInterfaceByType(m_compressionType, userPaddingBytes));
 | |
|   const size_t OUTPUT_BUFFER_SIZE = compressor->maxCompressedSize(INPUT_BUFFER_SIZE) + userPaddingBytes +
 | |
|                                     compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE;
 | |
| 
 | |
|   unsigned char* compressedOutput = new unsigned char[OUTPUT_BUFFER_SIZE];
 | |
|   size_t outputLen = OUTPUT_BUFFER_SIZE;
 | |
|   boost::scoped_array<char> toBeCompressedInputPtr(toBeCompressedInput);
 | |
|   boost::scoped_array<unsigned char> compressedOutputPtr(compressedOutput);
 | |
| 
 | |
|   setEmptyBuf((unsigned char*)toBeCompressedInput, INPUT_BUFFER_SIZE, emptyVal, width);
 | |
| 
 | |
|   int rc = compressor->compressBlock(toBeCompressedInput, INPUT_BUFFER_SIZE, compressedOutput, outputLen);
 | |
| 
 | |
|   if (rc != 0)
 | |
|   {
 | |
|     return ERR_COMP_COMPRESS;
 | |
|   }
 | |
| 
 | |
|   // Round up the compressed chunk size
 | |
|   rc = compressor->padCompressedChunks(compressedOutput, outputLen, OUTPUT_BUFFER_SIZE);
 | |
| 
 | |
|   if (rc != 0)
 | |
|   {
 | |
|     return ERR_COMP_PAD_DATA;
 | |
|   }
 | |
| 
 | |
|   //  std::cout << "Uncompressed rowCount: " << nRows <<
 | |
|   //      "; colWidth: "      << width   <<
 | |
|   //      "; uncompByteCnt: " << INPUT_BUFFER_SIZE <<
 | |
|   //      "; blkAllocCnt: "   << nBlocksAllocated  <<
 | |
|   //      "; compressedByteCnt: "  << outputLen << std::endl;
 | |
| 
 | |
|   compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType);
 | |
|   compress::CompressInterface::setBlockCount(hdrs, nBlocksAllocated);
 | |
|   compress::CompressInterface::setLBIDByIndex(hdrs, startLBID, 0);
 | |
| 
 | |
|   // Store compression pointers in the header
 | |
|   std::vector<uint64_t> ptrs;
 | |
|   ptrs.push_back(CompressInterface::HDR_BUF_LEN * 2);
 | |
|   ptrs.push_back(outputLen + (CompressInterface::HDR_BUF_LEN * 2));
 | |
|   compress::CompressInterface::storePtrs(ptrs, hdrs);
 | |
| 
 | |
|   RETURN_ON_ERROR(writeHeaders(pFile, hdrs));
 | |
| 
 | |
|   // Write the compressed data
 | |
|   size_t writtenLen = pFile->write(compressedOutput, outputLen);
 | |
|   if (writtenLen != outputLen)
 | |
|     return ERR_FILE_WRITE;
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Fill specified compressed extent with empty value chunks.
 | |
|  * PARAMETERS:
 | |
|  *    oid         - OID for relevant column
 | |
|  *    colWidth    - width in bytes of this column
 | |
|  *    emptyVal    - empty value to be used in filling empty chunks
 | |
|  *    dbRoot      - DBRoot of extent to be filled
 | |
|  *    partition   - partition of extent to be filled
 | |
|  *    segment     - segment file number of extent to be filled
 | |
|  *    colDataType - Column data type
 | |
|  *    hwm         - proposed new HWM of filled in extent
 | |
|  *    segFile     - (out) name of updated segment file
 | |
|  *    failedTask  - (out) if error occurs, this is the task that failed
 | |
|  * RETURN:
 | |
|  *    returns NO_ERROR if success.
 | |
|  ***********************************************************/
 | |
| int FileOp::fillCompColumnExtentEmptyChunks(OID oid, int colWidth, const uint8_t* emptyVal, uint16_t dbRoot,
 | |
|                                             uint32_t partition, uint16_t segment,
 | |
|                                             execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm,
 | |
|                                             std::string& segFile, std::string& failedTask)
 | |
| {
 | |
|   int rc = NO_ERROR;
 | |
|   segFile.clear();
 | |
|   failedTask.clear();
 | |
| 
 | |
|   // Open the file and read the headers with the compression chunk pointers
 | |
|   // @bug 5572 - HDFS usage: incorporate *.tmp file backup flag
 | |
|   IDBDataFile* pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b", DEFAULT_COLSIZ, true);
 | |
| 
 | |
|   if (!pFile)
 | |
|   {
 | |
|     failedTask = "Opening file";
 | |
|     ostringstream oss;
 | |
|     oss << "oid: " << oid << " with path " << segFile;
 | |
|     logging::Message::Args args;
 | |
|     args.add("Error opening file ");
 | |
|     args.add(oss.str());
 | |
|     args.add("");
 | |
|     args.add("");
 | |
|     SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001);
 | |
|     return ERR_FILE_OPEN;
 | |
|   }
 | |
| 
 | |
|   char hdrs[CompressInterface::HDR_BUF_LEN * 2];
 | |
|   rc = readHeaders(pFile, hdrs);
 | |
| 
 | |
|   if (rc != NO_ERROR)
 | |
|   {
 | |
|     failedTask = "Reading headers";
 | |
|     closeFile(pFile);
 | |
|     return rc;
 | |
|   }
 | |
| 
 | |
|   int userPadBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
 | |
| 
 | |
|   std::unique_ptr<CompressInterface> compressor(compress::getCompressInterfaceByType(
 | |
|       compress::CompressInterface::getCompressionType(hdrs), userPadBytes));
 | |
| 
 | |
|   CompChunkPtrList chunkPtrs;
 | |
|   int rcComp = compress::CompressInterface::getPtrList(hdrs, chunkPtrs);
 | |
| 
 | |
|   if (rcComp != 0)
 | |
|   {
 | |
|     failedTask = "Getting header ptrs";
 | |
|     closeFile(pFile);
 | |
|     return ERR_COMP_PARSE_HDRS;
 | |
|   }
 | |
| 
 | |
|   // Nothing to do if the proposed HWM is < the current block count
 | |
|   uint64_t blkCount = compress::CompressInterface::getBlockCount(hdrs);
 | |
| 
 | |
|   if (blkCount > (hwm + 1))
 | |
|   {
 | |
|     closeFile(pFile);
 | |
|     return NO_ERROR;
 | |
|   }
 | |
| 
 | |
|   const unsigned int ROWS_PER_EXTENT = BRMWrapper::getInstance()->getInstance()->getExtentRows();
 | |
|   const unsigned int ROWS_PER_CHUNK = CompressInterface::UNCOMPRESSED_INBUF_LEN / colWidth;
 | |
|   const unsigned int CHUNKS_PER_EXTENT = ROWS_PER_EXTENT / ROWS_PER_CHUNK;
 | |
| 
 | |
|   // If this is an abbreviated extent, we first expand to a full extent
 | |
|   // @bug 4340 - support moving the DBRoot with a single abbrev extent
 | |
|   if ((chunkPtrs.size() == 1) &&
 | |
|       ((blkCount * BYTE_PER_BLOCK) == (uint64_t)(INITIAL_EXTENT_ROWS_TO_DISK * colWidth)))
 | |
|   {
 | |
|     if (getLogger())
 | |
|     {
 | |
|       std::ostringstream oss;
 | |
|       oss << "Converting abbreviated partial extent to full extent for" << ": OID-" << oid << "; DBRoot-"
 | |
|           << dbRoot << "; part-" << partition << "; seg-" << segment << "; file-" << segFile << "; wid-"
 | |
|           << colWidth << "; oldBlkCnt-" << blkCount << "; newBlkCnt-"
 | |
|           << ((ROWS_PER_EXTENT * colWidth) / BYTE_PER_BLOCK);
 | |
|       getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
 | |
|     }
 | |
| 
 | |
|     off64_t endHdrsOffset = pFile->tell();
 | |
|     rc = expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, colWidth, colDataType);
 | |
| 
 | |
|     if (rc != NO_ERROR)
 | |
|     {
 | |
|       failedTask = "Expanding abbreviated extent";
 | |
|       closeFile(pFile);
 | |
|       return rc;
 | |
|     }
 | |
| 
 | |
|     CompChunkPtr chunkOutPtr;
 | |
|     rc = expandAbbrevColumnChunk(pFile, emptyVal, colWidth, chunkPtrs[0], chunkOutPtr, hdrs);
 | |
| 
 | |
|     if (rc != NO_ERROR)
 | |
|     {
 | |
|       failedTask = "Expanding abbreviated chunk";
 | |
|       closeFile(pFile);
 | |
|       return rc;
 | |
|     }
 | |
| 
 | |
|     chunkPtrs[0] = chunkOutPtr;  // update chunkPtrs with new chunk size
 | |
| 
 | |
|     rc = setFileOffset(pFile, endHdrsOffset);
 | |
| 
 | |
|     if (rc != NO_ERROR)
 | |
|     {
 | |
|       failedTask = "Positioning file to end of headers";
 | |
|       closeFile(pFile);
 | |
|       return rc;
 | |
|     }
 | |
| 
 | |
|     // Update block count to reflect a full extent
 | |
|     blkCount = (ROWS_PER_EXTENT * colWidth) / BYTE_PER_BLOCK;
 | |
|     compress::CompressInterface::setBlockCount(hdrs, blkCount);
 | |
|   }
 | |
| 
 | |
|   // Calculate the number of empty chunks we need to add to fill this extent
 | |
|   unsigned numChunksToFill = 0;
 | |
|   ldiv_t ldivResult = ldiv(chunkPtrs.size(), CHUNKS_PER_EXTENT);
 | |
| 
 | |
|   if (ldivResult.rem != 0)
 | |
|   {
 | |
|     numChunksToFill = CHUNKS_PER_EXTENT - ldivResult.rem;
 | |
|   }
 | |
| 
 | |
| #if 0
 | |
|     std::cout << "Number of allocated blocks:     " <<
 | |
|               compressor.getBlockCount(hdrs) << std::endl;
 | |
|     std::cout << "Pointer Header Size (in bytes): " <<
 | |
|               (compressor.getHdrSize(hdrs) -
 | |
|                CompressInterface::HDR_BUF_LEN) << std::endl;
 | |
|     std::cout << "Chunk Pointers (offset,length): " << std::endl;
 | |
| 
 | |
|     for (unsigned k = 0; k < chunkPtrs.size(); k++)
 | |
|     {
 | |
|         std::cout << "  " << k << ". " << chunkPtrs[k].first <<
 | |
|                   " , " << chunkPtrs[k].second << std::endl;
 | |
|     }
 | |
| 
 | |
|     std::cout << std::endl;
 | |
|     std::cout << "Number of chunks to fill in: " << numChunksToFill <<
 | |
|               std::endl << std::endl;
 | |
| #endif
 | |
| 
 | |
|   off64_t endOffset = 0;
 | |
| 
 | |
|   // Fill in or add necessary remaining empty chunks
 | |
|   if (numChunksToFill > 0)
 | |
|   {
 | |
|     const int IN_BUF_LEN = CompressInterface::UNCOMPRESSED_INBUF_LEN;
 | |
|     const int OUT_BUF_LEN = compressor->maxCompressedSize(IN_BUF_LEN) + userPadBytes +
 | |
|                             compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE;
 | |
| 
 | |
|     // Allocate buffer, and store in scoped_array to insure it's deletion.
 | |
|     // Create scope {...} to manage deletion of buffers
 | |
|     {
 | |
|       char* toBeCompressedBuf = new char[IN_BUF_LEN];
 | |
|       unsigned char* compressedBuf = new unsigned char[OUT_BUF_LEN];
 | |
|       boost::scoped_array<char> toBeCompressedInputPtr(toBeCompressedBuf);
 | |
|       boost::scoped_array<unsigned char> compressedOutputPtr(compressedBuf);
 | |
| 
 | |
|       // Compress and then pad the compressed chunk
 | |
|       setEmptyBuf((unsigned char*)toBeCompressedBuf, IN_BUF_LEN, emptyVal, colWidth);
 | |
|       size_t outputLen = OUT_BUF_LEN;
 | |
|       rcComp = compressor->compressBlock(toBeCompressedBuf, IN_BUF_LEN, compressedBuf, outputLen);
 | |
| 
 | |
|       if (rcComp != 0)
 | |
|       {
 | |
|         failedTask = "Compressing chunk";
 | |
|         closeFile(pFile);
 | |
|         return ERR_COMP_COMPRESS;
 | |
|       }
 | |
| 
 | |
|       toBeCompressedInputPtr.reset();  // release memory
 | |
| 
 | |
|       rcComp = compressor->padCompressedChunks(compressedBuf, outputLen, OUT_BUF_LEN);
 | |
| 
 | |
|       if (rcComp != 0)
 | |
|       {
 | |
|         failedTask = "Padding compressed chunk";
 | |
|         closeFile(pFile);
 | |
|         return ERR_COMP_PAD_DATA;
 | |
|       }
 | |
| 
 | |
|       // Position file to write empty chunks; default to end of headers
 | |
|       // in case there are no chunks listed in the header
 | |
|       off64_t startOffset = pFile->tell();
 | |
| 
 | |
|       if (chunkPtrs.size() > 0)
 | |
|       {
 | |
|         startOffset = chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second;
 | |
|         rc = setFileOffset(pFile, startOffset);
 | |
| 
 | |
|         if (rc != NO_ERROR)
 | |
|         {
 | |
|           failedTask = "Positioning file to begin filling chunks";
 | |
|           closeFile(pFile);
 | |
|           return rc;
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       // Write chunks needed to fill out the current extent, add chunk ptr
 | |
|       for (unsigned k = 0; k < numChunksToFill; k++)
 | |
|       {
 | |
|         rc = writeFile(pFile, (unsigned char*)compressedBuf, outputLen);
 | |
| 
 | |
|         if (rc != NO_ERROR)
 | |
|         {
 | |
|           failedTask = "Writing  a chunk";
 | |
|           closeFile(pFile);
 | |
|           return rc;
 | |
|         }
 | |
| 
 | |
|         CompChunkPtr compChunk(startOffset, outputLen);
 | |
|         chunkPtrs.push_back(compChunk);
 | |
|         startOffset = pFile->tell();
 | |
|       }
 | |
|     }  // end of scope for boost scoped array pointers
 | |
| 
 | |
|     endOffset = pFile->tell();
 | |
| 
 | |
|     // Update the compressed chunk pointers in the header
 | |
|     std::vector<uint64_t> ptrs;
 | |
| 
 | |
|     for (unsigned i = 0; i < chunkPtrs.size(); i++)
 | |
|     {
 | |
|       ptrs.push_back(chunkPtrs[i].first);
 | |
|     }
 | |
| 
 | |
|     ptrs.push_back(chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second);
 | |
|     compress::CompressInterface::storePtrs(ptrs, hdrs);
 | |
| 
 | |
|     rc = writeHeaders(pFile, hdrs);
 | |
| 
 | |
|     if (rc != NO_ERROR)
 | |
|     {
 | |
|       failedTask = "Writing headers";
 | |
|       closeFile(pFile);
 | |
|       return rc;
 | |
|     }
 | |
|   }  // end of "numChunksToFill > 0"
 | |
|   else
 | |
|   {
 | |
|     // if no chunks to add, then set endOffset to truncate the db file
 | |
|     // strictly based on the chunks that are already in the file
 | |
|     if (chunkPtrs.size() > 0)
 | |
|     {
 | |
|       endOffset = chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Truncate the file to release unused space for the extent we just filled
 | |
|   if (endOffset > 0)
 | |
|   {
 | |
|     rc = truncateFile(pFile, endOffset);
 | |
| 
 | |
|     if (rc != NO_ERROR)
 | |
|     {
 | |
|       failedTask = "Truncating file";
 | |
|       closeFile(pFile);
 | |
|       return rc;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   closeFile(pFile);
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Expand first chunk in pFile from an abbreviated chunk for an abbreviated
 | |
|  *    extent to a full compressed chunk for a full extent.
 | |
|  * PARAMETERS:
 | |
|  *    pFile      - file to be updated
 | |
|  *    colWidth   - width in bytes of this column
 | |
|  *    emptyVal   - empty value to be used in filling empty chunks
 | |
|  *    chunkInPtr - chunk pointer referencing first (abbrev) chunk
 | |
|  *    chunkOutPtr- (out) updated chunk ptr referencing first (full) chunk
 | |
|  * RETURN:
 | |
|  *    returns NO_ERROR if success.
 | |
|  ***********************************************************/
 | |
| int FileOp::expandAbbrevColumnChunk(IDBDataFile* pFile, const uint8_t* emptyVal, int colWidth,
 | |
|                                     const CompChunkPtr& chunkInPtr, CompChunkPtr& chunkOutPtr,
 | |
|                                     const char* hdrs)
 | |
| {
 | |
|   int userPadBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
 | |
|   auto realCompressionType = m_compressionType;
 | |
|   if (hdrs)
 | |
|   {
 | |
|     realCompressionType = compress::CompressInterface::getCompressionType(hdrs);
 | |
|   }
 | |
|   std::unique_ptr<CompressInterface> compressor(
 | |
|       compress::getCompressInterfaceByType(realCompressionType, userPadBytes));
 | |
| 
 | |
|   const int IN_BUF_LEN = CompressInterface::UNCOMPRESSED_INBUF_LEN;
 | |
|   const int OUT_BUF_LEN = compressor->maxCompressedSize(IN_BUF_LEN) + userPadBytes +
 | |
|                           compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE;
 | |
| 
 | |
|   char* toBeCompressedBuf = new char[IN_BUF_LEN];
 | |
|   boost::scoped_array<char> toBeCompressedPtr(toBeCompressedBuf);
 | |
| 
 | |
|   setEmptyBuf((unsigned char*)toBeCompressedBuf, IN_BUF_LEN, emptyVal, colWidth);
 | |
| 
 | |
|   RETURN_ON_ERROR(setFileOffset(pFile, chunkInPtr.first, SEEK_SET));
 | |
| 
 | |
|   char* compressedInBuf = new char[chunkInPtr.second];
 | |
|   boost::scoped_array<char> compressedInBufPtr(compressedInBuf);
 | |
|   RETURN_ON_ERROR(readFile(pFile, (unsigned char*)compressedInBuf, chunkInPtr.second));
 | |
| 
 | |
|   // Uncompress an "abbreviated" chunk into our 4MB buffer
 | |
|   size_t outputLen = IN_BUF_LEN;
 | |
|   int rc = compressor->uncompressBlock(compressedInBuf, chunkInPtr.second, (unsigned char*)toBeCompressedBuf,
 | |
|                                        outputLen);
 | |
| 
 | |
|   if (rc != 0)
 | |
|   {
 | |
|     return ERR_COMP_UNCOMPRESS;
 | |
|   }
 | |
| 
 | |
|   compressedInBufPtr.reset();  // release memory
 | |
| 
 | |
|   RETURN_ON_ERROR(setFileOffset(pFile, chunkInPtr.first, SEEK_SET));
 | |
| 
 | |
|   unsigned char* compressedOutBuf = new unsigned char[OUT_BUF_LEN];
 | |
|   boost::scoped_array<unsigned char> compressedOutBufPtr(compressedOutBuf);
 | |
| 
 | |
|   // Compress the data we just read, as a "full" 4MB chunk
 | |
|   outputLen = OUT_BUF_LEN;
 | |
|   rc = compressor->compressBlock(reinterpret_cast<char*>(toBeCompressedBuf), IN_BUF_LEN, compressedOutBuf,
 | |
|                                  outputLen);
 | |
| 
 | |
|   if (rc != 0)
 | |
|   {
 | |
|     return ERR_COMP_COMPRESS;
 | |
|   }
 | |
| 
 | |
|   // Round up the compressed chunk size
 | |
|   rc = compressor->padCompressedChunks(compressedOutBuf, outputLen, OUT_BUF_LEN);
 | |
| 
 | |
|   if (rc != 0)
 | |
|   {
 | |
|     return ERR_COMP_PAD_DATA;
 | |
|   }
 | |
| 
 | |
|   RETURN_ON_ERROR(writeFile(pFile, compressedOutBuf, outputLen));
 | |
| 
 | |
|   chunkOutPtr.first = chunkInPtr.first;
 | |
|   chunkOutPtr.second = outputLen;
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Write headers to a compressed column file.
 | |
|  * PARAMETERS:
 | |
|  *    pFile   (in) - IDBDataFile* of column segment file to be written to
 | |
|  *    hdr     (in) - header pointers to be written
 | |
|  * RETURN:
 | |
|  *    returns ERR_FILE_WRITE or ERR_FILE_SEEK if an error occurs,
 | |
|  *    else returns NO_ERROR.
 | |
|  ***********************************************************/
 | |
| int FileOp::writeHeaders(IDBDataFile* pFile, const char* hdr) const
 | |
| {
 | |
|   RETURN_ON_ERROR(setFileOffset(pFile, 0, SEEK_SET));
 | |
| 
 | |
|   // Write the headers
 | |
|   if (pFile->write(hdr, CompressInterface::HDR_BUF_LEN * 2) != CompressInterface::HDR_BUF_LEN * 2)
 | |
|   {
 | |
|     return ERR_FILE_WRITE;
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Write headers to a compressed column or dictionary file.
 | |
|  * PARAMETERS:
 | |
|  *    pFile   (in) - IDBDataFile* of column segment file to be written to
 | |
|  *    controlHdr (in) - control header to be written
 | |
|  *    pointerHdr (in) - pointer header to be written
 | |
|  *    ptrHdrSize (in) - size (in bytes) of pointer header
 | |
|  * RETURN:
 | |
|  *    returns ERR_FILE_WRITE or ERR_FILE_SEEK if an error occurs,
 | |
|  *    else returns NO_ERROR.
 | |
|  ***********************************************************/
 | |
| int FileOp::writeHeaders(IDBDataFile* pFile, const char* controlHdr, const char* pointerHdr,
 | |
|                          uint64_t ptrHdrSize) const
 | |
| {
 | |
|   RETURN_ON_ERROR(setFileOffset(pFile, 0, SEEK_SET));
 | |
| 
 | |
|   // Write the control header
 | |
|   if (pFile->write(controlHdr, CompressInterface::HDR_BUF_LEN) != CompressInterface::HDR_BUF_LEN)
 | |
|   {
 | |
|     return ERR_FILE_WRITE;
 | |
|   }
 | |
| 
 | |
|   // Write the pointer header
 | |
|   if (pFile->write(pointerHdr, ptrHdrSize) != (ssize_t)ptrHdrSize)
 | |
|   {
 | |
|     return ERR_FILE_WRITE;
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Write out (initialize) an extent in a dictionary store file.
 | |
|  *    A mutex is used for each DBRoot, to prevent contention between
 | |
|  *    threads, because if multiple threads are creating extents on
 | |
|  *    the same DBRoot at the same time, the extents can become
 | |
|  *    fragmented.  It is best to only create one extent at a time
 | |
|  *    on each DBRoot.
 | |
|  *    This function can be used to initialize an entirely new extent, or
 | |
|  *    to finish initializing an extent that has already been started.
 | |
|  *    nBlocks controls how many 8192-byte blocks are to be written out.
 | |
|  *    If bOptExtension is set then method first checks config for
 | |
|  *    DBRootX.Prealloc. If it is disabled then it skips disk space
 | |
|  *    preallocation.
 | |
|  * PARAMETERS:
 | |
|  *    pFile   (in) - IDBDataFile* of column segment file to be written to
 | |
|  *    dbRoot  (in) - DBRoot of pFile
 | |
|  *    nBlocks (in) - number of blocks to be written for an extent
 | |
|  *    blockHdrInit(in) - data used to initialize each block
 | |
|  *    blockHdrInitSize(in) - number of bytes in blockHdrInit
 | |
|  *    bExpandExtent (in) - Expand existing extent, or initialize a new one
 | |
|  *    bOptExtension(in) - skip full extent preallocation.
 | |
|  * RETURN:
 | |
|  *    returns ERR_FILE_WRITE if an error occurs,
 | |
|  *    else returns NO_ERROR.
 | |
|  ***********************************************************/
 | |
| int FileOp::initDctnryExtent(IDBDataFile* pFile, uint16_t dbRoot, int nBlocks, unsigned char* blockHdrInit,
 | |
|                              int blockHdrInitSize, bool /*bExpandExtent*/, bool bOptExtension, int64_t lbid)
 | |
| {
 | |
|   // @bug5769 Don't initialize extents or truncate db files on HDFS
 | |
|   if (idbdatafile::IDBPolicy::useHdfs())
 | |
|   {
 | |
|     if (m_compressionType)
 | |
|       updateDctnryExtent(pFile, nBlocks, lbid);
 | |
| 
 | |
|     // Synchronize to avoid write buffer pile up too much, which could cause
 | |
|     // controllernode to timeout later when it needs to save a snapshot.
 | |
|     pFile->flush();
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     // Create vector of mutexes used to serialize extent access per DBRoot
 | |
|     initDbRootExtentMutexes();
 | |
| 
 | |
|     // MCOL-498 Skip the huge preallocations if the option is set
 | |
|     // for the dbroot. This check is skiped for abbreviated extent.
 | |
|     // IMO it is better to check bool then to call a function.
 | |
|     // CS uses non-compressed dict files for its system catalog so
 | |
|     // CS doesn't optimize non-compressed dict creation.
 | |
|     if (bOptExtension)
 | |
|     {
 | |
|       bOptExtension = (idbdatafile::IDBPolicy::PreallocSpaceDisabled(dbRoot) && m_compressionType)
 | |
|                           ? bOptExtension
 | |
|                           : false;
 | |
|     }
 | |
|     // Reduce number of blocks allocated for abbreviated extents thus
 | |
|     // CS writes less when creates a new table. This couldn't be zero
 | |
|     // b/c Snappy compressed file format doesn't tolerate empty files.
 | |
|     int realNBlocks = (bOptExtension && nBlocks <= MAX_INITIAL_EXTENT_BLOCKS_TO_DISK) ? 1 : nBlocks;
 | |
| 
 | |
|     // Determine the number of blocks in each call to fwrite(), and the
 | |
|     // number of fwrite() calls to make, based on this.  In other words,
 | |
|     // we put a cap on the "writeSize" so that we don't allocate and write
 | |
|     // an entire extent at once for the 64M row extents.  If we are
 | |
|     // expanding an abbreviated 64M extent, we may not have an even
 | |
|     // multiple of MAX_NBLOCKS to write; remWriteSize is the number of
 | |
|     // blocks above and beyond loopCount*MAX_NBLOCKS.
 | |
|     int writeSize = realNBlocks * BYTE_PER_BLOCK;  // 1M and 8M row extent size
 | |
|     int loopCount = 1;
 | |
|     int remWriteSize = 0;
 | |
| 
 | |
|     if (realNBlocks > MAX_NBLOCKS)  // 64M row extent size
 | |
|     {
 | |
|       writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK;
 | |
|       loopCount = realNBlocks / MAX_NBLOCKS;
 | |
|       remWriteSize = realNBlocks - (loopCount * MAX_NBLOCKS);
 | |
|     }
 | |
| 
 | |
|     // Allocate a buffer, initialize it, and use it to create the extent
 | |
|     idbassert(dbRoot > 0);
 | |
| 
 | |
| #ifdef PROFILE
 | |
|     if (bExpandExtent)
 | |
|       Stats::startParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT);
 | |
|     else
 | |
|       Stats::startParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT);
 | |
| #endif
 | |
| 
 | |
|     boost::mutex::scoped_lock lk(m_DbRootAddExtentMutexes[dbRoot]);
 | |
| 
 | |
| #ifdef PROFILE
 | |
|     if (bExpandExtent)
 | |
|       Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT);
 | |
|     else
 | |
|       Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT);
 | |
| #endif
 | |
|     // Skip space preallocation if configured so
 | |
|     // fallback to sequential write otherwise.
 | |
|     // Couldn't avoid preallocation for full extents,
 | |
|     // e.g. ADD COLUMN DDL b/c CS has to fill the file
 | |
|     // with empty magics.
 | |
|     if (!bOptExtension)
 | |
|     {
 | |
|       // Allocate buffer, and store in scoped_array to insure it's deletion.
 | |
|       // Create scope {...} to manage deletion of writeBuf.
 | |
|       {
 | |
| #ifdef PROFILE
 | |
|         Stats::startParseEvent(WE_STATS_INIT_DCT_EXTENT);
 | |
| #endif
 | |
| 
 | |
|         unsigned char* writeBuf = new unsigned char[writeSize];
 | |
|         boost::scoped_array<unsigned char> writeBufPtr(writeBuf);
 | |
| 
 | |
|         memset(writeBuf, 0, writeSize);
 | |
| 
 | |
|         for (int i = 0; i < realNBlocks; i++)
 | |
|         {
 | |
|           memcpy(writeBuf + (i * BYTE_PER_BLOCK), blockHdrInit, blockHdrInitSize);
 | |
|         }
 | |
| 
 | |
| #ifdef PROFILE
 | |
|         Stats::stopParseEvent(WE_STATS_INIT_DCT_EXTENT);
 | |
| 
 | |
|         if (bExpandExtent)
 | |
|           Stats::startParseEvent(WE_STATS_EXPAND_DCT_EXTENT);
 | |
|         else
 | |
|           Stats::startParseEvent(WE_STATS_CREATE_DCT_EXTENT);
 | |
| #endif
 | |
| 
 | |
|         if (remWriteSize > 0)
 | |
|         {
 | |
|           if (pFile->write(writeBuf, remWriteSize) != remWriteSize)
 | |
|           {
 | |
|             return ERR_FILE_WRITE;
 | |
|           }
 | |
|         }
 | |
| 
 | |
|         for (int j = 0; j < loopCount; j++)
 | |
|         {
 | |
|           if (pFile->write(writeBuf, writeSize) != writeSize)
 | |
|           {
 | |
|             return ERR_FILE_WRITE;
 | |
|           }
 | |
|         }
 | |
|         // CS doesn't account flush timings.
 | |
| #ifdef PROFILE
 | |
|         if (bExpandExtent)
 | |
|           Stats::stopParseEvent(WE_STATS_EXPAND_DCT_EXTENT);
 | |
|         else
 | |
|           Stats::stopParseEvent(WE_STATS_CREATE_DCT_EXTENT);
 | |
| #endif
 | |
|       }
 | |
|     }  // preallocation fallback end
 | |
| 
 | |
|     // MCOL-498 CS has to set a number of blocs in the chunk header
 | |
|     if (m_compressionType)
 | |
|     {
 | |
|       updateDctnryExtent(pFile, nBlocks, lbid);
 | |
|     }
 | |
|     pFile->flush();
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Create a vector containing the mutexes used to serialize
 | |
|  *    extent creation per DBRoot.  Serializing extent creation
 | |
|  *    helps to prevent disk fragmentation.
 | |
|  ***********************************************************/
 | |
| /* static */
 | |
| void FileOp::initDbRootExtentMutexes()
 | |
| {
 | |
|   boost::mutex::scoped_lock lk(m_createDbRootMutexes);
 | |
| 
 | |
|   if (m_DbRootAddExtentMutexes.size() == 0)
 | |
|   {
 | |
|     std::vector<uint16_t> rootIds;
 | |
|     Config::getRootIdList(rootIds);
 | |
| 
 | |
|     for (size_t i = 0; i < rootIds.size(); i++)
 | |
|     {
 | |
|       m_DbRootAddExtentMutexes.emplace(std::piecewise_construct, std::forward_as_tuple(rootIds[i]),
 | |
|                                        std::forward_as_tuple());
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Write out (reinitialize) a partial extent in a column file.
 | |
|  *    A mutex is not used to prevent contention between threads,
 | |
|  *    because the extent should already be in place on disk; so
 | |
|  *    disk fragmentation is not an issue.
 | |
|  * PARAMETERS:
 | |
|  *    pFile   (in) - IDBDataFile* of column segment file to be written to
 | |
|  *    startOffset(in)-file offset where we are to begin writing blocks
 | |
|  *    nBlocks (in) - number of blocks to be written to the extent
 | |
|  *    emptyVal(in) - empty value to be used for column data values
 | |
|  *    width   (in) - width of the applicable column
 | |
|  * RETURN:
 | |
|  *    returns ERR_FILE_WRITE if an error occurs,
 | |
|  *    else returns NO_ERROR.
 | |
|  ***********************************************************/
 | |
| int FileOp::reInitPartialColumnExtent(IDBDataFile* pFile, long long startOffset, int nBlocks,
 | |
|                                       const uint8_t* emptyVal, int width)
 | |
| {
 | |
|   int rc = setFileOffset(pFile, startOffset, SEEK_SET);
 | |
| 
 | |
|   if (rc != NO_ERROR)
 | |
|     return rc;
 | |
| 
 | |
|   if (nBlocks == 0)
 | |
|     return NO_ERROR;
 | |
| 
 | |
|   // Determine the number of blocks in each call to fwrite(), and the
 | |
|   // number of fwrite() calls to make, based on this.  In other words,
 | |
|   // we put a cap on the "writeSize" so that we don't allocate and write
 | |
|   // an entire extent at once for the 64M row extents.
 | |
|   int writeSize = nBlocks * BYTE_PER_BLOCK;  // 1M and 8M row extent size
 | |
|   int loopCount = 0;
 | |
|   int remainderSize = writeSize;
 | |
| 
 | |
|   if (nBlocks > MAX_NBLOCKS)  // 64M row extent size
 | |
|   {
 | |
|     writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK;
 | |
|     loopCount = nBlocks / MAX_NBLOCKS;
 | |
|     remainderSize = nBlocks - (loopCount * MAX_NBLOCKS);
 | |
|   }
 | |
| 
 | |
|   // Allocate a buffer, initialize it, and use it to initialize the extent
 | |
|   // Store in scoped_array to insure it's deletion.
 | |
|   // Create scope {...} to manage deletion of writeBuf.
 | |
|   {
 | |
|     unsigned char* writeBuf = new unsigned char[writeSize];
 | |
|     boost::scoped_array<unsigned char> writeBufPtr(writeBuf);
 | |
| 
 | |
|     setEmptyBuf(writeBuf, writeSize, emptyVal, width);
 | |
| 
 | |
|     for (int j = 0; j < loopCount; j++)
 | |
|     {
 | |
|       if (pFile->write(writeBuf, writeSize) != writeSize)
 | |
|       {
 | |
|         return ERR_FILE_WRITE;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if (remainderSize > 0)
 | |
|     {
 | |
|       if (pFile->write(writeBuf, remainderSize) != remainderSize)
 | |
|       {
 | |
|         return ERR_FILE_WRITE;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Synchronize here to avoid write buffer pile up too much, which could
 | |
|   // cause controllernode to timeout later when it needs to save a snapshot.
 | |
|   pFile->flush();
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Write out (reinitialize) a partial extent in a dictionary store file.
 | |
|  *    A mutex is not used to prevent contention between threads,
 | |
|  *    because the extent should already be in place on disk; so
 | |
|  *    disk fragmentation is not an issue.
 | |
|  * PARAMETERS:
 | |
|  *    pFile   (in) - IDBDataFile* of column segment file to be written to
 | |
|  *    startOffset(in)-file offset where we are to begin writing blocks
 | |
|  *    nBlocks (in) - number of blocks to be written to the extent
 | |
|  *    blockHdrInit(in) - data used to initialize each block
 | |
|  *    blockHdrInitSize(in) - number of bytes in blockHdrInit
 | |
|  * RETURN:
 | |
|  *    returns ERR_FILE_WRITE if an error occurs,
 | |
|  *    else returns NO_ERROR.
 | |
|  ***********************************************************/
 | |
| int FileOp::reInitPartialDctnryExtent(IDBDataFile* pFile, long long startOffset, int nBlocks,
 | |
|                                       unsigned char* blockHdrInit, int blockHdrInitSize)
 | |
| {
 | |
|   int rc = setFileOffset(pFile, startOffset, SEEK_SET);
 | |
| 
 | |
|   if (rc != NO_ERROR)
 | |
|     return rc;
 | |
| 
 | |
|   if (nBlocks == 0)
 | |
|     return NO_ERROR;
 | |
| 
 | |
|   // Determine the number of blocks in each call to fwrite(), and the
 | |
|   // number of fwrite() calls to make, based on this.  In other words,
 | |
|   // we put a cap on the "writeSize" so that we don't allocate and write
 | |
|   // an entire extent at once for the 64M row extents.
 | |
|   int writeSize = nBlocks * BYTE_PER_BLOCK;  // 1M and 8M row extent size
 | |
|   int loopCount = 0;
 | |
|   int remainderSize = writeSize;
 | |
| 
 | |
|   if (nBlocks > MAX_NBLOCKS)  // 64M row extent size
 | |
|   {
 | |
|     writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK;
 | |
|     loopCount = nBlocks / MAX_NBLOCKS;
 | |
|     remainderSize = nBlocks - (loopCount * MAX_NBLOCKS);
 | |
|     nBlocks = MAX_NBLOCKS;
 | |
|   }
 | |
| 
 | |
|   // Allocate a buffer, initialize it, and use it to initialize the extent
 | |
|   // Store in scoped_array to insure it's deletion.
 | |
|   // Create scope {...} to manage deletion of writeBuf.
 | |
|   {
 | |
|     unsigned char* writeBuf = new unsigned char[writeSize];
 | |
|     boost::scoped_array<unsigned char> writeBufPtr(writeBuf);
 | |
| 
 | |
|     memset(writeBuf, 0, writeSize);
 | |
| 
 | |
|     for (int i = 0; i < nBlocks; i++)
 | |
|     {
 | |
|       memcpy(writeBuf + (i * BYTE_PER_BLOCK), blockHdrInit, blockHdrInitSize);
 | |
|     }
 | |
| 
 | |
|     for (int j = 0; j < loopCount; j++)
 | |
|     {
 | |
|       if (pFile->write(writeBuf, writeSize) != writeSize)
 | |
|       {
 | |
|         return ERR_FILE_WRITE;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if (remainderSize > 0)
 | |
|     {
 | |
|       if (pFile->write(writeBuf, remainderSize) != remainderSize)
 | |
|       {
 | |
|         return ERR_FILE_WRITE;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Synchronize here to avoid write buffer pile up too much, which could
 | |
|   // cause controllernode to timeout later when it needs to save a snapshot.
 | |
|   pFile->flush();
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  * PARAMETERS:
 | |
|  *    pFile - file handle
 | |
|  *    fileSize (out) - file size in bytes
 | |
|  * RETURN:
 | |
|  *    error code
 | |
|  ***********************************************************/
 | |
| int FileOp::getFileSize(IDBDataFile* pFile, long long& fileSize) const
 | |
| {
 | |
|   fileSize = 0;
 | |
| 
 | |
|   if (pFile == NULL)
 | |
|     return ERR_FILE_NULL;
 | |
| 
 | |
|   fileSize = pFile->size();
 | |
| 
 | |
|   if (fileSize < 0)
 | |
|   {
 | |
|     fileSize = 0;
 | |
|     return ERR_FILE_STAT;
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Get file size using file id
 | |
|  * PARAMETERS:
 | |
|  *    fid    - column OID
 | |
|  *    dbroot    - DBRoot    of applicable segment file
 | |
|  *    partition - partition of applicable segment file
 | |
|  *    segment   - segment   of applicable segment file
 | |
|  *    fileSize (out) - current file size for requested segment file
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if okay, else an error return code.
 | |
|  ***********************************************************/
 | |
| int FileOp::getFileSize(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment,
 | |
|                         long long& fileSize) const
 | |
| {
 | |
|   fileSize = 0;
 | |
| 
 | |
|   char fileName[FILE_NAME_SIZE];
 | |
|   RETURN_ON_ERROR(getFileName(fid, fileName, dbRoot, partition, segment));
 | |
| 
 | |
|   fileSize = IDBPolicy::size(fileName);
 | |
| 
 | |
|   if (fileSize < 0)
 | |
|   {
 | |
|     fileSize = 0;
 | |
|     return ERR_FILE_STAT;
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Check whether it is a directory
 | |
|  * PARAMETERS:
 | |
|  *    dirName - directory name
 | |
|  * RETURN:
 | |
|  *    true if it is, false otherwise
 | |
|  ***********************************************************/
 | |
| bool FileOp::isDir(const char* dirName) const
 | |
| {
 | |
|   return IDBPolicy::isDir(dirName);
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Convert an oid to a filename
 | |
|  * PARAMETERS:
 | |
|  *    fid - fid
 | |
|  *    fullFileName - file name
 | |
|  *    bCreateDir - whether need to create a directory
 | |
|  *    dbRoot     - DBRoot where file is to be located; 1->DBRoot1,
 | |
|  *                 2->DBRoot2, etc.  If bCreateDir is false, meaning we
 | |
|  *                 are not creating the file but only searching for an
 | |
|  *                 existing file, then dbRoot can be 0, and oid2FileName
 | |
|  *                 will search all the DBRoots for the applicable filename.
 | |
|  *    partition  - Partition number to be used in filepath subdirectory
 | |
|  *    segment    - Segment number to be used in filename
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success, other if fail
 | |
|  ***********************************************************/
 | |
| int FileOp::oid2FileName(FID fid, char* fullFileName, bool bCreateDir, uint16_t dbRoot, uint32_t partition,
 | |
|                          uint16_t segment) const
 | |
| {
 | |
| #ifdef SHARED_NOTHING_DEMO_2
 | |
| 
 | |
|   if (fid >= 10000)
 | |
|   {
 | |
|     char root[FILE_NAME_SIZE];
 | |
|     Config::getSharedNothingRoot(root);
 | |
|     sprintf(fullFileName, "%s/FILE%d", root, fid);
 | |
|     return NO_ERROR;
 | |
|   }
 | |
| 
 | |
| #endif
 | |
| 
 | |
| // Need this stub to use ColumnOp::writeRow in the unit tests
 | |
| #ifdef WITH_UNIT_TESTS
 | |
|   if (fid == 42)
 | |
|   {
 | |
|     sprintf(fullFileName, "./versionbuffer");
 | |
|     return NO_ERROR;
 | |
|   }
 | |
| #endif
 | |
| 
 | |
|   /* If is a version buffer file, the format is different. */
 | |
|   if (fid < 1000)
 | |
|   {
 | |
|     /* Get the dbroot #
 | |
|      * Get the root of that dbroot
 | |
|      * Add "/versionbuffer.cdf"
 | |
|      */
 | |
|     BRM::DBRM dbrm;
 | |
|     int _dbroot = dbrm.getDBRootOfVBOID(fid);
 | |
| 
 | |
|     if (_dbroot < 0)
 | |
|       return ERR_INVALID_VBOID;
 | |
| 
 | |
|     snprintf(fullFileName, FILE_NAME_SIZE, "%s/versionbuffer.cdf", Config::getDBRootByNum(_dbroot).c_str());
 | |
|     return NO_ERROR;
 | |
|   }
 | |
| 
 | |
|   // Get hashed part of the filename. This is the tail-end of the filename path,
 | |
|   //  excluding the DBRoot.
 | |
|   char tempFileName[FILE_NAME_SIZE];
 | |
|   char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
 | |
|   RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, partition, segment)));
 | |
| 
 | |
|   // see if file exists in specified DBRoot; return if found
 | |
|   if (fullFileName == nullptr)
 | |
|   {
 | |
|     return ERR_INTERNAL;
 | |
|   }
 | |
| 
 | |
|   if (dbRoot > 0)
 | |
|   {
 | |
|     sprintf(fullFileName, "%s/%s", Config::getDBRootByNum(dbRoot).c_str(), tempFileName);
 | |
| 
 | |
|     // std::cout << "oid2FileName() OID: " << fid <<
 | |
|     //   " searching for file: " << fullFileName <<std::endl;
 | |
|     // if (access(fullFileName, R_OK) == 0) return NO_ERROR;
 | |
|     //@Bug 5397
 | |
|     if (IDBPolicy::exists(fullFileName))
 | |
|       return NO_ERROR;
 | |
| 
 | |
|     // file wasn't found, user doesn't want dirs to be created, we're done
 | |
|     if (!bCreateDir)
 | |
|       return NO_ERROR;
 | |
| 
 | |
|     // std::cout << "oid2FileName() OID: " << fid <<
 | |
|     //   " creating file: " << fullFileName <<std::endl;
 | |
|   }
 | |
|   else
 | |
|   {
 | |
|     // Now try to find the file in each of the DBRoots.
 | |
|     std::vector<std::string> dbRootPathList;
 | |
|     Config::getDBRootPathList(dbRootPathList);
 | |
| 
 | |
|     for (unsigned i = 0; i < dbRootPathList.size(); i++)
 | |
|     {
 | |
|       sprintf(fullFileName, "%s/%s", dbRootPathList[i].c_str(), tempFileName);
 | |
| 
 | |
|       // found it, nothing more to do, return
 | |
|       // if (access(fullFileName, R_OK) == 0) return NO_ERROR;
 | |
|       //@Bug 5397
 | |
|       if (IDBPolicy::exists(fullFileName))
 | |
|         return NO_ERROR;
 | |
|     }
 | |
| 
 | |
|     // file wasn't found, user didn't specify DBRoot so we can't create
 | |
|     return ERR_FILE_NOT_EXIST;
 | |
|   }
 | |
| 
 | |
|   std::stringstream aDirName;
 | |
|   for (size_t i = 0; i < MaxDirLevels; i++)
 | |
|   {
 | |
|     if (i == 0)
 | |
|     {
 | |
|       aDirName << Config::getDBRootByNum(dbRoot).c_str() << "/" << dbDir[i];
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|       aDirName << "/" << dbDir[i];
 | |
|     }
 | |
|     if (!isDir(aDirName.str().c_str()))
 | |
|       RETURN_ON_ERROR(createDir(aDirName.str().c_str()));
 | |
| 
 | |
|     {
 | |
|       std::ostringstream ossChown;
 | |
|       if (chownDataPath(aDirName.str()))
 | |
|         return ERR_FILE_CHOWN;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| void FileOp::getFileNameForPrimProc(FID fid, char* fullFileName, uint16_t dbRoot, uint32_t partition,
 | |
|                                     uint16_t segment) const
 | |
| {
 | |
|   string dbRootPath = Config::getDBRootByNum(dbRoot);
 | |
|   if (dbRootPath.empty())
 | |
|   {
 | |
|     ostringstream oss;
 | |
|     oss << "(dbroot " << dbRoot << " offline)";
 | |
|     dbRootPath = oss.str();
 | |
|   }
 | |
| 
 | |
|   // different filenames for the version buffer files
 | |
|   if (fid < 1000)
 | |
|     snprintf(fullFileName, FILE_NAME_SIZE, "%s/versionbuffer.cdf", dbRootPath.c_str());
 | |
|   else
 | |
|     snprintf(fullFileName, FILE_NAME_SIZE, "%s/%03u.dir/%03u.dir/%03u.dir/%03u.dir/%03u.dir/FILE%03d.cdf",
 | |
|              dbRootPath.c_str(), fid >> 24, (fid & 0x00ff0000) >> 16, (fid & 0x0000ff00) >> 8,
 | |
|              fid & 0x000000ff, partition, segment);
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Search for directory path associated with specified OID.
 | |
|  *    If the OID is a version buffer file, it returns the whole
 | |
|  *    filename.
 | |
|  * PARAMETERS:
 | |
|  *    fid   - (in)  OID to search for
 | |
|  *    pFile - (out) OID directory path (including DBRoot) that is found
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if OID dir path found, else returns ERR_FILE_NOT_EXIST
 | |
|  ***********************************************************/
 | |
| int FileOp::oid2DirName(FID fid, char* oidDirName) const
 | |
| {
 | |
|   char tempFileName[FILE_NAME_SIZE];
 | |
|   char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
 | |
| 
 | |
|   /* If is a version buffer file, the format is different. */
 | |
|   if (fid < 1000)
 | |
|   {
 | |
|     /* Get the dbroot #
 | |
|      * Get the root of that dbroot
 | |
|      */
 | |
|     BRM::DBRM dbrm;
 | |
|     int _dbroot = dbrm.getDBRootOfVBOID(fid);
 | |
| 
 | |
|     if (_dbroot < 0)
 | |
|       return ERR_INVALID_VBOID;
 | |
| 
 | |
|     snprintf(oidDirName, FILE_NAME_SIZE, "%s", Config::getDBRootByNum(_dbroot).c_str());
 | |
|     return NO_ERROR;
 | |
|   }
 | |
| 
 | |
|   if (oidDirName == nullptr)
 | |
|   {
 | |
|     return ERR_INTERNAL;
 | |
|   }
 | |
| 
 | |
|   RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, 0, 0)));
 | |
| 
 | |
|   // Now try to find the directory in each of the DBRoots.
 | |
|   std::vector<std::string> dbRootPathList;
 | |
|   Config::getDBRootPathList(dbRootPathList);
 | |
| 
 | |
|   for (unsigned i = 0; i < dbRootPathList.size(); i++)
 | |
|   {
 | |
|     sprintf(oidDirName, "%s/%s/%s/%s/%s", dbRootPathList[i].c_str(), dbDir[0], dbDir[1], dbDir[2], dbDir[3]);
 | |
| 
 | |
|     // found it, nothing more to do, return
 | |
|     //@Bug 5397. use the new way to check
 | |
|     if (IDBPolicy::exists(oidDirName))
 | |
|       return NO_ERROR;
 | |
|   }
 | |
| 
 | |
|   return ERR_FILE_NOT_EXIST;
 | |
| }
 | |
| 
 | |
| bool FileOp::existsDefaultFile(FID fid) const
 | |
| {
 | |
|   char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
 | |
|   char fileName[FILE_NAME_SIZE];
 | |
| 
 | |
|   RETURN_ON_ERROR((Convertor::oid2FileName(fid, fileName, dbDir, 0, 0)));
 | |
|   std::vector<std::string> dbRootPathList;
 | |
|   Config::getDBRootPathList(dbRootPathList);
 | |
| 
 | |
|   for (unsigned i = 0; i < dbRootPathList.size(); i++)
 | |
|   {
 | |
|     std::stringstream stream;
 | |
|     stream << dbRootPathList[i].c_str() << "/" << fileName;
 | |
|     string fullFileName = stream.str();
 | |
|     if (IDBPolicy::exists(fullFileName.c_str()))
 | |
|       return true;
 | |
|   }
 | |
| 
 | |
|   return false;
 | |
| }
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Construct directory path for the specified fid (OID), DBRoot, and
 | |
|  *    partition number.  Directory path need not exist, nor is it created.
 | |
|  * PARAMETERS:
 | |
|  *    fid       - (in)  OID of interest
 | |
|  *    dbRoot    - (in)  DBRoot of interest
 | |
|  *    partition - (in)  partition of interest
 | |
|  *    dirName   - (out) constructed directory path
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if path is successfully constructed.
 | |
|  ***********************************************************/
 | |
| int FileOp::getDirName(FID fid, uint16_t dbRoot, uint32_t partition, std::string& dirName) const
 | |
| {
 | |
|   char tempFileName[FILE_NAME_SIZE];
 | |
|   char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
 | |
| 
 | |
|   RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, partition, 0)));
 | |
| 
 | |
|   std::string rootPath = Config::getDBRootByNum(dbRoot);
 | |
|   std::ostringstream oss;
 | |
|   oss << rootPath << '/' << dbDir[0] << '/' << dbDir[1] << '/' << dbDir[2] << '/' << dbDir[3] << '/'
 | |
|       << dbDir[4];
 | |
|   dirName = oss.str();
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Open a file
 | |
|  * PARAMETERS:
 | |
|  *    fileName - file name with complete path
 | |
|  *    pFile - file handle
 | |
|  * RETURN:
 | |
|  *    true if exists, false otherwise
 | |
|  ***********************************************************/
 | |
| // @bug 5572 - HDFS usage: add *.tmp file backup flag
 | |
| IDBDataFile* FileOp::openFile(const char* fileName, const char* mode, const int ioColSize,
 | |
|                               bool useTmpSuffix) const
 | |
| {
 | |
|   IDBDataFile* pFile;
 | |
|   errno = 0;
 | |
| 
 | |
|   unsigned opts;
 | |
| 
 | |
|   if (ioColSize > 0)
 | |
|     opts = IDBDataFile::USE_VBUF;
 | |
|   else
 | |
|     opts = IDBDataFile::USE_NOVBUF;
 | |
| 
 | |
|   if ((useTmpSuffix) && idbdatafile::IDBPolicy::useHdfs())
 | |
|     opts |= IDBDataFile::USE_TMPFILE;
 | |
| 
 | |
|   pFile =
 | |
|       IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, mode, opts, ioColSize);
 | |
| 
 | |
|   if (pFile == NULL)
 | |
|   {
 | |
|     int errRc = errno;
 | |
|     std::ostringstream oss;
 | |
|     std::string errnoMsg;
 | |
|     Convertor::mapErrnoToString(errRc, errnoMsg);
 | |
|     oss << "FileOp::openFile(): fopen(" << fileName << ", " << mode << "): errno = " << errRc << ": "
 | |
|         << errnoMsg;
 | |
|     logging::Message::Args args;
 | |
|     args.add(oss.str());
 | |
|     SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_CRITICAL, logging::M0006);
 | |
|     SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0006);
 | |
|   }
 | |
| 
 | |
|   return pFile;
 | |
| }
 | |
| 
 | |
| // @bug 5572 - HDFS usage: add *.tmp file backup flag
 | |
| IDBDataFile* FileOp::openFile(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment,
 | |
|                               std::string& segFile, const char* mode, int ioColSize, bool useTmpSuffix) const
 | |
| {
 | |
|   char fileName[FILE_NAME_SIZE];
 | |
|   int rc;
 | |
| 
 | |
|   // fid2FileName( fileName, fid );
 | |
|   RETURN_ON_WE_ERROR((rc = getFileName(fid, fileName, dbRoot, partition, segment)), NULL);
 | |
| 
 | |
|   // disable buffering for versionbuffer file
 | |
|   if (fid < 1000)
 | |
|     ioColSize = 0;
 | |
| 
 | |
|   IDBDataFile* pF = openFile(fileName, mode, ioColSize, useTmpSuffix);
 | |
| 
 | |
|   segFile = fileName;
 | |
| 
 | |
|   return pF;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Read a portion of file to a buffer
 | |
|  * PARAMETERS:
 | |
|  *    pFile - file handle
 | |
|  *    readBuf - read buffer
 | |
|  *    readSize - the size to read
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_FILE_NULL if file handle is NULL
 | |
|  *    ERR_FILE_READ if something wrong in reading the file
 | |
|  ***********************************************************/
 | |
| int FileOp::readFile(IDBDataFile* pFile, unsigned char* readBuf, int readSize) const
 | |
| {
 | |
|   if (pFile != NULL)
 | |
|   {
 | |
|     int bc = pFile->read(readBuf, readSize);
 | |
|     if (bc != readSize)
 | |
|     {
 | |
|       // MCOL-498 EOF if a next block is empty
 | |
|       if (bc == 0)
 | |
|       {
 | |
|         return ERR_FILE_EOF;
 | |
|       }
 | |
|       return ERR_FILE_READ;
 | |
|     }
 | |
|   }
 | |
|   else
 | |
|     return ERR_FILE_NULL;
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * Reads contents of headers from "pFile" and stores into "hdrs".
 | |
|  ***********************************************************/
 | |
| int FileOp::readHeaders(IDBDataFile* pFile, char* hdrs) const
 | |
| {
 | |
|   RETURN_ON_ERROR(setFileOffset(pFile, 0));
 | |
|   RETURN_ON_ERROR(
 | |
|       readFile(pFile, reinterpret_cast<unsigned char*>(hdrs), (CompressInterface::HDR_BUF_LEN * 2)));
 | |
|   int rc = compress::CompressInterface::verifyHdr(hdrs);
 | |
| 
 | |
|   if (rc != 0)
 | |
|   {
 | |
|     return ERR_COMP_VERIFY_HDRS;
 | |
|   }
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * Reads contents of headers from "pFile" and stores into "hdr1" and "hdr2".
 | |
|  ***********************************************************/
 | |
| int FileOp::readHeaders(IDBDataFile* pFile, char* hdr1, char* hdr2) const
 | |
| {
 | |
|   unsigned char* hdrPtr = reinterpret_cast<unsigned char*>(hdr1);
 | |
|   RETURN_ON_ERROR(setFileOffset(pFile, 0));
 | |
|   RETURN_ON_ERROR(readFile(pFile, hdrPtr, CompressInterface::HDR_BUF_LEN));
 | |
| 
 | |
|   int ptrSecSize = compress::CompressInterface::getHdrSize(hdrPtr) - CompressInterface::HDR_BUF_LEN;
 | |
|   return readFile(pFile, reinterpret_cast<unsigned char*>(hdr2), ptrSecSize);
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION: No change Old signature
 | |
|  *    Read a portion of file to a buffer
 | |
|  * PARAMETERS:
 | |
|  *    pFile - file handle
 | |
|  *    offset - file offset
 | |
|  *    origin - can be SEEK_SET, or SEEK_CUR, or SEEK_END
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_FILE_NULL if file handle is NULL
 | |
|  *    ERR_FILE_SEEK if something wrong in setting the position
 | |
|  ***********************************************************/
 | |
| int FileOp::setFileOffset(IDBDataFile* pFile, long long offset, int origin) const
 | |
| {
 | |
|   int rc;
 | |
|   long long fboOffset = offset;  // workaround solution to pass leakcheck error
 | |
| 
 | |
|   if (pFile == NULL)
 | |
|     return ERR_FILE_NULL;
 | |
| 
 | |
|   if (offset < 0)
 | |
|     return ERR_FILE_FBO_NEG;
 | |
| 
 | |
|   rc = pFile->seek(fboOffset, origin);
 | |
| 
 | |
|   if (rc)
 | |
|     return ERR_FILE_SEEK;
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Read a portion of file to a buffer
 | |
|  * PARAMETERS:
 | |
|  *    pFile - file handle
 | |
|  *    offset - file offset
 | |
|  *    origin - can be SEEK_SET, or SEEK_CUR, or SEEK_END
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_FILE_NULL if file handle is NULL
 | |
|  *    ERR_FILE_SEEK if something wrong in setting the position
 | |
|  ***********************************************************/
 | |
| int FileOp::setFileOffsetBlock(IDBDataFile* pFile, uint64_t lbid, int origin) const
 | |
| {
 | |
|   long long fboOffset = 0;
 | |
|   int fbo = 0;
 | |
| 
 | |
|   // only when fboFlag is false, we get in here
 | |
|   uint16_t dbRoot;
 | |
|   uint32_t partition;
 | |
|   uint16_t segment;
 | |
|   RETURN_ON_ERROR(BRMWrapper::getInstance()->getFboOffset(lbid, dbRoot, partition, segment, fbo));
 | |
|   fboOffset = ((long long)fbo) * (long)BYTE_PER_BLOCK;
 | |
| 
 | |
|   return setFileOffset(pFile, fboOffset, origin);
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Truncate file to the specified size.
 | |
|  * PARAMETERS:
 | |
|  *    pFile - file handle
 | |
|  *    fileSize - size of file in bytes.
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_FILE_NULL if file handle is NULL
 | |
|  *    ERR_FILE_SEEK if something wrong in setting the position
 | |
|  ***********************************************************/
 | |
| int FileOp::truncateFile(IDBDataFile* pFile, long long fileSize) const
 | |
| {
 | |
|   if (pFile == NULL)
 | |
|     return ERR_FILE_NULL;
 | |
| 
 | |
|   if (pFile->truncate(fileSize) != 0)
 | |
|     return ERR_FILE_TRUNCATE;
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Write a buffer to a file at at current location
 | |
|  * PARAMETERS:
 | |
|  *    pFile - file handle
 | |
|  *    writeBuf - write buffer
 | |
|  *    writeSize - the write size
 | |
|  * RETURN:
 | |
|  *    NO_ERROR if success
 | |
|  *    ERR_FILE_NULL if file handle is NULL
 | |
|  *    ERR_FILE_WRITE if something wrong in writing to the file
 | |
|  ***********************************************************/
 | |
| int FileOp::writeFile(IDBDataFile* pFile, const unsigned char* writeBuf, int writeSize) const
 | |
| {
 | |
|   if (pFile != NULL)
 | |
|   {
 | |
|     if (pFile->write(writeBuf, writeSize) != writeSize)
 | |
|       return ERR_FILE_WRITE;
 | |
|   }
 | |
|   else
 | |
|     return ERR_FILE_NULL;
 | |
| 
 | |
|   return NO_ERROR;
 | |
| }
 | |
| 
 | |
| /***********************************************************
 | |
|  * DESCRIPTION:
 | |
|  *    Determine whether the applicable filesystem has room to add the
 | |
|  *    specified number of blocks (where the blocks contain BYTE_PER_BLOCK
 | |
|  *    bytes).
 | |
|  * PARAMETERS:
 | |
|  *    fileName - file whose file system is to be checked.  Does not have to
 | |
|  *               be a complete file name.  Dir path is sufficient.
 | |
|  *    nBlock   - number of 8192-byte blocks to be added
 | |
|  * RETURN:
 | |
|  *    true if there is room for the blocks or it can not be determined;
 | |
|  *    false if file system usage would exceed allowable threshold
 | |
|  ***********************************************************/
 | |
| bool FileOp::isDiskSpaceAvail(const std::string& fileName, int nBlocks) const
 | |
| {
 | |
|   bool bSpaceAvail = true;
 | |
| 
 | |
|   unsigned maxDiskUsage = Config::getMaxFileSystemDiskUsage();
 | |
| 
 | |
|   if (maxDiskUsage < 100)  // 100% means to disable the check
 | |
|   {
 | |
|     struct statfs fStats;
 | |
|     int rc = statfs(fileName.c_str(), &fStats);
 | |
| 
 | |
|     if (rc == 0)
 | |
|     {
 | |
|       double totalBlocks = fStats.f_blocks;
 | |
|       double blksToAlloc = (double)(nBlocks * BYTE_PER_BLOCK) / fStats.f_bsize;
 | |
|       double freeBlocks = fStats.f_bavail - blksToAlloc;
 | |
| 
 | |
|       if ((((totalBlocks - freeBlocks) / totalBlocks) * 100.0) > maxDiskUsage)
 | |
|         bSpaceAvail = false;
 | |
| 
 | |
|       // std::cout         << "isDiskSpaceAvail"   <<
 | |
|       //": totalBlocks: " << totalBlocks          <<
 | |
|       //"; blkSize: "     << fStats.f_bsize       <<
 | |
|       //"; nBlocks: "     << nBlocks              <<
 | |
|       //"; freeBlks: "    << freeBlocks           <<
 | |
|       //"; pctUsed: " << (((totalBlocks-freeBlocks)/totalBlocks)*100.0) <<
 | |
|       //"; bAvail: "      << bSpaceAvail          << std::endl;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return bSpaceAvail;
 | |
| }
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Virtual default functions follow; placeholders for derived class if they want
 | |
| // to override (see ColumnOpCompress1 and DctnryCompress1 in /wrapper).
 | |
| //------------------------------------------------------------------------------
 | |
| 
 | |
| //------------------------------------------------------------------------------
 | |
| // Expand current abbreviated extent to a full extent for column segment file
 | |
| // associated with pFile.  Function leaves fileposition at end of file after
 | |
| // extent is expanded.
 | |
| //------------------------------------------------------------------------------
 | |
| int FileOp::expandAbbrevColumnExtent(
 | |
|     IDBDataFile* pFile,       // FILE ptr to file where abbrev extent is to be expanded
 | |
|     uint16_t dbRoot,          // The DBRoot of the file with the abbreviated extent
 | |
|     const uint8_t* emptyVal,  // Empty value to be used in expanding the extent
 | |
|     int width,                // Width of the column (in bytes)
 | |
|     execplan::CalpontSystemCatalog::ColDataType colDataType)  // Column data type.
 | |
| {
 | |
|   // Based on extent size, see how many blocks to add to fill the extent
 | |
|   int blksToAdd =
 | |
|       (((int)BRMWrapper::getInstance()->getExtentRows() - INITIAL_EXTENT_ROWS_TO_DISK) / BYTE_PER_BLOCK) *
 | |
|       width;
 | |
| 
 | |
|   // Make sure there is enough disk space to expand the extent.
 | |
|   RETURN_ON_ERROR(setFileOffset(pFile, 0, SEEK_END));
 | |
| 
 | |
|   // TODO-will have to address this DiskSpaceAvail check at some point
 | |
|   if (!isDiskSpaceAvail(Config::getDBRootByNum(dbRoot), blksToAdd))
 | |
|   {
 | |
|     return ERR_FILE_DISK_SPACE;
 | |
|   }
 | |
| 
 | |
|   // Add blocks to turn the abbreviated extent into a full extent.
 | |
|   int rc = FileOp::initColumnExtent(pFile, dbRoot, blksToAdd, emptyVal, width, colDataType,
 | |
|                                     false,  // existing file
 | |
|                                     true,   // expand existing extent
 | |
|                                     false,  // n/a since not adding new extent
 | |
|                                     true);  // optimize segment file extension
 | |
| 
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
// Record the transaction id to associate with this FileOp's operations.
void FileOp::setTransId(const TxnID& transId)
{
  m_transId = transId;
}
 | |
| 
 | |
// Record whether this FileOp is being used by a bulk load.
void FileOp::setBulkFlag(bool isBulkLoad)
{
  m_isBulk = isBulkLoad;
}
 | |
| 
 | |
// Default no-op placeholder; derived classes may override to flush files.
// Always returns NO_ERROR here.
int FileOp::flushFile(int /*rc*/, std::map<FID, FID>& /*oids*/)
{
  return NO_ERROR;
}
 | |
| 
 | |
// Default no-op placeholder; derived classes may override to update a
// column extent.  Always returns NO_ERROR here.
int FileOp::updateColumnExtent(IDBDataFile* /*pFile*/, int /*nBlocks*/, int64_t /*lbid*/)
{
  return NO_ERROR;
}
 | |
| 
 | |
// Default no-op placeholder; derived classes may override to update a
// dictionary extent.  Always returns NO_ERROR here.
int FileOp::updateDctnryExtent(IDBDataFile* /*pFile*/, int /*nBlocks*/, int64_t /*lbid*/)
{
  return NO_ERROR;
}
 | |
| 
 | |
// Set the fix flag for this FileOp.
void FileOp::setFixFlag(bool isFix)
{
  m_isFix = isFix;
}
 | |
| 
 | |
| // Small note. We call chownFileDir in couple places to chown of the
 | |
| // target file and call in oid2Filename() chowns directories created
 | |
| bool FileOp::chownDataPath(const std::string& fileName) const
 | |
| {
 | |
|   std::ostringstream error;
 | |
|   idbdatafile::IDBFileSystem& fs = IDBPolicy::getFs(fileName);
 | |
|   if (chownPath(error, fileName, fs))
 | |
|   {
 | |
|     logging::Message::Args args;
 | |
|     logging::Message message(1);
 | |
|     args.add(error.str());
 | |
|     message.format(args);
 | |
|     logging::LoggingID lid(SUBSYSTEM_ID_WE_BULK);
 | |
|     logging::MessageLog ml(lid);
 | |
|     ml.logErrorMessage(message);
 | |
|     return true;
 | |
|   }
 | |
|   return false;
 | |
| }
 | |
| 
 | |
| }  // namespace WriteEngine
 |