/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: we_fileop.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
#include "mcsconfig.h"
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sstream>
#include <iostream>
#include <memory>
#include <string>
#include <stdexcept>
#if defined(__FreeBSD__)
#include <sys/param.h>
#include <sys/mount.h>
#else
#include <sys/vfs.h>
#endif
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/scoped_array.hpp>
using namespace std;

#include "we_fileop.h"
#include "we_convertor.h"
#include "we_log.h"
#include "we_config.h"
#include "we_stats.h"
#include "we_simplesyslog.h"

#include "idbcompress.h"
using namespace compress;

#include "messagelog.h"
using namespace logging;

#include "IDBDataFile.h"
#include "IDBFileSystem.h"
#include "IDBPolicy.h"
using namespace idbdatafile;
namespace WriteEngine
{
/*static*/ boost::mutex FileOp::m_createDbRootMutexes;
/*static*/ boost::mutex FileOp::m_mkdirMutex;
/*static*/ std::map<int, boost::mutex> FileOp::m_DbRootAddExtentMutexes;
// MAX_NBLOCKS caps the number of blocks written to an extent
// in 1 call to fwrite(), during initialization

// StopWatch timer;
/**
 * Constructor
 */
FileOp::FileOp(bool doAlloc) : m_compressionType(0), m_transId((TxnID)INVALID_NUM), m_buffer(0)
{
  if (doAlloc)
  {
    m_buffer = new char[DEFAULT_BUFSIZ];
    memset(m_buffer, '\0', DEFAULT_BUFSIZ);
  }
}

/**
 * Default Destructor
 */
FileOp::~FileOp()
{
  if (m_buffer)
  {
    delete[] m_buffer;
  }

  m_buffer = 0;
}

/***********************************************************
 * DESCRIPTION:
 *    Close a file
 * PARAMETERS:
 *    pFile - file handle
 * RETURN:
 *    none
 ***********************************************************/
void FileOp::closeFile(IDBDataFile* pFile) const
{
  delete pFile;
}

/***********************************************************
 * DESCRIPTION:
 *    Create a directory
 *    Function uses a mutex lock to prevent thread contention when trying to
 *    create 2 subdirectories in the same directory at the same time.
 * PARAMETERS:
 *    dirName - directory name
 *    mode - create mode
 * RETURN:
 *    NO_ERROR if success; ERR_DIR_CREATE if fail
 ***********************************************************/
int FileOp::createDir(const char* dirName, mode_t mode) const
{
  boost::mutex::scoped_lock lk(m_mkdirMutex);
  int rc = IDBPolicy::mkdir(dirName);

  if (rc != 0)
  {
    int errRc = errno;

    if (errRc == EEXIST)
      return NO_ERROR;  // ignore "File exists" error

    if (getLogger())
    {
      std::ostringstream oss;
      std::string errnoMsg;
      Convertor::mapErrnoToString(errRc, errnoMsg);
      oss << "Error creating directory " << dirName << "; err-" << errRc << "; " << errnoMsg;
      getLogger()->logMsg(oss.str(), ERR_DIR_CREATE, MSGLVL_ERROR);
    }

    return ERR_DIR_CREATE;
  }

  return NO_ERROR;
}
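
// Usage sketch (illustrative only; the path, mode, and error handling below
// are hypothetical, not taken from callers in this file):
//
//   FileOp fileOp(true);
//   if (fileOp.createDir("/data1/000.dir", 0755) != NO_ERROR)
//   {
//     // handle ERR_DIR_CREATE; an already-existing directory is not an error
//   }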

/***********************************************************
 * DESCRIPTION:
 *    Create the "first" segment file for a column with a fixed file size
 *    Note: the file is created in binary mode
 * PARAMETERS:
 *    fileName - file name with complete path
 *    numOfBlock - the total number of blocks to be initialized (written out)
 *    compressionType - compression type
 *    emptyVal - empty value used to initialize column values
 *    width - width of column in bytes
 *    dbRoot - DBRoot of column file we are creating
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_EXIST if file exists
 *    ERR_FILE_CREATE if can not create the file
 ***********************************************************/
int FileOp::createFile(const char* fileName, int numOfBlock, const uint8_t* emptyVal, int width,
                       execplan::CalpontSystemCatalog::ColDataType colDataType, uint16_t dbRoot,
                       BRM::LBID_t startLbid)
{
  IDBDataFile* pFile = IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, "w+b",
                                         IDBDataFile::USE_VBUF, width);
  int rc = 0;

  if (pFile != NULL)
  {
    // Initialize the contents of the extent.
    if (m_compressionType)
    {
      rc = initAbbrevCompColumnExtent(pFile, dbRoot, numOfBlock, emptyVal, width, startLbid, colDataType);
    }
    else
    {
      rc = initColumnExtent(pFile, dbRoot, numOfBlock, emptyVal, width, colDataType,
                            true,    // new file
                            false,   // don't expand; add new extent
                            true);   // add abbreviated extent
    }

    closeFile(pFile);
  }
  else
    return ERR_FILE_CREATE;

  return rc;
}

/***********************************************************
 * DESCRIPTION:
 *    Create the "first" segment file for a column with a fixed file size
 *    Note: the file is created in binary mode
 * PARAMETERS:
 *    fid - OID of the column file to be created
 *    allocSize (out) - number of blocks allocated to the first extent
 *    dbRoot - DBRoot where file is to be located
 *    partition - starting partition number for segment file path
 *    compressionType - compression type
 *    colDataType - the column data type
 *    emptyVal - designated "empty" value for this OID
 *    width - width of column in bytes
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_EXIST if file exists
 *    ERR_FILE_CREATE if can not create the file
 ***********************************************************/
int FileOp::createFile(FID fid, int& allocSize, uint16_t dbRoot, uint32_t partition,
                       execplan::CalpontSystemCatalog::ColDataType colDataType, const uint8_t* emptyVal,
                       int width)
{
  // std::cout << "Creating file oid: " << fid <<
  //   "; compress: " << m_compressionType << std::endl;
  char fileName[FILE_NAME_SIZE];
  int rc;

  uint16_t segment = 0;  // should always be 0 when starting a new column
  RETURN_ON_ERROR((rc = oid2FileName(fid, fileName, true, dbRoot, partition, segment)));

  //@Bug 3196
  if (exists(fileName))
    return ERR_FILE_EXIST;

  // allocateColExtentExactFile() treats dbRoot and partition as in/out
  // arguments, so we need to pass in non-const variables.
  uint16_t dbRootx = dbRoot;
  uint32_t partitionx = partition;

  // Since we are creating a new column OID, we know partition
  // and segment are 0, so we ignore their output values.
  // timer.start( "allocateColExtent" );

  BRM::LBID_t startLbid;
  uint32_t startBlock;
  RETURN_ON_ERROR(BRMWrapper::getInstance()->allocateColExtentExactFile((OID)fid, (uint32_t)width, dbRootx,
                                                                        partitionx, segment, colDataType,
                                                                        startLbid, allocSize, startBlock));

  // We allocate a full extent from BRM, but only write an abbreviated 256K
  // rows to disk for the 1st extent, to conserve disk usage for small tables.
  // One exception here: if we have rolled off partition 0 and we are
  // adding a column to an existing table, then we are adding a column
  // whose first partition is not 0. In this case, we know we are not
  // dealing with a small table, so we init a full extent for the 1st extent.
  int totalSize = 0;

  if (partition == 0)
    totalSize = (INITIAL_EXTENT_ROWS_TO_DISK / BYTE_PER_BLOCK) * width;
  else
    totalSize = allocSize;  // full extent if starting partition > 0
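
  // Worked example (values are illustrative; the 256K-row figure comes from
  // the comment above, and blocks are 8192 bytes in this file): for a
  // 4-byte column, the abbreviated first extent is (262144 / 8192) * 4 = 128
  // blocks, versus the full-extent allocSize just returned by BRM.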

  // Note we can't pass the full file name to isDiskSpaceAvail() because the
  // file does not exist yet, but passing the DBRoot directory should suffice.
  if (!isDiskSpaceAvail(Config::getDBRootByNum(dbRoot), totalSize))
  {
    return ERR_FILE_DISK_SPACE;
  }

  // timer.stop( "allocateColExtent" );

  return createFile(fileName, totalSize, emptyVal, width, colDataType, dbRoot, startLbid);
}

/***********************************************************
 * DESCRIPTION:
 *    Delete a file
 * PARAMETERS:
 *    fileName - file name with complete path
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_NOT_EXIST if file does not exist
 *    ERR_FILE_DELETE if can not delete a file
 ***********************************************************/
int FileOp::deleteFile(const char* fileName) const
{
  if (!exists(fileName))
    return ERR_FILE_NOT_EXIST;

  return (IDBPolicy::remove(fileName) == -1) ? ERR_FILE_DELETE : NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Deletes all the segment or dictionary store files associated with the
 *    specified fid.
 * PARAMETERS:
 *    fid - OID of the column being deleted.
 * RETURN:
 *    NO_ERROR if success
 *    ERR_DM_CONVERT_OID if error occurs converting OID to file name
 ***********************************************************/
int FileOp::deleteFile(FID fid) const
{
  char tempFileName[FILE_NAME_SIZE];
  char oidDirName[FILE_NAME_SIZE];
  char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];

  RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, 0, 0)));
  sprintf(oidDirName, "%s/%s/%s/%s", dbDir[0], dbDir[1], dbDir[2], dbDir[3]);
  // std::cout << "Deleting files for OID " << fid <<
  //   "; dirpath: " << oidDirName << std::endl;

  // Need to check the return code.
  RETURN_ON_ERROR(BRMWrapper::getInstance()->deleteOid(fid));

  std::vector<std::string> dbRootPathList;
  Config::getDBRootPathList(dbRootPathList);

  int rc;

  for (unsigned i = 0; i < dbRootPathList.size(); i++)
  {
    char rootOidDirName[FILE_NAME_SIZE];
    rc = snprintf(rootOidDirName, FILE_NAME_SIZE, "%s/%s", dbRootPathList[i].c_str(), oidDirName);

    // snprintf returns the length it would have written, so a value of
    // FILE_NAME_SIZE or more means the path was truncated.
    if (rc >= FILE_NAME_SIZE || IDBPolicy::remove(rootOidDirName) != 0)
    {
      ostringstream oss;
      oss << "Unable to remove " << rootOidDirName;
      throw std::runtime_error(oss.str());
    }
  }

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Deletes all the segment or dictionary store files associated with the
 *    specified fids.
 * PARAMETERS:
 *    fids - OIDs of the columns/dictionaries being deleted.
 * RETURN:
 *    NO_ERROR if success
 *    ERR_DM_CONVERT_OID if error occurs converting OID to file name
 ***********************************************************/
int FileOp::deleteFiles(const std::vector<int32_t>& fids) const
{
  char tempFileName[FILE_NAME_SIZE];
  char oidDirName[FILE_NAME_SIZE];
  char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
  std::vector<std::string> dbRootPathList;
  Config::getDBRootPathList(dbRootPathList);
  int rc;

  for (unsigned n = 0; n < fids.size(); n++)
  {
    RETURN_ON_ERROR((Convertor::oid2FileName(fids[n], tempFileName, dbDir, 0, 0)));
    sprintf(oidDirName, "%s/%s/%s/%s", dbDir[0], dbDir[1], dbDir[2], dbDir[3]);
    // std::cout << "Deleting files for OID " << fids[n] <<
    //   "; dirpath: " << oidDirName << std::endl;

    for (unsigned i = 0; i < dbRootPathList.size(); i++)
    {
      char rootOidDirName[FILE_NAME_SIZE];
      rc = snprintf(rootOidDirName, FILE_NAME_SIZE, "%s/%s", dbRootPathList[i].c_str(), oidDirName);

      if (rc >= FILE_NAME_SIZE || IDBPolicy::remove(rootOidDirName) != 0)
      {
        ostringstream oss;
        oss << "Unable to remove " << rootOidDirName;
        throw std::runtime_error(oss.str());
      }
    }
  }

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Deletes all the segment or dictionary store files associated with the
 *    specified fids and partitions.
 * PARAMETERS:
 *    fids - OIDs of the columns/dictionaries being deleted.
 *    partitions - partition information for the extents being deleted
 * RETURN:
 *    NO_ERROR if success
 *    ERR_DM_CONVERT_OID if error occurs converting OID to file name
 ***********************************************************/
int FileOp::deletePartitions(const std::vector<OID>& fids,
                             const std::vector<BRM::PartitionInfo>& partitions) const
{
  char tempFileName[FILE_NAME_SIZE];
  char oidDirName[FILE_NAME_SIZE];
  char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
  char rootOidDirName[FILE_NAME_SIZE];
  char partitionDirName[FILE_NAME_SIZE];
  int rcd, rcp;

  for (uint32_t i = 0; i < partitions.size(); i++)
  {
    RETURN_ON_ERROR((Convertor::oid2FileName(partitions[i].oid, tempFileName, dbDir, partitions[i].lp.pp,
                                             partitions[i].lp.seg)));
    sprintf(oidDirName, "%s/%s/%s/%s/%s", dbDir[0], dbDir[1], dbDir[2], dbDir[3], dbDir[4]);
    // config expects dbroot starting from 0
    std::string rt(Config::getDBRootByNum(partitions[i].lp.dbroot));
    rcd = snprintf(rootOidDirName, FILE_NAME_SIZE, "%s/%s", rt.c_str(), tempFileName);
    rcp = snprintf(partitionDirName, FILE_NAME_SIZE, "%s/%s", rt.c_str(), oidDirName);

    if (rcd >= FILE_NAME_SIZE || rcp >= FILE_NAME_SIZE || IDBPolicy::remove(rootOidDirName) != 0)
    {
      ostringstream oss;
      oss << "Unable to remove " << rootOidDirName;
      throw std::runtime_error(oss.str());
    }

    list<string> dircontents;

    if (IDBPolicy::listDirectory(partitionDirName, dircontents) == 0)
    {
      // the directory exists; now check whether it is empty
      if (dircontents.size() == 0)
      {
        // empty directory
        if (IDBPolicy::remove(partitionDirName) != 0)
        {
          ostringstream oss;
          oss << "Unable to remove " << partitionDirName;
          throw std::runtime_error(oss.str());
        }
      }
    }
  }

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Deletes the specified db segment file.
 * PARAMETERS:
 *    fid - column OID of file to be deleted.
 *    dbRoot - DBRoot associated with segment file
 *    partition - partition number of associated segment file
 *    segment - segment number of associated segment file
 * RETURN:
 *    NO_ERROR if success
 ***********************************************************/
int FileOp::deleteFile(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment) const
{
  char fileName[FILE_NAME_SIZE];

  RETURN_ON_ERROR(getFileName(fid, fileName, dbRoot, partition, segment));

  return (deleteFile(fileName));
}

/***********************************************************
 * DESCRIPTION:
 *    Check whether a file exists or not
 * PARAMETERS:
 *    fileName - file name with complete path
 * RETURN:
 *    true if exists, false otherwise
 ***********************************************************/
bool FileOp::exists(const char* fileName) const
{
  return IDBPolicy::exists(fileName);
}

/***********************************************************
 * DESCRIPTION:
 *    Check whether a file exists or not
 * PARAMETERS:
 *    fid - OID of file to be checked
 *    dbRoot - DBRoot associated with segment file
 *    partition - partition number of associated segment file
 *    segment - segment number of associated segment file
 * RETURN:
 *    true if exists, false otherwise
 ***********************************************************/
bool FileOp::exists(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment) const
{
  char fileName[FILE_NAME_SIZE];

  if (getFileName(fid, fileName, dbRoot, partition, segment) != NO_ERROR)
    return false;

  return exists(fileName);
}

/***********************************************************
 * DESCRIPTION:
 *    Check whether an OID directory exists or not
 * PARAMETERS:
 *    fid - column or dictionary store OID
 * RETURN:
 *    true if exists, false otherwise
 ***********************************************************/
bool FileOp::existsOIDDir(FID fid) const
{
  char fileName[FILE_NAME_SIZE];

  if (oid2DirName(fid, fileName) != NO_ERROR)
  {
    return false;
  }

  return exists(fileName);
}

/***********************************************************
 * DESCRIPTION:
 *    Adds an extent to the specified column OID and DBRoot.
 *    Function uses ExtentMap to add the extent and determine which
 *    specific column segment file the extent is to be added to. If
 *    the applicable column segment file does not exist, it is created.
 *    If this is the very first file for the specified DBRoot, then the
 *    partition and segment number must be specified, else the selected
 *    partition and segment numbers are returned. This method tries to
 *    optimize full extent creation by skipping disk space
 *    preallocation (if activated).
 * PARAMETERS:
 *    oid - OID of the column to be extended
 *    emptyVal - Empty value to be used for oid
 *    width - Width of the column (in bytes)
 *    hwm - The HWM (or fbo) of the column segment file where the new
 *          extent begins.
 *    startLbid - The starting LBID for the new extent
 *    allocSize - Number of blocks allocated to the extent.
 *    dbRoot - The DBRoot of the file with the new extent.
 *    partition - The partition number of the file with the new extent.
 *    segment - The segment number of the file with the new extent.
 *    segFile - The name of the relevant column segment file.
 *    pFile - IDBDataFile ptr to the file where the extent is added.
 *    newFile - Indicates if the extent was added to a new or existing file
 *    hdrs - Contents of the headers if file is compressed.
 * RETURN:
 *    NO_ERROR if success
 *    else the applicable error code is returned
 ***********************************************************/
int FileOp::extendFile(OID oid, const uint8_t* emptyVal, int width,
                       execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm,
                       BRM::LBID_t startLbid, int allocSize, uint16_t dbRoot, uint32_t partition,
                       uint16_t segment, std::string& segFile, IDBDataFile*& pFile, bool& newFile, char* hdrs)
{
  int rc = NO_ERROR;
  pFile = 0;
  segFile.clear();
  newFile = false;
  char fileName[FILE_NAME_SIZE];

  // If the starting hwm or fbo is 0, then this is the first extent of a new
  // file; else we are adding an extent to an existing segment file.
  if (hwm > 0)  // db segment file should exist
  {
    RETURN_ON_ERROR(oid2FileName(oid, fileName, false, dbRoot, partition, segment));
    segFile = fileName;

    if (!exists(fileName))
    {
      ostringstream oss;
      oss << "oid: " << oid << " with path " << segFile;
      logging::Message::Args args;
      args.add("File not found ");
      args.add(oss.str());
      args.add("");
      args.add("");
      SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001);
      return ERR_FILE_NOT_EXIST;
    }

    pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b");  // old file

    if (pFile == 0)
    {
      ostringstream oss;
      oss << "oid: " << oid << " with path " << segFile;
      logging::Message::Args args;
      args.add("Error opening file ");
      args.add(oss.str());
      args.add("");
      args.add("");
      SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001);
      return ERR_FILE_OPEN;
    }

    if (isDebug(DEBUG_1) && getLogger())
    {
      std::ostringstream oss;
      oss << "Opening existing column file (extendFile)"
          << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment
          << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile;
      getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
    }

    // @bug 5349: check that the new extent's fbo is not past the current EOF
    if (m_compressionType)
    {
      char hdrsIn[compress::CompressInterface::HDR_BUF_LEN * 2];
      RETURN_ON_ERROR(readHeaders(pFile, hdrsIn));

      std::unique_ptr<compress::CompressInterface> compressor(
          compress::getCompressInterfaceByType(compress::CompressInterface::getCompressionType(hdrsIn)));

      unsigned int ptrCount = compress::CompressInterface::getPtrCount(hdrsIn);
      unsigned int chunkIndex = 0;
      unsigned int blockOffsetWithinChunk = 0;
      compressor->locateBlock((hwm - 1), chunkIndex, blockOffsetWithinChunk);
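
      // locateBlock() maps the last written block (hwm - 1) to its chunk;
      // if that chunk index is at or past the number of chunk pointers in
      // the header, the tail of the extent was never written to disk, and
      // the file must be padded out below before a new extent is appended
      // at EOF.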

      // std::ostringstream oss1;
      // oss1 << "Extending compressed column file" <<
      //   ": OID-" << oid <<
      //   "; LBID-" << startLbid <<
      //   "; fbo-" << hwm <<
      //   "; file-" << segFile <<
      //   "; chkidx-" << chunkIndex <<
      //   "; numPtrs-" << ptrCount;
      // getLogger()->logMsg( oss1.str(), MSGLVL_INFO2 );

      if (chunkIndex >= ptrCount)
      {
        ostringstream oss;
        oss << "oid: " << oid << " with path " << segFile << "; new extent fbo " << hwm
            << "; number of compressed chunks " << ptrCount << "; chunkIndex " << chunkIndex;
        logging::Message::Args args;
        args.add("compressed");
        args.add(oss.str());
        SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0103);

        // Expand the partial extent to full with emptyVal.
        // Since fillCompColumnExtentEmptyChunks() messes with the
        // file on disk, we need to close it and reopen it after, or
        // the cache isn't updated.
        if (pFile)
          closeFile(pFile);

        pFile = NULL;
        string failedTask;  // For return error message, if any.
        rc = FileOp::fillCompColumnExtentEmptyChunks(oid, width, emptyVal, dbRoot, partition, segment,
                                                     colDataType, hwm, segFile, failedTask);

        if (rc != NO_ERROR)
        {
          if (getLogger())
          {
            std::ostringstream oss;
            oss << "FileOp::extendFile: error padding partial compressed extent for "
                << "column OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-"
                << segment << "; hwm-" << hwm << " " << failedTask.c_str();
            getLogger()->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
          }

          return rc;
        }

        pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b");  // modified file
      }

      // Get the latest file header for the caller. If a partial extent was
      // filled out, this will differ from when we first read the headers.
      if (hdrs)
      {
        RETURN_ON_ERROR(readHeaders(pFile, hdrs));
      }
    }
    else
    {
      long long fileSize;
      RETURN_ON_ERROR(getFileSize(pFile, fileSize));
      long long calculatedFileSize = ((long long)hwm) * BYTE_PER_BLOCK;
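
      // With 8192-byte blocks, an fbo of hwm presumes at least hwm blocks
      // already on disk; a shorter file means the prior extent was left
      // partially initialized and is padded out below.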

      // std::ostringstream oss2;
      // oss2 << "Extending uncompressed column file" <<
      //   ": OID-" << oid <<
      //   "; LBID-" << startLbid <<
      //   "; fbo-" << hwm <<
      //   "; file-" << segFile <<
      //   "; filesize-" << fileSize;
      // getLogger()->logMsg( oss2.str(), MSGLVL_INFO2 );

      if (calculatedFileSize > fileSize)
      {
        ostringstream oss;
        oss << "oid: " << oid << " with path " << segFile << "; new extent fbo " << hwm
            << "; file size (bytes) " << fileSize;
        logging::Message::Args args;
        args.add("uncompressed");
        args.add(oss.str());
        SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0103);

        // Expand the partial extent to full with emptyVal.
        // This generally won't ever happen, as uncompressed files
        // are created with full extents.
        rc = FileOp::expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, width, colDataType);

        if (rc != NO_ERROR)
        {
          if (getLogger())
          {
            std::ostringstream oss;
            oss << "FileOp::extendFile: error padding partial uncompressed extent for "
                << "column OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-"
                << segment << "; hwm-" << hwm;
            getLogger()->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
          }

          return rc;
        }
      }
    }
  }
  else  // db segment file should not exist
  {
    RETURN_ON_ERROR(oid2FileName(oid, fileName, true, dbRoot, partition, segment));
    segFile = fileName;

    // if an obsolete file exists, "w+b" will truncate it and write over it
    pFile = openFile(fileName, "w+b");  // new file

    if (pFile == 0)
      return ERR_FILE_CREATE;

    {
      // We presume the path will contain '/'
      std::string filePath(fileName);

      if (chownDataPath(filePath))
        return ERR_FILE_CHOWN;
    }

    newFile = true;

    if (isDebug(DEBUG_1) && getLogger())
    {
      std::ostringstream oss;
      oss << "Opening new column file"
          << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment
          << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile;
      getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
    }

    if ((m_compressionType) && (hdrs))
    {
      compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType);
      compress::CompressInterface::setLBIDByIndex(hdrs, startLbid, 0);
    }
  }

  if (!isDiskSpaceAvail(segFile, allocSize))
  {
    return ERR_FILE_DISK_SPACE;
  }

  // We seek to EOF just before we start adding the blocks for the new extent.
  // At one time, I considered changing this to seek to the HWM block, but
  // with compressed files this is murky; do I find and seek to the chunk
  // containing the HWM block? So I left it as-is for now, seeking to EOF.
  rc = setFileOffset(pFile, 0, SEEK_END);

  if (rc != NO_ERROR)
    return rc;

  // Initialize the contents of the extent.
  // MCOL-498 optimize full extent creation.
  rc = initColumnExtent(pFile, dbRoot, allocSize, emptyVal, width, colDataType,
                        newFile,  // new or existing file
                        false,    // don't expand; new extent
                        false,    // add full (not abbreviated) extent
                        true,     // try to optimize extent creation
                        startLbid);

  return rc;
}

/***********************************************************
 * DESCRIPTION:
 *    Add an extent to the exact segment file specified by
 *    the designated OID, DBRoot, partition, and segment.
 * PARAMETERS:
 *    oid - OID of the column to be extended
 *    emptyVal - Empty value to be used for oid
 *    width - Width of the column (in bytes)
 *    allocSize - Number of blocks allocated to the extent.
 *    dbRoot - The DBRoot of the file with the new extent.
 *    partition - The partition number of the file with the new extent.
 *    segment - The segment number of the file with the new extent.
 *    segFile - The name of the relevant column segment file.
 *    startLbid - The starting LBID for the new extent
 *    newFile - Indicates if the extent was added to a new or existing file
 *    hdrs - Contents of the headers if file is compressed.
 * RETURN:
 *    NO_ERROR if success, else the applicable error code
 ***********************************************************/
int FileOp::addExtentExactFile(OID oid, const uint8_t* emptyVal, int width, int& allocSize, uint16_t dbRoot,
                               uint32_t partition, uint16_t segment,
                               execplan::CalpontSystemCatalog::ColDataType colDataType, std::string& segFile,
                               BRM::LBID_t& startLbid, bool& newFile, char* hdrs)
{
  int rc = NO_ERROR;
  IDBDataFile* pFile = 0;
  segFile.clear();
  newFile = false;
  HWM hwm;

  // Allocate the new extent in the ExtentMap
  RETURN_ON_ERROR(BRMWrapper::getInstance()->allocateColExtentExactFile(
      oid, width, dbRoot, partition, segment, colDataType, startLbid, allocSize, hwm));

  // Determine the existence of the "next" segment file, and either open
  // or create the segment file accordingly.
  if (exists(oid, dbRoot, partition, segment))
  {
    pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b");  // old file

    if (pFile == 0)
    {
      ostringstream oss;
      oss << "oid: " << oid << " with path " << segFile;
      logging::Message::Args args;
      args.add("Error opening file ");
      args.add(oss.str());
      args.add("");
      args.add("");
      SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001);
      return ERR_FILE_OPEN;
    }

    if (isDebug(DEBUG_1) && getLogger())
    {
      std::ostringstream oss;
      oss << "Opening existing column file"
          << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment
          << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile;
      getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
    }

    if ((m_compressionType) && (hdrs))
    {
      rc = readHeaders(pFile, hdrs);

      if (rc != NO_ERROR)
        return rc;
    }
  }
  else
  {
    char fileName[FILE_NAME_SIZE];
    RETURN_ON_ERROR(oid2FileName(oid, fileName, true, dbRoot, partition, segment));
    segFile = fileName;

    pFile = openFile(fileName, "w+b");  // new file

    if (pFile == 0)
      return ERR_FILE_CREATE;

    newFile = true;

    if (isDebug(DEBUG_1) && getLogger())
    {
      std::ostringstream oss;
      oss << "Opening new column file"
          << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment
          << "; LBID-" << startLbid << "; hwm-" << hwm << "; file-" << segFile;
      getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
    }

    if ((m_compressionType) && (hdrs))
    {
      compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType);
      compress::CompressInterface::setLBIDByIndex(hdrs, startLbid, 0);
    }
  }

  if (!isDiskSpaceAvail(segFile, allocSize))
  {
    return ERR_FILE_DISK_SPACE;
  }

  // We seek to EOF just before we start adding the blocks for the new extent.
  // At one time, I considered changing this to seek to the HWM block, but
  // with compressed files this is murky; do I find and seek to the chunk
  // containing the HWM block? So I left it as-is for now, seeking to EOF.
  rc = setFileOffset(pFile, 0, SEEK_END);

  if (rc != NO_ERROR)
    return rc;

  // Initialize the contents of the extent.
  // CS doesn't optimize this file operation, so that it always produces
  // valid segment files filled with empty magic values.
  rc = initColumnExtent(pFile, dbRoot, allocSize, emptyVal, width, colDataType,
                        newFile,  // new or existing file
                        false,    // don't expand; new extent
                        false,    // add full (not abbreviated) extent
                        false,    // don't optimize; see comment above
                        startLbid);

  closeFile(pFile);
  return rc;
}

/***********************************************************
 * DESCRIPTION:
 *    Write out (initialize) an extent in a column file.
 *    A mutex is used for each DBRoot, to prevent contention between
 *    threads, because if multiple threads are creating extents on
 *    the same DBRoot at the same time, the extents can become
 *    fragmented. It is best to only create one extent at a time
 *    on each DBRoot.
 *    This function can be used to initialize an entirely new extent, or
 *    to finish initializing an extent that has already been started.
 *    nBlocks controls how many 8192-byte blocks are to be written out.
 *    If bOptExtension is set, then the method first checks the config for
 *    DBRootX.Prealloc. If that is disabled, then it skips disk space
 *    preallocation.
 * PARAMETERS:
 *    pFile (in) - IDBDataFile* of column segment file to be written to
 *    dbRoot (in) - DBRoot of pFile
 *    nBlocks (in) - number of blocks to be written for an extent
 *    emptyVal(in) - empty value to be used for column data values
 *    width (in) - width of the applicable column
 *    bNewFile(in) - are we adding an extent to a new file, in which case
 *        headers will be included "if" it is a compressed file.
 *    bExpandExtent (in) - Expand existing extent, or initialize a new one
 *    bAbbrevExtent(in) - if creating new extent, is it an abbreviated extent
 *    bOptExtension(in) - skip full extent preallocation.
 * RETURN:
 *    returns ERR_FILE_WRITE if an error occurs,
 *    else returns NO_ERROR.
 ***********************************************************/
int FileOp::initColumnExtent(IDBDataFile* pFile, uint16_t dbRoot, int nBlocks, const uint8_t* emptyVal,
                             int width, execplan::CalpontSystemCatalog::ColDataType colDataType,
                             bool bNewFile, bool bExpandExtent, bool bAbbrevExtent, bool bOptExtension,
                             int64_t lbid)
{
  if ((bNewFile) && (m_compressionType))
  {
    char hdrs[CompressInterface::HDR_BUF_LEN * 2];
    compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType);
    compress::CompressInterface::setLBIDByIndex(hdrs, lbid, 0);

    if (bAbbrevExtent)
      compress::CompressInterface::setBlockCount(hdrs, nBlocks);

    RETURN_ON_ERROR(writeHeaders(pFile, hdrs));
  }

  // @bug5769 Don't initialize extents or truncate db files on HDFS
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    //@Bug 3219. update the compression header after the extent is expanded.
    if ((!bNewFile) && (m_compressionType) && (bExpandExtent))
    {
      updateColumnExtent(pFile, nBlocks, lbid);
    }

    // @bug 2378. Synchronize here so the write buffer doesn't pile up too
    // much, which could cause controllernode to time out later when it
    // needs to save a snapshot.
    pFile->flush();
  }
  else
  {
    // Create vector of mutexes used to serialize extent access per DBRoot
    initDbRootExtentMutexes();

    // MCOL-498 Skip the huge preallocations if the option is set
    // for the dbroot. This check is skipped for an abbreviated extent.
    // IMO it is better to check a bool than to call a function.
    if (bOptExtension)
    {
      bOptExtension = (idbdatafile::IDBPolicy::PreallocSpaceDisabled(dbRoot)) ? bOptExtension : false;
    }

    // Reduce the number of blocks allocated for abbreviated extents, so that
    // CS writes less when it creates a new table. This can't be zero
    // b/c the Snappy compressed file format doesn't tolerate empty files.
    int realNBlocks = (bOptExtension && nBlocks <= MAX_INITIAL_EXTENT_BLOCKS_TO_DISK) ? 3 : nBlocks;

    // Determine the number of blocks in each call to fwrite(), and the
    // number of fwrite() calls to make, based on this. In other words,
    // we put a cap on the "writeSize" so that we don't allocate and write
    // an entire extent at once for the 64M row extents. If we are
    // expanding an abbreviated 64M extent, we may not have an even
    // multiple of MAX_NBLOCKS to write; remWriteSize is the number of
    // blocks above and beyond loopCount*MAX_NBLOCKS.
    int writeSize = realNBlocks * BYTE_PER_BLOCK;  // 1M and 8M row extent size
    int loopCount = 1;
    int remWriteSize = 0;

    if (realNBlocks > MAX_NBLOCKS)  // 64M row extent size
    {
      writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK;
      loopCount = realNBlocks / MAX_NBLOCKS;
      remWriteSize = realNBlocks - (loopCount * MAX_NBLOCKS);
    }
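
    // e.g., when expanding an abbreviated extent into a 64M-row extent:
    // realNBlocks = loopCount * MAX_NBLOCKS + remWriteSize, so the
    // remainder is written first below, followed by loopCount full-size
    // writes of writeSize bytes each.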

    // Allocate a buffer, initialize it, and use it to create the extent
    idbassert(dbRoot > 0);
#ifdef PROFILE

    if (bExpandExtent)
      Stats::startParseEvent(WE_STATS_WAIT_TO_EXPAND_COL_EXTENT);
    else
      Stats::startParseEvent(WE_STATS_WAIT_TO_CREATE_COL_EXTENT);

#endif
    boost::mutex::scoped_lock lk(m_DbRootAddExtentMutexes[dbRoot]);
#ifdef PROFILE

    if (bExpandExtent)
      Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_COL_EXTENT);
    else
      Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_COL_EXTENT);
#endif

    // Skip space preallocation if configured so;
    // fall back to sequential writes otherwise.
    // Preallocation can't be avoided for full extents,
    // e.g. ADD COLUMN DDL, b/c CS has to fill the file
    // with empty magic values.
    if (!bOptExtension || !m_compressionType)
    {
#ifdef PROFILE
      Stats::startParseEvent(WE_STATS_INIT_COL_EXTENT);
#endif

      // Allocate a buffer, and store it in a scoped_array to ensure its
      // deletion. Create a scope {...} to manage the deletion of writeBuf.
      {
        unsigned char* writeBuf = new unsigned char[writeSize];
        boost::scoped_array<unsigned char> writeBufPtr(writeBuf);

        setEmptyBuf(writeBuf, writeSize, emptyVal, width);

#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_INIT_COL_EXTENT);

        if (bExpandExtent)
          Stats::startParseEvent(WE_STATS_EXPAND_COL_EXTENT);
        else
          Stats::startParseEvent(WE_STATS_CREATE_COL_EXTENT);

#endif

        // std::ostringstream oss;
        // oss << "initColExtent: width-" << width <<
        //   "; loopCount-" << loopCount <<
        //   "; writeSize-" << writeSize;
        // std::cout << oss.str() << std::endl;
        if (remWriteSize > 0)
        {
          if (pFile->write(writeBuf, remWriteSize) != remWriteSize)
          {
            return ERR_FILE_WRITE;
          }
        }

        for (int j = 0; j < loopCount; j++)
        {
          if (pFile->write(writeBuf, writeSize) != writeSize)
          {
            return ERR_FILE_WRITE;
          }
        }
      }

      //@Bug 3219. update the compression header after the extent is expanded.
      if ((!bNewFile) && (m_compressionType) && (bExpandExtent))
      {
        updateColumnExtent(pFile, nBlocks, lbid);
      }

      // @bug 2378. Synchronize here so the write buffer doesn't pile up too
      // much, which could cause controllernode to time out later when it
      // needs to save a snapshot.
      pFile->flush();

#ifdef PROFILE
      if (bExpandExtent)
        Stats::stopParseEvent(WE_STATS_EXPAND_COL_EXTENT);
      else
        Stats::stopParseEvent(WE_STATS_CREATE_COL_EXTENT);
#endif
    }
  }

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Write (initialize) an abbreviated compressed extent in a column file.
 *    nBlocks controls how many 8192-byte blocks are to be written out.
 * PARAMETERS:
 *    pFile (in) - IDBDataFile* of column segment file to be written to
 *    dbRoot (in) - DBRoot of pFile
 *    nBlocks (in) - number of blocks to be written for an extent
 *    emptyVal(in) - empty value to be used for column data values
 *    width (in) - width of the applicable column
 * RETURN:
 *    returns ERR_FILE_WRITE or ERR_FILE_SEEK if an error occurs,
 *    else returns NO_ERROR.
 ***********************************************************/
int FileOp::initAbbrevCompColumnExtent(IDBDataFile* pFile, uint16_t dbRoot, int nBlocks,
                                       const uint8_t* emptyVal, int width, BRM::LBID_t startLBID,
                                       execplan::CalpontSystemCatalog::ColDataType colDataType)
{
  // Reserve disk space for optimized abbreviated extent
  int rc = initColumnExtent(pFile, dbRoot, nBlocks, emptyVal, width, colDataType,
                            true,   // new file
                            false,  // don't expand; add new extent
                            true,   // add abbreviated extent
                            true,   // optimize the initial extent
                            startLBID);

  if (rc != NO_ERROR)
  {
    return rc;
  }

#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_COMPRESS_COL_INIT_ABBREV_EXT);
#endif

  char hdrs[CompressInterface::HDR_BUF_LEN * 2];
  rc = writeInitialCompColumnChunk(pFile, nBlocks, INITIAL_EXTENT_ROWS_TO_DISK, emptyVal, width, startLBID,
                                   colDataType, hdrs);

  if (rc != NO_ERROR)
  {
    return rc;
  }

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_COMPRESS_COL_INIT_ABBREV_EXT);
#endif

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Write (initialize) the first extent in a compressed db file.
 * PARAMETERS:
 *    pFile - IDBDataFile* of column segment file to be written to
 *    nBlocksAllocated - number of blocks allocated to the extent; should be
 *        enough blocks for a full extent, unless it's the abbreviated extent
 *    nRows - number of rows to initialize, or write out to the file
 *    emptyVal - empty value to be used for column data values
 *    width - width of the applicable column (in bytes)
 *    hdrs - (in/out) chunk pointer headers
 * RETURN:
 *    returns NO_ERROR on success.
 ***********************************************************/
int FileOp::writeInitialCompColumnChunk(IDBDataFile* pFile, int nBlocksAllocated, int nRows,
                                        const uint8_t* emptyVal, int width, BRM::LBID_t startLBID,
                                        execplan::CalpontSystemCatalog::ColDataType colDataType, char* hdrs)
{
  const size_t INPUT_BUFFER_SIZE = nRows * width;
  char* toBeCompressedInput = new char[INPUT_BUFFER_SIZE];
  unsigned int userPaddingBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
  // Compress an initialized abbreviated extent.
  // Initially m_compressionType == 0, but this function is only used under
  // the condition where m_compressionType > 0.
  std::unique_ptr<CompressInterface> compressor(
      compress::getCompressInterfaceByType(m_compressionType, userPaddingBytes));
  const size_t OUTPUT_BUFFER_SIZE = compressor->maxCompressedSize(INPUT_BUFFER_SIZE) + userPaddingBytes +
                                    compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE;

  unsigned char* compressedOutput = new unsigned char[OUTPUT_BUFFER_SIZE];
  size_t outputLen = OUTPUT_BUFFER_SIZE;
  boost::scoped_array<char> toBeCompressedInputPtr(toBeCompressedInput);
  boost::scoped_array<unsigned char> compressedOutputPtr(compressedOutput);

  setEmptyBuf((unsigned char*)toBeCompressedInput, INPUT_BUFFER_SIZE, emptyVal, width);

  int rc = compressor->compressBlock(toBeCompressedInput, INPUT_BUFFER_SIZE, compressedOutput, outputLen);

  if (rc != 0)
  {
    return ERR_COMP_COMPRESS;
  }

  // Round up the compressed chunk size
  rc = compressor->padCompressedChunks(compressedOutput, outputLen, OUTPUT_BUFFER_SIZE);

  if (rc != 0)
  {
    return ERR_COMP_PAD_DATA;
  }

  // std::cout << "Uncompressed rowCount: " << nRows <<
  //   "; colWidth: " << width <<
  //   "; uncompByteCnt: " << INPUT_BUFFER_SIZE <<
  //   "; blkAllocCnt: " << nBlocksAllocated <<
  //   "; compressedByteCnt: " << outputLen << std::endl;

  compress::CompressInterface::initHdr(hdrs, width, colDataType, m_compressionType);
  compress::CompressInterface::setBlockCount(hdrs, nBlocksAllocated);
  compress::CompressInterface::setLBIDByIndex(hdrs, startLBID, 0);

  // Store compression pointers in the header
  std::vector<uint64_t> ptrs;
  ptrs.push_back(CompressInterface::HDR_BUF_LEN * 2);
  ptrs.push_back(outputLen + (CompressInterface::HDR_BUF_LEN * 2));
  compress::CompressInterface::storePtrs(ptrs, hdrs);
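
  // Chunk-pointer semantics (as used throughout this file): entry i is the
  // byte offset of chunk i, and the final entry is one past the end of the
  // last chunk, so chunk i's length is ptrs[i + 1] - ptrs[i]. Here the
  // single abbreviated chunk starts right after the two header blocks.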

  RETURN_ON_ERROR(writeHeaders(pFile, hdrs));

  // Write the compressed data
  size_t writtenLen = pFile->write(compressedOutput, outputLen);

  if (writtenLen != outputLen)
    return ERR_FILE_WRITE;

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Fill the specified compressed extent with empty value chunks.
 * PARAMETERS:
 *    oid - OID for relevant column
 *    colWidth - width in bytes of this column
 *    emptyVal - empty value to be used in filling empty chunks
 *    dbRoot - DBRoot of extent to be filled
 *    partition - partition of extent to be filled
 *    segment - segment file number of extent to be filled
 *    colDataType - Column data type
 *    hwm - proposed new HWM of filled in extent
 *    segFile - (out) name of updated segment file
 *    failedTask - (out) if error occurs, this is the task that failed
 * RETURN:
 *    returns NO_ERROR if success.
 ***********************************************************/
int FileOp::fillCompColumnExtentEmptyChunks(OID oid, int colWidth, const uint8_t* emptyVal, uint16_t dbRoot,
                                            uint32_t partition, uint16_t segment,
                                            execplan::CalpontSystemCatalog::ColDataType colDataType, HWM hwm,
                                            std::string& segFile, std::string& failedTask)
{
  int rc = NO_ERROR;
  segFile.clear();
  failedTask.clear();

  // Open the file and read the headers with the compression chunk pointers
  // @bug 5572 - HDFS usage: incorporate *.tmp file backup flag
  IDBDataFile* pFile = openFile(oid, dbRoot, partition, segment, segFile, "r+b", DEFAULT_COLSIZ, true);

  if (!pFile)
  {
    failedTask = "Opening file";
    ostringstream oss;
    oss << "oid: " << oid << " with path " << segFile;
    logging::Message::Args args;
    args.add("Error opening file ");
    args.add(oss.str());
    args.add("");
    args.add("");
    SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0001);
    return ERR_FILE_OPEN;
  }

  char hdrs[CompressInterface::HDR_BUF_LEN * 2];
  rc = readHeaders(pFile, hdrs);

  if (rc != NO_ERROR)
  {
    failedTask = "Reading headers";
    closeFile(pFile);
    return rc;
  }

  int userPadBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;

  std::unique_ptr<CompressInterface> compressor(compress::getCompressInterfaceByType(
      compress::CompressInterface::getCompressionType(hdrs), userPadBytes));

  CompChunkPtrList chunkPtrs;
  int rcComp = compress::CompressInterface::getPtrList(hdrs, chunkPtrs);

  if (rcComp != 0)
  {
    failedTask = "Getting header ptrs";
    closeFile(pFile);
    return ERR_COMP_PARSE_HDRS;
  }

  // Nothing to do if the proposed HWM is < the current block count
  uint64_t blkCount = compress::CompressInterface::getBlockCount(hdrs);

  if (blkCount > (hwm + 1))
  {
    closeFile(pFile);
    return NO_ERROR;
  }

  const unsigned int ROWS_PER_EXTENT = BRMWrapper::getInstance()->getExtentRows();
  const unsigned int ROWS_PER_CHUNK = CompressInterface::UNCOMPRESSED_INBUF_LEN / colWidth;
  const unsigned int CHUNKS_PER_EXTENT = ROWS_PER_EXTENT / ROWS_PER_CHUNK;
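
  // Example (illustrative; assumes the default 8M-row extent and the 4MB
  // uncompressed chunk buffer used in this file): for a 4-byte column,
  // ROWS_PER_CHUNK = 4MB / 4 = 1M rows, so CHUNKS_PER_EXTENT = 8M / 1M = 8.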

  // If this is an abbreviated extent, we first expand it to a full extent.
  // @bug 4340 - support moving the DBRoot with a single abbrev extent
  if ((chunkPtrs.size() == 1) &&
      ((blkCount * BYTE_PER_BLOCK) == (uint64_t)(INITIAL_EXTENT_ROWS_TO_DISK * colWidth)))
  {
    if (getLogger())
    {
      std::ostringstream oss;
      oss << "Converting abbreviated partial extent to full extent for"
          << ": OID-" << oid << "; DBRoot-" << dbRoot << "; part-" << partition << "; seg-" << segment
          << "; file-" << segFile << "; wid-" << colWidth << "; oldBlkCnt-" << blkCount << "; newBlkCnt-"
          << ((ROWS_PER_EXTENT * colWidth) / BYTE_PER_BLOCK);
      getLogger()->logMsg(oss.str(), MSGLVL_INFO2);
    }

    off64_t endHdrsOffset = pFile->tell();
    rc = expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, colWidth, colDataType);

    if (rc != NO_ERROR)
    {
      failedTask = "Expanding abbreviated extent";
      closeFile(pFile);
      return rc;
    }

    CompChunkPtr chunkOutPtr;
    rc = expandAbbrevColumnChunk(pFile, emptyVal, colWidth, chunkPtrs[0], chunkOutPtr, hdrs);

    if (rc != NO_ERROR)
    {
      failedTask = "Expanding abbreviated chunk";
      closeFile(pFile);
      return rc;
    }

    chunkPtrs[0] = chunkOutPtr;  // update chunkPtrs with new chunk size

    rc = setFileOffset(pFile, endHdrsOffset);

    if (rc != NO_ERROR)
    {
      failedTask = "Positioning file to end of headers";
      closeFile(pFile);
      return rc;
    }

    // Update block count to reflect a full extent
    blkCount = (ROWS_PER_EXTENT * colWidth) / BYTE_PER_BLOCK;
    compress::CompressInterface::setBlockCount(hdrs, blkCount);
  }

  // Calculate the number of empty chunks we need to add to fill this extent
  unsigned numChunksToFill = 0;
  ldiv_t ldivResult = ldiv(chunkPtrs.size(), CHUNKS_PER_EXTENT);

  if (ldivResult.rem != 0)
  {
    numChunksToFill = CHUNKS_PER_EXTENT - ldivResult.rem;
  }
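
  // e.g., with CHUNKS_PER_EXTENT = 8 and 3 chunks already present,
  // ldiv(3, 8).rem == 3, so 5 empty chunks are appended below.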

#if 0
  std::cout << "Number of allocated blocks: " <<
      compressor->getBlockCount(hdrs) << std::endl;
  std::cout << "Pointer Header Size (in bytes): " <<
      (compressor->getHdrSize(hdrs) -
      CompressInterface::HDR_BUF_LEN) << std::endl;
  std::cout << "Chunk Pointers (offset,length): " << std::endl;

  for (unsigned k = 0; k < chunkPtrs.size(); k++)
  {
    std::cout << "  " << k << ". " << chunkPtrs[k].first <<
        " , " << chunkPtrs[k].second << std::endl;
  }

  std::cout << std::endl;
  std::cout << "Number of chunks to fill in: " << numChunksToFill <<
      std::endl << std::endl;
#endif

  off64_t endOffset = 0;

  // Fill in or add the necessary remaining empty chunks
  if (numChunksToFill > 0)
  {
    const int IN_BUF_LEN = CompressInterface::UNCOMPRESSED_INBUF_LEN;
    const int OUT_BUF_LEN = compressor->maxCompressedSize(IN_BUF_LEN) + userPadBytes +
                            compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE;

    // Allocate buffers, and store them in scoped_arrays to ensure their
    // deletion. Create a scope {...} to manage the deletion of the buffers.
    {
      char* toBeCompressedBuf = new char[IN_BUF_LEN];
      unsigned char* compressedBuf = new unsigned char[OUT_BUF_LEN];
      boost::scoped_array<char> toBeCompressedInputPtr(toBeCompressedBuf);
      boost::scoped_array<unsigned char> compressedOutputPtr(compressedBuf);

      // Compress and then pad the compressed chunk
      setEmptyBuf((unsigned char*)toBeCompressedBuf, IN_BUF_LEN, emptyVal, colWidth);
      size_t outputLen = OUT_BUF_LEN;
      rcComp = compressor->compressBlock(toBeCompressedBuf, IN_BUF_LEN, compressedBuf, outputLen);

      if (rcComp != 0)
      {
        failedTask = "Compressing chunk";
        closeFile(pFile);
        return ERR_COMP_COMPRESS;
      }

      toBeCompressedInputPtr.reset();  // release memory

      rcComp = compressor->padCompressedChunks(compressedBuf, outputLen, OUT_BUF_LEN);

      if (rcComp != 0)
      {
        failedTask = "Padding compressed chunk";
        closeFile(pFile);
        return ERR_COMP_PAD_DATA;
      }

      // Position the file to write the empty chunks; default to the end of
      // the headers in case there are no chunks listed in the header
      off64_t startOffset = pFile->tell();

      if (chunkPtrs.size() > 0)
      {
        startOffset = chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second;
        rc = setFileOffset(pFile, startOffset);

        if (rc != NO_ERROR)
        {
          failedTask = "Positioning file to begin filling chunks";
          closeFile(pFile);
          return rc;
        }
      }

      // Write the chunks needed to fill out the current extent, adding a
      // chunk ptr for each
      for (unsigned k = 0; k < numChunksToFill; k++)
      {
        rc = writeFile(pFile, (unsigned char*)compressedBuf, outputLen);

        if (rc != NO_ERROR)
        {
          failedTask = "Writing a chunk";
          closeFile(pFile);
          return rc;
        }

        CompChunkPtr compChunk(startOffset, outputLen);
        chunkPtrs.push_back(compChunk);
        startOffset = pFile->tell();
      }
    }  // end of scope for boost scoped array pointers

    endOffset = pFile->tell();

    // Update the compressed chunk pointers in the header
    std::vector<uint64_t> ptrs;

    for (unsigned i = 0; i < chunkPtrs.size(); i++)
    {
      ptrs.push_back(chunkPtrs[i].first);
    }

    ptrs.push_back(chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second);
    compress::CompressInterface::storePtrs(ptrs, hdrs);

    rc = writeHeaders(pFile, hdrs);

    if (rc != NO_ERROR)
    {
      failedTask = "Writing headers";
      closeFile(pFile);
      return rc;
    }
  }  // end of "numChunksToFill > 0"
  else
  {
    // If there are no chunks to add, then set endOffset to truncate the db
    // file strictly based on the chunks that are already in the file
    if (chunkPtrs.size() > 0)
    {
      endOffset = chunkPtrs[chunkPtrs.size() - 1].first + chunkPtrs[chunkPtrs.size() - 1].second;
    }
  }

  // Truncate the file to release unused space for the extent we just filled
  if (endOffset > 0)
  {
    rc = truncateFile(pFile, endOffset);

    if (rc != NO_ERROR)
    {
      failedTask = "Truncating file";
      closeFile(pFile);
      return rc;
    }
  }

  closeFile(pFile);

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Expand the first chunk in pFile from an abbreviated chunk for an
 *    abbreviated extent, to a full compressed chunk for a full extent.
 * PARAMETERS:
 *    pFile - file to be updated
 *    emptyVal - empty value to be used in filling empty chunks
 *    colWidth - width in bytes of this column
 *    chunkInPtr - chunk pointer referencing first (abbrev) chunk
 *    chunkOutPtr - (out) updated chunk ptr referencing first (full) chunk
 *    hdrs - compression headers; used to pick the real compression type
 * RETURN:
 *    returns NO_ERROR if success.
 ***********************************************************/
int FileOp::expandAbbrevColumnChunk(IDBDataFile* pFile, const uint8_t* emptyVal, int colWidth,
                                    const CompChunkPtr& chunkInPtr, CompChunkPtr& chunkOutPtr,
                                    const char* hdrs)
{
  int userPadBytes = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
  auto realCompressionType = m_compressionType;

  if (hdrs)
  {
    realCompressionType = compress::CompressInterface::getCompressionType(hdrs);
  }

  std::unique_ptr<CompressInterface> compressor(
      compress::getCompressInterfaceByType(realCompressionType, userPadBytes));

  const int IN_BUF_LEN = CompressInterface::UNCOMPRESSED_INBUF_LEN;
  const int OUT_BUF_LEN = compressor->maxCompressedSize(IN_BUF_LEN) + userPadBytes +
                          compress::CompressInterface::COMPRESSED_CHUNK_INCREMENT_SIZE;

  char* toBeCompressedBuf = new char[IN_BUF_LEN];
  boost::scoped_array<char> toBeCompressedPtr(toBeCompressedBuf);

  setEmptyBuf((unsigned char*)toBeCompressedBuf, IN_BUF_LEN, emptyVal, colWidth);
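
  // Pre-filling the whole buffer with the empty value matters here: the
  // abbreviated chunk decompresses to fewer bytes than IN_BUF_LEN, so the
  // untouched tail of this buffer supplies the "empty" rows when the full
  // chunk is recompressed below.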

  RETURN_ON_ERROR(setFileOffset(pFile, chunkInPtr.first, SEEK_SET));

  char* compressedInBuf = new char[chunkInPtr.second];
  boost::scoped_array<char> compressedInBufPtr(compressedInBuf);
  RETURN_ON_ERROR(readFile(pFile, (unsigned char*)compressedInBuf, chunkInPtr.second));

  // Uncompress an "abbreviated" chunk into our 4MB buffer
  size_t outputLen = IN_BUF_LEN;
  int rc = compressor->uncompressBlock(compressedInBuf, chunkInPtr.second, (unsigned char*)toBeCompressedBuf,
                                       outputLen);

  if (rc != 0)
  {
    return ERR_COMP_UNCOMPRESS;
  }

  compressedInBufPtr.reset();  // release memory

  RETURN_ON_ERROR(setFileOffset(pFile, chunkInPtr.first, SEEK_SET));

  unsigned char* compressedOutBuf = new unsigned char[OUT_BUF_LEN];
  boost::scoped_array<unsigned char> compressedOutBufPtr(compressedOutBuf);

  // Compress the data we just read, as a "full" 4MB chunk
  outputLen = OUT_BUF_LEN;
  rc = compressor->compressBlock(reinterpret_cast<char*>(toBeCompressedBuf), IN_BUF_LEN, compressedOutBuf,
                                 outputLen);

  if (rc != 0)
  {
    return ERR_COMP_COMPRESS;
  }

  // Round up the compressed chunk size
  rc = compressor->padCompressedChunks(compressedOutBuf, outputLen, OUT_BUF_LEN);

  if (rc != 0)
  {
    return ERR_COMP_PAD_DATA;
  }

  RETURN_ON_ERROR(writeFile(pFile, compressedOutBuf, outputLen));

  chunkOutPtr.first = chunkInPtr.first;
  chunkOutPtr.second = outputLen;

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Write headers to a compressed column file.
 * PARAMETERS:
 *    pFile (in) - IDBDataFile* of column segment file to be written to
 *    hdr (in) - header pointers to be written
 * RETURN:
 *    returns ERR_FILE_WRITE or ERR_FILE_SEEK if an error occurs,
 *    else returns NO_ERROR.
 ***********************************************************/
int FileOp::writeHeaders(IDBDataFile* pFile, const char* hdr) const
{
  RETURN_ON_ERROR(setFileOffset(pFile, 0, SEEK_SET));

  // Write the headers
  if (pFile->write(hdr, CompressInterface::HDR_BUF_LEN * 2) != CompressInterface::HDR_BUF_LEN * 2)
  {
    return ERR_FILE_WRITE;
  }

  return NO_ERROR;
}

/***********************************************************
|
|
* DESCRIPTION:
|
|
* Write headers to a compressed column or dictionary file.
|
|
* PARAMETERS:
|
|
* pFile (in) - IDBDataFile* of column segment file to be written to
|
|
* controlHdr (in) - control header to be written
|
|
* pointerHdr (in) - pointer header to be written
|
|
* ptrHdrSize (in) - size (in bytes) of pointer header
|
|
* RETURN:
|
|
* returns ERR_FILE_WRITE or ERR_FILE_SEEK if an error occurs,
|
|
* else returns NO_ERROR.
|
|
***********************************************************/
|
|
int FileOp::writeHeaders(IDBDataFile* pFile, const char* controlHdr, const char* pointerHdr,
|
|
uint64_t ptrHdrSize) const
|
|
{
|
|
RETURN_ON_ERROR(setFileOffset(pFile, 0, SEEK_SET));
|
|
|
|
// Write the control header
|
|
if (pFile->write(controlHdr, CompressInterface::HDR_BUF_LEN) != CompressInterface::HDR_BUF_LEN)
|
|
{
|
|
return ERR_FILE_WRITE;
|
|
}
|
|
|
|
// Write the pointer header
|
|
if (pFile->write(pointerHdr, ptrHdrSize) != (ssize_t)ptrHdrSize)
|
|
{
|
|
return ERR_FILE_WRITE;
|
|
}
|
|
|
|
return NO_ERROR;
|
|
}
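
// As written above, the on-disk header layout is the control header in the
// first HDR_BUF_LEN bytes, followed immediately by ptrHdrSize bytes of
// pointer header data.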

/***********************************************************
 * DESCRIPTION:
 *    Write out (initialize) an extent in a dictionary store file.
 *    A mutex is used for each DBRoot, to prevent contention between
 *    threads, because if multiple threads are creating extents on
 *    the same DBRoot at the same time, the extents can become
 *    fragmented.  It is best to only create one extent at a time
 *    on each DBRoot.
 *    This function can be used to initialize an entirely new extent, or
 *    to finish initializing an extent that has already been started.
 *    nBlocks controls how many 8192-byte blocks are to be written out.
 *    If bOptExtension is set, the method first checks the config for
 *    DBRootX.Prealloc.  If preallocation is disabled there, the method
 *    skips disk space preallocation.
 * PARAMETERS:
 *    pFile (in) - IDBDataFile* of column segment file to be written to
 *    dbRoot (in) - DBRoot of pFile
 *    nBlocks (in) - number of blocks to be written for an extent
 *    blockHdrInit(in) - data used to initialize each block
 *    blockHdrInitSize(in) - number of bytes in blockHdrInit
 *    bExpandExtent (in) - Expand existing extent, or initialize a new one
 *    bOptExtension(in) - skip full extent preallocation.
 *    lbid (in) - LBID passed through to updateDctnryExtent()
 * RETURN:
 *    returns ERR_FILE_WRITE if an error occurs,
 *    else returns NO_ERROR.
 ***********************************************************/
int FileOp::initDctnryExtent(IDBDataFile* pFile, uint16_t dbRoot, int nBlocks, unsigned char* blockHdrInit,
                             int blockHdrInitSize, bool bExpandExtent, bool bOptExtension, int64_t lbid)
{
  // @bug5769 Don't initialize extents or truncate db files on HDFS
  if (idbdatafile::IDBPolicy::useHdfs())
  {
    if (m_compressionType)
      updateDctnryExtent(pFile, nBlocks, lbid);

    // Synchronize to keep the write buffer from piling up too much, which
    // could cause the controllernode to time out later when it needs to
    // save a snapshot.
    pFile->flush();
  }
  else
  {
    // Create the map of mutexes used to serialize extent creation per DBRoot
    initDbRootExtentMutexes();

    // MCOL-498 Skip the huge preallocations if the option is set for the
    // dbroot.  This check is skipped for abbreviated extents.  Checking a
    // bool here is cheaper than calling a function.  CS uses non-compressed
    // dictionary files for its system catalog, so CS doesn't optimize
    // non-compressed dictionary creation.
    if (bOptExtension)
    {
      bOptExtension = (idbdatafile::IDBPolicy::PreallocSpaceDisabled(dbRoot) && m_compressionType)
                          ? bOptExtension
                          : false;
    }

    // Reduce the number of blocks allocated for abbreviated extents, so that
    // CS writes less when it creates a new table.  This can't be zero,
    // because the Snappy compressed file format doesn't tolerate empty files.
    int realNBlocks = (bOptExtension && nBlocks <= MAX_INITIAL_EXTENT_BLOCKS_TO_DISK) ? 1 : nBlocks;

    // Determine the number of blocks in each call to write(), and the
    // number of write() calls to make, based on this.  In other words,
    // we put a cap on the "writeSize" so that we don't allocate and write
    // an entire extent at once for the 64M row extents.  If we are
    // expanding an abbreviated 64M extent, we may not have an even
    // multiple of MAX_NBLOCKS to write; remWriteSize is the byte count
    // for the blocks above and beyond loopCount*MAX_NBLOCKS.
    int writeSize = realNBlocks * BYTE_PER_BLOCK;  // 1M and 8M row extent size
    int loopCount = 1;
    int remWriteSize = 0;

    if (realNBlocks > MAX_NBLOCKS)  // 64M row extent size
    {
      writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK;
      loopCount = realNBlocks / MAX_NBLOCKS;
      remWriteSize = (realNBlocks - (loopCount * MAX_NBLOCKS)) * BYTE_PER_BLOCK;
    }
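
    // Worked example with hypothetical numbers: if MAX_NBLOCKS were 1024 and
    // realNBlocks were 2560, then loopCount = 2 and remWriteSize covers the
    // remaining 512 blocks (512 * BYTE_PER_BLOCK bytes), which are written
    // first below, before the two full-size writes.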

    // Allocate a buffer, initialize it, and use it to create the extent
    idbassert(dbRoot > 0);

#ifdef PROFILE
    if (bExpandExtent)
      Stats::startParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT);
    else
      Stats::startParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT);
#endif

    boost::mutex::scoped_lock lk(m_DbRootAddExtentMutexes[dbRoot]);

#ifdef PROFILE
    if (bExpandExtent)
      Stats::stopParseEvent(WE_STATS_WAIT_TO_EXPAND_DCT_EXTENT);
    else
      Stats::stopParseEvent(WE_STATS_WAIT_TO_CREATE_DCT_EXTENT);
#endif

    // Skip space preallocation if configured so; fall back to a sequential
    // write otherwise.  We can't avoid preallocation for full extents
    // (e.g. ADD COLUMN DDL), because CS has to fill the file with empty
    // magic values.
    if (!bOptExtension)
    {
      // Allocate buffer, and store in scoped_array to ensure its deletion.
      // Create scope {...} to manage deletion of writeBuf.
      {
#ifdef PROFILE
        Stats::startParseEvent(WE_STATS_INIT_DCT_EXTENT);
#endif

        unsigned char* writeBuf = new unsigned char[writeSize];
        boost::scoped_array<unsigned char> writeBufPtr(writeBuf);

        memset(writeBuf, 0, writeSize);

        // Only writeSize / BYTE_PER_BLOCK blocks fit in writeBuf;
        // realNBlocks can exceed that for 64M row extents, so cap the
        // block-header init loop to the buffer size.
        int nBlocksInBuf = writeSize / BYTE_PER_BLOCK;

        for (int i = 0; i < nBlocksInBuf; i++)
        {
          memcpy(writeBuf + (i * BYTE_PER_BLOCK), blockHdrInit, blockHdrInitSize);
        }

#ifdef PROFILE
        Stats::stopParseEvent(WE_STATS_INIT_DCT_EXTENT);

        if (bExpandExtent)
          Stats::startParseEvent(WE_STATS_EXPAND_DCT_EXTENT);
        else
          Stats::startParseEvent(WE_STATS_CREATE_DCT_EXTENT);
#endif

        if (remWriteSize > 0)
        {
          if (pFile->write(writeBuf, remWriteSize) != remWriteSize)
          {
            return ERR_FILE_WRITE;
          }
        }

        for (int j = 0; j < loopCount; j++)
        {
          if (pFile->write(writeBuf, writeSize) != writeSize)
          {
            return ERR_FILE_WRITE;
          }
        }
        // CS doesn't account for flush timings.
#ifdef PROFILE
        if (bExpandExtent)
          Stats::stopParseEvent(WE_STATS_EXPAND_DCT_EXTENT);
        else
          Stats::stopParseEvent(WE_STATS_CREATE_DCT_EXTENT);
#endif
      }
    }  // preallocation fallback end

    // MCOL-498 CS has to set the number of blocks in the chunk header
    if (m_compressionType)
    {
      updateDctnryExtent(pFile, nBlocks, lbid);
    }

    pFile->flush();
  }

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Create the map of mutexes used to serialize extent creation per
 *    DBRoot.  Serializing extent creation helps to prevent disk
 *    fragmentation.
 ***********************************************************/
/* static */
void FileOp::initDbRootExtentMutexes()
{
  boost::mutex::scoped_lock lk(m_createDbRootMutexes);

  if (m_DbRootAddExtentMutexes.size() == 0)
  {
    std::vector<uint16_t> rootIds;
    Config::getRootIdList(rootIds);

    for (size_t i = 0; i < rootIds.size(); i++)
    {
      m_DbRootAddExtentMutexes.emplace(std::piecewise_construct, std::forward_as_tuple(rootIds[i]),
                                       std::forward_as_tuple());
    }
  }
}
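
// Note: the mutexes are emplaced piecewise because boost::mutex is
// noncopyable, and std::map keeps element addresses stable, so the
// scoped_lock taken in initDctnryExtent() remains valid across any later
// insertions.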

/***********************************************************
 * DESCRIPTION:
 *    Write out (reinitialize) a partial extent in a column file.
 *    A mutex is not used to prevent contention between threads,
 *    because the extent should already be in place on disk; so
 *    disk fragmentation is not an issue.
 * PARAMETERS:
 *    pFile (in) - IDBDataFile* of column segment file to be written to
 *    startOffset(in)-file offset where we are to begin writing blocks
 *    nBlocks (in) - number of blocks to be written to the extent
 *    emptyVal(in) - empty value to be used for column data values
 *    width (in) - width of the applicable column
 * RETURN:
 *    returns ERR_FILE_WRITE if an error occurs,
 *    else returns NO_ERROR.
 ***********************************************************/
int FileOp::reInitPartialColumnExtent(IDBDataFile* pFile, long long startOffset, int nBlocks,
                                      const uint8_t* emptyVal, int width)
{
  int rc = setFileOffset(pFile, startOffset, SEEK_SET);

  if (rc != NO_ERROR)
    return rc;

  if (nBlocks == 0)
    return NO_ERROR;

  // Determine the number of blocks in each call to write(), and the
  // number of write() calls to make, based on this.  In other words,
  // we put a cap on the "writeSize" so that we don't allocate and write
  // an entire extent at once for the 64M row extents.
  int writeSize = nBlocks * BYTE_PER_BLOCK;  // 1M and 8M row extent size
  int loopCount = 0;
  int remainderSize = writeSize;

  if (nBlocks > MAX_NBLOCKS)  // 64M row extent size
  {
    writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK;
    loopCount = nBlocks / MAX_NBLOCKS;
    remainderSize = (nBlocks - (loopCount * MAX_NBLOCKS)) * BYTE_PER_BLOCK;
  }

  // Allocate a buffer, initialize it, and use it to initialize the extent.
  // Store it in a scoped_array to ensure its deletion.
  // Create scope {...} to manage deletion of writeBuf.
  {
    unsigned char* writeBuf = new unsigned char[writeSize];
    boost::scoped_array<unsigned char> writeBufPtr(writeBuf);

    setEmptyBuf(writeBuf, writeSize, emptyVal, width);

    for (int j = 0; j < loopCount; j++)
    {
      if (pFile->write(writeBuf, writeSize) != writeSize)
      {
        return ERR_FILE_WRITE;
      }
    }

    if (remainderSize > 0)
    {
      if (pFile->write(writeBuf, remainderSize) != remainderSize)
      {
        return ERR_FILE_WRITE;
      }
    }
  }

  // Synchronize here to keep the write buffer from piling up too much, which
  // could cause the controllernode to time out later when it needs to save a
  // snapshot.
  pFile->flush();
  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Write out (reinitialize) a partial extent in a dictionary store file.
 *    A mutex is not used to prevent contention between threads,
 *    because the extent should already be in place on disk; so
 *    disk fragmentation is not an issue.
 * PARAMETERS:
 *    pFile (in) - IDBDataFile* of column segment file to be written to
 *    startOffset(in)-file offset where we are to begin writing blocks
 *    nBlocks (in) - number of blocks to be written to the extent
 *    blockHdrInit(in) - data used to initialize each block
 *    blockHdrInitSize(in) - number of bytes in blockHdrInit
 * RETURN:
 *    returns ERR_FILE_WRITE if an error occurs,
 *    else returns NO_ERROR.
 ***********************************************************/
int FileOp::reInitPartialDctnryExtent(IDBDataFile* pFile, long long startOffset, int nBlocks,
                                      unsigned char* blockHdrInit, int blockHdrInitSize)
{
  int rc = setFileOffset(pFile, startOffset, SEEK_SET);

  if (rc != NO_ERROR)
    return rc;

  if (nBlocks == 0)
    return NO_ERROR;

  // Determine the number of blocks in each call to write(), and the
  // number of write() calls to make, based on this.  In other words,
  // we put a cap on the "writeSize" so that we don't allocate and write
  // an entire extent at once for the 64M row extents.
  int writeSize = nBlocks * BYTE_PER_BLOCK;  // 1M and 8M row extent size
  int loopCount = 0;
  int remainderSize = writeSize;

  if (nBlocks > MAX_NBLOCKS)  // 64M row extent size
  {
    writeSize = MAX_NBLOCKS * BYTE_PER_BLOCK;
    loopCount = nBlocks / MAX_NBLOCKS;
    remainderSize = (nBlocks - (loopCount * MAX_NBLOCKS)) * BYTE_PER_BLOCK;
    nBlocks = MAX_NBLOCKS;  // cap the block-header init loop to the buffer size
  }

  // Allocate a buffer, initialize it, and use it to initialize the extent.
  // Store it in a scoped_array to ensure its deletion.
  // Create scope {...} to manage deletion of writeBuf.
  {
    unsigned char* writeBuf = new unsigned char[writeSize];
    boost::scoped_array<unsigned char> writeBufPtr(writeBuf);

    memset(writeBuf, 0, writeSize);

    for (int i = 0; i < nBlocks; i++)
    {
      memcpy(writeBuf + (i * BYTE_PER_BLOCK), blockHdrInit, blockHdrInitSize);
    }

    for (int j = 0; j < loopCount; j++)
    {
      if (pFile->write(writeBuf, writeSize) != writeSize)
      {
        return ERR_FILE_WRITE;
      }
    }

    if (remainderSize > 0)
    {
      if (pFile->write(writeBuf, remainderSize) != remainderSize)
      {
        return ERR_FILE_WRITE;
      }
    }
  }

  // Synchronize here to keep the write buffer from piling up too much, which
  // could cause the controllernode to time out later when it needs to save a
  // snapshot.
  pFile->flush();
  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Get the size (in bytes) of an open file.
 * PARAMETERS:
 *    pFile - file handle
 *    fileSize (out) - file size in bytes
 * RETURN:
 *    error code
 ***********************************************************/
int FileOp::getFileSize(IDBDataFile* pFile, long long& fileSize) const
{
  fileSize = 0;

  if (pFile == NULL)
    return ERR_FILE_NULL;

  fileSize = pFile->size();

  if (fileSize < 0)
  {
    fileSize = 0;
    return ERR_FILE_STAT;
  }

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Get file size using file id
 * PARAMETERS:
 *    fid - column OID
 *    dbRoot - DBRoot of applicable segment file
 *    partition - partition of applicable segment file
 *    segment - segment of applicable segment file
 *    fileSize (out) - current file size for requested segment file
 * RETURN:
 *    NO_ERROR if okay, else an error return code.
 ***********************************************************/
int FileOp::getFileSize(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment,
                        long long& fileSize) const
{
  fileSize = 0;

  char fileName[FILE_NAME_SIZE];
  RETURN_ON_ERROR(getFileName(fid, fileName, dbRoot, partition, segment));

  fileSize = IDBPolicy::size(fileName);

  if (fileSize < 0)
  {
    fileSize = 0;
    return ERR_FILE_STAT;
  }

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Check whether it is a directory
 * PARAMETERS:
 *    dirName - directory name
 * RETURN:
 *    true if it is, false otherwise
 ***********************************************************/
bool FileOp::isDir(const char* dirName) const
{
  return IDBPolicy::isDir(dirName);
}

/***********************************************************
 * DESCRIPTION:
 *    Convert an oid to a filename
 * PARAMETERS:
 *    fid - fid
 *    fullFileName - file name
 *    bCreateDir - whether we need to create a directory
 *    dbRoot    - DBRoot where file is to be located; 1->DBRoot1,
 *                2->DBRoot2, etc.  If bCreateDir is false, meaning we
 *                are not creating the file but only searching for an
 *                existing file, then dbRoot can be 0, and oid2FileName
 *                will search all the DBRoots for the applicable filename.
 *    partition - Partition number to be used in filepath subdirectory
 *    segment   - Segment number to be used in filename
 * RETURN:
 *    NO_ERROR if success, other if fail
 ***********************************************************/
int FileOp::oid2FileName(FID fid, char* fullFileName, bool bCreateDir, uint16_t dbRoot, uint32_t partition,
                         uint16_t segment) const
{
#ifdef SHARED_NOTHING_DEMO_2

  if (fid >= 10000)
  {
    char root[FILE_NAME_SIZE];
    Config::getSharedNothingRoot(root);
    sprintf(fullFileName, "%s/FILE%d", root, fid);
    return NO_ERROR;
  }

#endif

  // Need this stub to use ColumnOp::writeRow in the unit tests
#ifdef WITH_UNIT_TESTS
  if (fid == 42)
  {
    sprintf(fullFileName, "./versionbuffer");
    return NO_ERROR;
  }
#endif

  /* If it is a version buffer file, the format is different. */
  if (fid < 1000)
  {
    /* Get the dbroot #
     * Get the root of that dbroot
     * Add "/versionbuffer.cdf"
     */
    BRM::DBRM dbrm;
    int _dbroot = dbrm.getDBRootOfVBOID(fid);

    if (_dbroot < 0)
      return ERR_INVALID_VBOID;

    snprintf(fullFileName, FILE_NAME_SIZE, "%s/versionbuffer.cdf", Config::getDBRootByNum(_dbroot).c_str());
    return NO_ERROR;
  }

  // Get the hashed part of the filename.  This is the tail-end of the
  // filename path, excluding the DBRoot.
  char tempFileName[FILE_NAME_SIZE];
  char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];
  RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, partition, segment)));

  if (fullFileName == nullptr)
  {
    return ERR_INTERNAL;
  }

  // See if the file exists in the specified DBRoot; return if found.
  if (dbRoot > 0)
  {
    sprintf(fullFileName, "%s/%s", Config::getDBRootByNum(dbRoot).c_str(), tempFileName);

    //@Bug 5397
    if (IDBPolicy::exists(fullFileName))
      return NO_ERROR;

    // file wasn't found; caller doesn't want dirs to be created, we're done
    if (!bCreateDir)
      return NO_ERROR;
  }
  else
  {
    // Now try to find the file in each of the DBRoots.
    std::vector<std::string> dbRootPathList;
    Config::getDBRootPathList(dbRootPathList);

    for (unsigned i = 0; i < dbRootPathList.size(); i++)
    {
      sprintf(fullFileName, "%s/%s", dbRootPathList[i].c_str(), tempFileName);

      // found it, nothing more to do, return
      //@Bug 5397
      if (IDBPolicy::exists(fullFileName))
        return NO_ERROR;
    }

    // file wasn't found; caller didn't specify a DBRoot, so we can't create
    return ERR_FILE_NOT_EXIST;
  }

  // Create (and chown) any missing levels of the directory path.
  std::stringstream aDirName;

  for (size_t i = 0; i < MaxDirLevels; i++)
  {
    if (i == 0)
    {
      aDirName << Config::getDBRootByNum(dbRoot).c_str() << "/" << dbDir[i];
    }
    else
    {
      aDirName << "/" << dbDir[i];
    }

    if (!isDir(aDirName.str().c_str()))
      RETURN_ON_ERROR(createDir(aDirName.str().c_str()));

    if (chownDataPath(aDirName.str()))
      return ERR_FILE_CHOWN;
  }

  return NO_ERROR;
}

void FileOp::getFileNameForPrimProc(FID fid, char* fullFileName, uint16_t dbRoot, uint32_t partition,
                                    uint16_t segment) const
{
  string dbRootPath = Config::getDBRootByNum(dbRoot);

  if (dbRootPath.empty())
  {
    ostringstream oss;
    oss << "(dbroot " << dbRoot << " offline)";
    dbRootPath = oss.str();
  }

  // different filenames for the version buffer files
  if (fid < 1000)
    snprintf(fullFileName, FILE_NAME_SIZE, "%s/versionbuffer.cdf", dbRootPath.c_str());
  else
    snprintf(fullFileName, FILE_NAME_SIZE, "%s/%03u.dir/%03u.dir/%03u.dir/%03u.dir/%03u.dir/FILE%03d.cdf",
             dbRootPath.c_str(), fid >> 24, (fid & 0x00ff0000) >> 16, (fid & 0x0000ff00) >> 8,
             fid & 0x000000ff, partition, segment);
}
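
// Example with hypothetical values: OID 3071 (0x00000BFF), partition 0,
// segment 0 on a DBRoot mounted at /data1 yields
// /data1/000.dir/000.dir/011.dir/255.dir/000.dir/FILE000.cdf, since the four
// leading .dir levels are the big-endian bytes of the OID.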

/***********************************************************
 * DESCRIPTION:
 *    Search for directory path associated with specified OID.
 *    If the OID is a version buffer file, it returns the whole
 *    filename.
 * PARAMETERS:
 *    fid        - (in) OID to search for
 *    oidDirName - (out) OID directory path (including DBRoot) that is found
 * RETURN:
 *    NO_ERROR if OID dir path found, else returns ERR_FILE_NOT_EXIST
 ***********************************************************/
int FileOp::oid2DirName(FID fid, char* oidDirName) const
{
  char tempFileName[FILE_NAME_SIZE];
  char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];

  /* If it is a version buffer file, the format is different. */
  if (fid < 1000)
  {
    /* Get the dbroot #
     * Get the root of that dbroot
     */
    BRM::DBRM dbrm;
    int _dbroot = dbrm.getDBRootOfVBOID(fid);

    if (_dbroot < 0)
      return ERR_INVALID_VBOID;

    snprintf(oidDirName, FILE_NAME_SIZE, "%s", Config::getDBRootByNum(_dbroot).c_str());
    return NO_ERROR;
  }

  if (oidDirName == nullptr)
  {
    return ERR_INTERNAL;
  }

  RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, 0, 0)));

  // Now try to find the directory in each of the DBRoots.
  std::vector<std::string> dbRootPathList;
  Config::getDBRootPathList(dbRootPathList);

  for (unsigned i = 0; i < dbRootPathList.size(); i++)
  {
    sprintf(oidDirName, "%s/%s/%s/%s/%s", dbRootPathList[i].c_str(), dbDir[0], dbDir[1], dbDir[2], dbDir[3]);

    // found it, nothing more to do, return
    //@Bug 5397. use the new way to check
    if (IDBPolicy::exists(oidDirName))
      return NO_ERROR;
  }

  return ERR_FILE_NOT_EXIST;
}

/***********************************************************
 * DESCRIPTION:
 *    Construct directory path for the specified fid (OID), DBRoot, and
 *    partition number.  Directory path need not exist, nor is it created.
 * PARAMETERS:
 *    fid       - (in) OID of interest
 *    dbRoot    - (in) DBRoot of interest
 *    partition - (in) partition of interest
 *    dirName   - (out) constructed directory path
 * RETURN:
 *    NO_ERROR if path is successfully constructed.
 ***********************************************************/
int FileOp::getDirName(FID fid, uint16_t dbRoot, uint32_t partition, std::string& dirName) const
{
  char tempFileName[FILE_NAME_SIZE];
  char dbDir[MAX_DB_DIR_LEVEL][MAX_DB_DIR_NAME_SIZE];

  RETURN_ON_ERROR((Convertor::oid2FileName(fid, tempFileName, dbDir, partition, 0)));

  std::string rootPath = Config::getDBRootByNum(dbRoot);
  std::ostringstream oss;
  oss << rootPath << '/' << dbDir[0] << '/' << dbDir[1] << '/' << dbDir[2] << '/' << dbDir[3] << '/'
      << dbDir[4];
  dirName = oss.str();

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Open a file
 * PARAMETERS:
 *    fileName  - file name with complete path
 *    mode      - open mode
 *    ioColSize - column width used to decide whether buffered I/O is enabled
 *    useTmpSuffix - on HDFS, open with a *.tmp backup name (see @bug 5572)
 * RETURN:
 *    pointer to the opened IDBDataFile, or NULL if the open fails
 ***********************************************************/
// @bug 5572 - HDFS usage: add *.tmp file backup flag
IDBDataFile* FileOp::openFile(const char* fileName, const char* mode, const int ioColSize,
                              bool useTmpSuffix) const
{
  IDBDataFile* pFile;
  errno = 0;

  unsigned opts;

  if (ioColSize > 0)
    opts = IDBDataFile::USE_VBUF;
  else
    opts = IDBDataFile::USE_NOVBUF;

  if ((useTmpSuffix) && idbdatafile::IDBPolicy::useHdfs())
    opts |= IDBDataFile::USE_TMPFILE;

  pFile =
      IDBDataFile::open(IDBPolicy::getType(fileName, IDBPolicy::WRITEENG), fileName, mode, opts, ioColSize);

  if (pFile == NULL)
  {
    int errRc = errno;
    std::ostringstream oss;
    std::string errnoMsg;
    Convertor::mapErrnoToString(errRc, errnoMsg);
    oss << "FileOp::openFile(): fopen(" << fileName << ", " << mode << "): errno = " << errRc << ": "
        << errnoMsg;
    logging::Message::Args args;
    args.add(oss.str());
    SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_CRITICAL, logging::M0006);
    SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_ERROR, logging::M0006);
  }

  return pFile;
}
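
// Illustrative sketch (hypothetical call site, not in this file): open a
// segment file for update with buffered I/O sized by the column width, then
// release it through closeFile():
//
//   IDBDataFile* f = fileOp.openFile(fullFileName, "r+b", colWidth);
//   if (f != NULL)
//   {
//     // ... reads/writes ...
//     fileOp.closeFile(f);
//   }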

// @bug 5572 - HDFS usage: add *.tmp file backup flag
IDBDataFile* FileOp::openFile(FID fid, uint16_t dbRoot, uint32_t partition, uint16_t segment,
                              std::string& segFile, const char* mode, int ioColSize, bool useTmpSuffix) const
{
  char fileName[FILE_NAME_SIZE];
  int rc;

  RETURN_ON_WE_ERROR((rc = getFileName(fid, fileName, dbRoot, partition, segment)), NULL);

  // disable buffering for versionbuffer file
  if (fid < 1000)
    ioColSize = 0;

  IDBDataFile* pF = openFile(fileName, mode, ioColSize, useTmpSuffix);

  segFile = fileName;

  return pF;
}

/***********************************************************
 * DESCRIPTION:
 *    Read a portion of the file into a buffer
 * PARAMETERS:
 *    pFile - file handle
 *    readBuf - read buffer
 *    readSize - the size to read
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_NULL if file handle is NULL
 *    ERR_FILE_EOF if the next block is empty
 *    ERR_FILE_READ if something went wrong reading the file
 ***********************************************************/
int FileOp::readFile(IDBDataFile* pFile, unsigned char* readBuf, int readSize) const
{
  if (pFile != NULL)
  {
    int bc = pFile->read(readBuf, readSize);

    if (bc != readSize)
    {
      // MCOL-498 EOF if the next block is empty
      if (bc == 0)
      {
        return ERR_FILE_EOF;
      }

      return ERR_FILE_READ;
    }
  }
  else
    return ERR_FILE_NULL;

  return NO_ERROR;
}

/***********************************************************
 * Reads contents of headers from "pFile" and stores into "hdrs".
 ***********************************************************/
int FileOp::readHeaders(IDBDataFile* pFile, char* hdrs) const
{
  RETURN_ON_ERROR(setFileOffset(pFile, 0));
  RETURN_ON_ERROR(
      readFile(pFile, reinterpret_cast<unsigned char*>(hdrs), (CompressInterface::HDR_BUF_LEN * 2)));
  int rc = compress::CompressInterface::verifyHdr(hdrs);

  if (rc != 0)
  {
    return ERR_COMP_VERIFY_HDRS;
  }

  return NO_ERROR;
}

/***********************************************************
 * Reads contents of headers from "pFile" and stores into "hdr1" and "hdr2".
 ***********************************************************/
int FileOp::readHeaders(IDBDataFile* pFile, char* hdr1, char* hdr2) const
{
  unsigned char* hdrPtr = reinterpret_cast<unsigned char*>(hdr1);
  RETURN_ON_ERROR(setFileOffset(pFile, 0));
  RETURN_ON_ERROR(readFile(pFile, hdrPtr, CompressInterface::HDR_BUF_LEN));

  int ptrSecSize = compress::CompressInterface::getHdrSize(hdrPtr) - CompressInterface::HDR_BUF_LEN;
  return readFile(pFile, reinterpret_cast<unsigned char*>(hdr2), ptrSecSize);
}
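
// Note: the caller must size hdr2 to hold at least
// CompressInterface::getHdrSize(hdr1) - HDR_BUF_LEN bytes, since that is
// exactly how many pointer-header bytes are read above.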

/***********************************************************
 * DESCRIPTION:
 *    Set the file position to the given offset (old signature; unchanged)
 * PARAMETERS:
 *    pFile - file handle
 *    offset - file offset
 *    origin - can be SEEK_SET, or SEEK_CUR, or SEEK_END
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_NULL if file handle is NULL
 *    ERR_FILE_FBO_NEG if offset is negative
 *    ERR_FILE_SEEK if something went wrong setting the position
 ***********************************************************/
int FileOp::setFileOffset(IDBDataFile* pFile, long long offset, int origin) const
{
  int rc;
  long long fboOffset = offset;  // workaround solution to pass leakcheck error

  if (pFile == NULL)
    return ERR_FILE_NULL;

  if (offset < 0)
    return ERR_FILE_FBO_NEG;

  rc = pFile->seek(fboOffset, origin);

  if (rc)
    return ERR_FILE_SEEK;

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Set the file position to the start of the block for the given LBID
 * PARAMETERS:
 *    pFile - file handle
 *    lbid - logical block id whose file block offset is seeked to
 *    origin - can be SEEK_SET, or SEEK_CUR, or SEEK_END
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_NULL if file handle is NULL
 *    ERR_FILE_SEEK if something went wrong setting the position
 ***********************************************************/
int FileOp::setFileOffsetBlock(IDBDataFile* pFile, uint64_t lbid, int origin) const
{
  long long fboOffset = 0;
  int fbo = 0;

  // only when fboFlag is false, we get in here
  uint16_t dbRoot;
  uint32_t partition;
  uint16_t segment;
  RETURN_ON_ERROR(BRMWrapper::getInstance()->getFboOffset(lbid, dbRoot, partition, segment, fbo));
  fboOffset = ((long long)fbo) * (long)BYTE_PER_BLOCK;

  return setFileOffset(pFile, fboOffset, origin);
}
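
// Note: pFile is assumed to already be the segment file that owns the LBID;
// only the file block offset (fbo * BYTE_PER_BLOCK) is derived here, while
// the dbRoot/partition/segment values from the lookup go unused.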

/***********************************************************
 * DESCRIPTION:
 *    Truncate file to the specified size.
 * PARAMETERS:
 *    pFile - file handle
 *    fileSize - size of file in bytes.
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_NULL if file handle is NULL
 *    ERR_FILE_TRUNCATE if something went wrong truncating the file
 ***********************************************************/
int FileOp::truncateFile(IDBDataFile* pFile, long long fileSize) const
{
  if (pFile == NULL)
    return ERR_FILE_NULL;

  if (pFile->truncate(fileSize) != 0)
    return ERR_FILE_TRUNCATE;

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Write a buffer to a file at the current location
 * PARAMETERS:
 *    pFile - file handle
 *    writeBuf - write buffer
 *    writeSize - the write size
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_NULL if file handle is NULL
 *    ERR_FILE_WRITE if something went wrong writing to the file
 ***********************************************************/
int FileOp::writeFile(IDBDataFile* pFile, const unsigned char* writeBuf, int writeSize) const
{
  if (pFile != NULL)
  {
    if (pFile->write(writeBuf, writeSize) != writeSize)
      return ERR_FILE_WRITE;
  }
  else
    return ERR_FILE_NULL;

  return NO_ERROR;
}

/***********************************************************
 * DESCRIPTION:
 *    Determine whether the applicable filesystem has room to add the
 *    specified number of blocks (where the blocks contain BYTE_PER_BLOCK
 *    bytes).
 * PARAMETERS:
 *    fileName - file whose file system is to be checked.  Does not have to
 *               be a complete file name.  Dir path is sufficient.
 *    nBlocks  - number of 8192-byte blocks to be added
 * RETURN:
 *    true if there is room for the blocks or it cannot be determined;
 *    false if file system usage would exceed the allowable threshold
 ***********************************************************/
bool FileOp::isDiskSpaceAvail(const std::string& fileName, int nBlocks) const
{
  bool bSpaceAvail = true;

  unsigned maxDiskUsage = Config::getMaxFileSystemDiskUsage();

  if (maxDiskUsage < 100)  // 100% means to disable the check
  {
    struct statfs fStats;
    int rc = statfs(fileName.c_str(), &fStats);

    if (rc == 0)
    {
      double totalBlocks = fStats.f_blocks;
      double blksToAlloc = (double)(nBlocks * BYTE_PER_BLOCK) / fStats.f_bsize;
      double freeBlocks = fStats.f_bavail - blksToAlloc;

      if ((((totalBlocks - freeBlocks) / totalBlocks) * 100.0) > maxDiskUsage)
        bSpaceAvail = false;
    }
  }

  return bSpaceAvail;
}
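
// Worked example with hypothetical numbers: on a filesystem with 1,000,000
// blocks of 4096 bytes where f_bavail = 300,000, asking for nBlocks = 1024
// (8 MB) gives blksToAlloc = 1024 * 8192 / 4096 = 2048 filesystem blocks, so
// projected usage = ((1,000,000 - (300,000 - 2048)) / 1,000,000) * 100,
// about 70.2%; the check passes as long as MaxFileSystemDiskUsage is at
// least 71.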

//------------------------------------------------------------------------------
// Virtual default functions follow; placeholders for derived classes that want
// to override (see ColumnOpCompress1 and DctnryCompress1 in /wrapper).
//------------------------------------------------------------------------------

//------------------------------------------------------------------------------
// Expand current abbreviated extent to a full extent for the column segment
// file associated with pFile.  Function leaves the file position at the end
// of the file after the extent is expanded.
//------------------------------------------------------------------------------
int FileOp::expandAbbrevColumnExtent(
    IDBDataFile* pFile,       // FILE ptr to file where abbrev extent is to be expanded
    uint16_t dbRoot,          // The DBRoot of the file with the abbreviated extent
    const uint8_t* emptyVal,  // Empty value to be used in expanding the extent
    int width,                // Width of the column (in bytes)
    execplan::CalpontSystemCatalog::ColDataType colDataType)  // Column data type.
{
  // Based on extent size, see how many blocks to add to fill the extent
  int blksToAdd =
      (((int)BRMWrapper::getInstance()->getExtentRows() - INITIAL_EXTENT_ROWS_TO_DISK) / BYTE_PER_BLOCK) *
      width;

  // Make sure there is enough disk space to expand the extent.
  RETURN_ON_ERROR(setFileOffset(pFile, 0, SEEK_END));

  // TODO-will have to address this DiskSpaceAvail check at some point
  if (!isDiskSpaceAvail(Config::getDBRootByNum(dbRoot), blksToAdd))
  {
    return ERR_FILE_DISK_SPACE;
  }

  // Add blocks to turn the abbreviated extent into a full extent.
  int rc = FileOp::initColumnExtent(pFile, dbRoot, blksToAdd, emptyVal, width, colDataType,
                                    false,  // existing file
                                    true,   // expand existing extent
                                    false,  // n/a since not adding new extent
                                    true);  // optimize segment file extension

  return rc;
}
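
// Worked example with hypothetical numbers: if an extent holds 8,388,608
// rows, the abbreviated extent held 262,144 rows, and the column width is
// 4 bytes, then blksToAdd = ((8388608 - 262144) / 8192) * 4 = 3968 blocks,
// i.e. roughly 31 MB of BYTE_PER_BLOCK-sized blocks appended to the file.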

void FileOp::setTransId(const TxnID& transId)
{
  m_transId = transId;
}

void FileOp::setBulkFlag(bool isBulkLoad)
{
  m_isBulk = isBulkLoad;
}

int FileOp::flushFile(int rc, std::map<FID, FID>& oids)
{
  return NO_ERROR;
}

int FileOp::updateColumnExtent(IDBDataFile* pFile, int nBlocks, int64_t lbid)
{
  return NO_ERROR;
}

int FileOp::updateDctnryExtent(IDBDataFile* pFile, int nBlocks, int64_t lbid)
{
  return NO_ERROR;
}

void FileOp::setFixFlag(bool isFix)
{
  m_isFix = isFix;
}

// Small note: chownDataPath() is called in a couple of places to chown the
// target file, and the call in oid2FileName() chowns the directories that
// were created.
bool FileOp::chownDataPath(const std::string& fileName) const
{
  std::ostringstream error;
  idbdatafile::IDBFileSystem& fs = IDBPolicy::getFs(fileName);

  if (chownPath(error, fileName, fs))
  {
    logging::Message::Args args;
    logging::Message message(1);
    args.add(error.str());
    message.format(args);
    logging::LoggingID lid(SUBSYSTEM_ID_WE_BULK);
    logging::MessageLog ml(lid);
    ml.logErrorMessage(message);
    return true;
  }

  return false;
}

}  // namespace WriteEngine