mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-21 19:45:56 +03:00
2653 lines
90 KiB
C++
2653 lines
90 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
Copyright (C) 2019 MariaDB Corporation.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
// $Id: we_chunkmanager.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
|
|
|
|
#include <unistd.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/time.h>
|
|
#include <iostream>
|
|
#include <cstdio>
|
|
#include <ctime>
|
|
//#define NDEBUG
|
|
#include <cassert>
|
|
using namespace std;
|
|
|
|
#include <boost/scoped_array.hpp>
|
|
#include <boost/scoped_ptr.hpp>
|
|
|
|
#include "logger.h"
|
|
#include "cacheutils.h"
|
|
|
|
#include "we_chunkmanager.h"
|
|
|
|
#include "we_macro.h"
|
|
#include "we_brm.h"
|
|
#include "we_config.h"
|
|
#include "we_confirmhdfsdbfile.h"
|
|
#include "we_fileop.h"
|
|
#include "../dictionary/we_dctnry.h"
|
|
#include "we_stats.h"
|
|
using namespace execplan;
|
|
|
|
#include "IDBDataFile.h"
|
|
#include "IDBPolicy.h"
|
|
#include "cloudio/SMFileSystem.h"
|
|
using namespace idbdatafile;
|
|
|
|
namespace
|
|
{
|
|
// Function to compare 2 ChunkData pointers.
|
|
bool chunkDataPtrLessCompare(WriteEngine::ChunkData* p1, WriteEngine::ChunkData* p2)
|
|
{
|
|
return (p1->fChunkId) < (p2->fChunkId);
|
|
}
|
|
|
|
} // namespace
|
|
|
|
namespace WriteEngine
|
|
{
|
|
extern int NUM_BLOCKS_PER_INITIAL_EXTENT; // defined in we_dctnry.cpp
|
|
extern WErrorCodes ec; // defined in we_log.cpp
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Search for the specified chunk in fChunkList.
|
|
//------------------------------------------------------------------------------
|
|
ChunkData* CompFileData::findChunk(int64_t id) const
{
  // Linear scan of fChunkList; the first entry whose fChunkId equals id is returned.
  for (ChunkData* candidate : fChunkList)
  {
    if (candidate->fChunkId == id)
      return candidate;
  }

  // No chunk with the requested id is currently held for this file.
  return NULL;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// ChunkManager constructor
|
|
//------------------------------------------------------------------------------
|
|
ChunkManager::ChunkManager()
 : fMaxActiveChunkNum(100)
 , fLenCompressed(0)
 , fIsBulkLoad(false)
 , fDropFdCache(false)
 , fIsInsert(false)
 , fIsHdfs(IDBPolicy::useHdfs())
 , fFileOp(0)
 , fSysLogger(NULL)
 , fTransId(-1)
 , fLocalModuleId(Config::getLocalModuleID())
 , fFs(fIsHdfs ? IDBFileSystem::getFs(IDBDataFile::HDFS)
       : IDBPolicy::useCloud() ? IDBFileSystem::getFs(IDBDataFile::CLOUD)
                               : IDBFileSystem::getFs(IDBDataFile::BUFFERED))
{
  // fIsFix is consulted by fetchChunkFromFile() when a chunk fails to
  // uncompress, but it is not part of the initializer list above. Give it a
  // defined value so that path never reads an indeterminate flag.
  // NOTE(review): assumes the class declaration carries no in-class
  // initializer for fIsFix - confirm against we_chunkmanager.h.
  fIsFix = false;

  // User padding = configured number of pad blocks, converted to bytes.
  fUserPaddings = Config::getNumCompressedPadBlks() * BYTE_PER_BLOCK;
  compress::initializeCompressorPool(fCompressorPool, fUserPaddings);

  // Compressed chunk size: the generic max compressed size of one
  // uncompressed chunk plus fixed slack (64 + 3 + 8 KiB).
  COMPRESSED_CHUNK_SIZE =
      compress::CompressInterface::getMaxCompressedSizeGeneric(UNCOMPRESSED_CHUNK_SIZE) + 64 + 3 + 8 * 1024;

  // The shared compression buffer also has room for the user padding.
  fMaxCompressedBufSize = COMPRESSED_CHUNK_SIZE + fUserPaddings;
  fBufCompressed = new char[fMaxCompressedBufSize];

  // System logger for the write engine subsystem, registered with message M0080.
  fSysLogger = new logging::Logger(SUBSYSTEM_ID_WE);
  logging::MsgMap msgMap;
  msgMap[logging::M0080] = logging::Message(logging::M0080);
  fSysLogger->msgMap(msgMap);
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// ChunkManager destructor
|
|
//------------------------------------------------------------------------------
|
|
ChunkManager::~ChunkManager()
{
  // cleanUp() is invoked with an empty OID map.
  const std::map<FID, FID> noColumnOids;
  cleanUp(noColumnOids);

  // Release the shared compression buffer allocated in the constructor.
  delete[] fBufCompressed;
  fBufCompressed = NULL;

  // Release the system logger allocated in the constructor.
  delete fSysLogger;
  fSysLogger = NULL;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Log a message into the DML recovery log.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::writeLog(TxnID txnId, string backUpFileType, string filename, string& aDMLLogFileName,
                           int64_t size, int64_t offset) const
{
  // The log file name is looked up for txnId and handed back through aDMLLogFileName.
  if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
    return ERR_DML_LOG_NAME;

  // Shared formatter for the two "cannot open" failure paths below.
  const auto reportOpenFailure = [&](const string& reason)
  {
    ostringstream oss;
    oss << "trans " << txnId << ":File " << aDMLLogFileName << reason;
    logMessage(oss.str(), logging::LOG_TYPE_ERROR);
  };

  // Open the log for appending; the scoped_ptr releases the file object on every exit path.
  boost::scoped_ptr<IDBDataFile> logFile;

  try
  {
    logFile.reset(IDBDataFile::open(IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
                                    aDMLLogFileName.c_str(), "a+b", 0));

    if (!logFile)
    {
      reportOpenFailure(" can't be opened (no exception thrown)");
      return ERR_OPEN_DML_LOG;
    }
  }
  catch (exception& e)
  {
    reportOpenFailure(string(" can't be opened: ") + e.what());
    return ERR_OPEN_DML_LOG;
  }

  // One record is four newline-terminated fields: type, file name, size, offset.
  ostringstream record;
  record << backUpFileType << '\n' << filename << '\n' << size << '\n' << offset << '\n';
  const string payload = record.str();

  // Position at end of file, then append the record.
  logFile->seek(0, SEEK_END);
  logFile->tell();
  logFile->write(payload.c_str(), payload.size());

  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
// Remove the backup files recorded in the DML recovery log of transaction
// txnId, then remove the log itself.
//
// Each log record consists of four whitespace-separated fields
// (type, file name, size, offset). The type selects the suffix of the backup
// file to delete: "tmp" -> .tmp, "chk" -> .chk, anything else -> .hdr.
//
// Returns NO_ERROR when there is nothing to do (HDFS, bulk load, or no log
// file), ERR_DML_LOG_NAME / ERR_OPEN_DML_LOG / ERR_FILE_READ on failure.
//------------------------------------------------------------------------------
int ChunkManager::removeBackups(TxnID txnId)
{
  // HDFS update/delete is handled differently
  if (fIsHdfs || fIsBulkLoad)
    return NO_ERROR;

  string aDMLLogFileName;

  if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
    return ERR_DML_LOG_NAME;

  // No log file means no backups were recorded for this transaction.
  if (!IDBPolicy::exists(aDMLLogFileName.c_str()))
    return NO_ERROR;

  boost::scoped_ptr<IDBDataFile> aDMLLogFile(IDBDataFile::open(
      IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG), aDMLLogFileName.c_str(), "r", 0));

  if (!aDMLLogFile)
    return ERR_OPEN_DML_LOG;

  const ssize_t fileSize = aDMLLogFile->size();

  // A negative size signals a failed size query; it must not reach the
  // array allocation below, where it would become a huge unsigned length.
  if (fileSize < 0)
    return ERR_FILE_READ;

  // Slurp the whole log into memory.
  boost::scoped_array<char> buf(new char[fileSize]);

  if (aDMLLogFile->read(buf.get(), fileSize) != fileSize)
    return ERR_FILE_READ;

  const string contents(buf.get(), fileSize);
  std::istringstream records(contents);
  std::string backUpFileType;
  std::string filename;
  int64_t size;
  int64_t offset;

  while (records >> backUpFileType >> filename >> size >> offset)
  {
    // Pick the backup suffix from the record type; ".hdr" is the fallback.
    const char* suffix = ".hdr";

    if (backUpFileType.compare("tmp") == 0)
      suffix = ".tmp";
    else if (backUpFileType.compare("chk") == 0)
      suffix = ".chk";

    const std::string backFileName = filename + suffix;
    IDBPolicy::remove(backFileName.c_str());
  }

  aDMLLogFile.reset();  // closes the file in IDBDataFile destructor.

  IDBPolicy::remove(aDMLLogFileName.c_str());

  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get/Return IDBDataFile* for specified OID, root, partition, and segment.
|
|
// Function is to be used to open column files.
|
|
// If the IDBDataFile* is not found, then a segment file will be opened using the
|
|
// mode (mode) and I/O buffer size (size) that is given. Name of the resulting
|
|
// file is returned in filename.
|
|
//
|
|
// For Bulk HDFS usage:
|
|
// If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output.
|
|
//------------------------------------------------------------------------------
|
|
// @bug 5572 - HDFS usage: add *.tmp file backup flag
|
|
IDBDataFile* ChunkManager::getFilePtr(const Column& column, uint16_t root, uint32_t partition,
                                      uint16_t segment, string& filename, const char* mode, int size,
                                      bool useTmpSuffix, bool isReadOnly) const
{
  // Column overload: type and width come from the column; the dictionary flag is false.
  CompFileData* compFile =
      getFileData(column.dataFile.fid, root, partition, segment, filename, mode, size, column.colDataType,
                  column.colWidth, useTmpSuffix, /*dctnry=*/false, isReadOnly);

  if (compFile == NULL)
    return NULL;

  return compFile->fFilePtr;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get/Return IDBDataFile* for specified OID, root, partition, and segment.
|
|
// Function is to be used to open dictionary store files.
|
|
// If the IDBDataFile* is not found, then a segment file will be opened using the
|
|
// mode (mode) and I/O buffer size (size) that is given. Name of the resulting
|
|
// file is returned in filename.
|
|
//
|
|
// For Bulk HDFS usage:
|
|
// If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output.
|
|
//------------------------------------------------------------------------------
|
|
// @bug 5572 - HDFS usage: add *.tmp file backup flag
|
|
IDBDataFile* ChunkManager::getFilePtr(const FID& fid, uint16_t root, uint32_t partition, uint16_t segment,
                                      string& filename, const char* mode, int size,
                                      bool useTmpSuffix) const
{
  // Dictionary overload: (VARCHAR, 8) are hard-coded dummy values for a
  // dictionary file, and the dictionary flag is true.
  CompFileData* compFile = getFileData(fid, root, partition, segment, filename, mode, size,
                                       CalpontSystemCatalog::VARCHAR, 8, useTmpSuffix, /*dctnry=*/true);

  if (compFile == NULL)
    return NULL;

  return compFile->fFilePtr;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get/Return IDBDataFile* by the given `filename`.
|
|
// OID, partition and segment are needed for a file cache.
|
|
// Function is to be used to open column/dict segment file.
|
|
// If the IDBDataFile* is not found, then a segment file will be opened using
|
|
// the mode (mode) and I/O buffer size (size) that is given.
|
|
//------------------------------------------------------------------------------
|
|
IDBDataFile* ChunkManager::getFilePtrByName(const std::string& filename, FID& fid, uint16_t root,
                                            uint32_t partition, uint16_t segment,
                                            execplan::CalpontSystemCatalog::ColDataType colDataType,
                                            uint32_t colWidth, const char* mode, int32_t size,
                                            bool useTmpSuffix, bool isDict) const
{
  // Delegate lookup/creation to getFileDataByName() and expose only the file pointer.
  CompFileData* compFile = getFileDataByName(filename, fid, root, partition, segment, mode, size,
                                             colDataType, colWidth, useTmpSuffix, isDict);

  if (compFile == NULL)
    return NULL;

  return compFile->fFilePtr;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get/Return CompFileData* for specified column OID, root, partition, and
|
|
// segment. If the IDBDataFile* is not found, then a segment file will be opened
|
|
// using the mode (mode) and I/O buffer size (size) that is given. Name of
|
|
// the resulting file is returned in filename.
|
|
// If the CompFileData* needs to be created, it will also be created and
|
|
// inserted into the fFileMap and fFilePtrMap for later use.
|
|
//
|
|
// For Bulk HDFS usage:
|
|
// If useTmpSuffix flag is set, then IDBDataFile will use *.tmp for output.
|
|
//------------------------------------------------------------------------------
|
|
// @bug 5572 - HDFS usage: add *.tmp file backup flag
|
|
CompFileData* ChunkManager::getFileData(const FID& fid, uint16_t root, uint32_t partition, uint16_t segment,
                                        string& filename, const char* mode, int size,
                                        const CalpontSystemCatalog::ColDataType colDataType, int colWidth,
                                        bool useTmpSuffix, bool dctnry, bool isReadOnly) const
{
  FileID fileID(fid, root, partition, segment);
  const map<FileID, CompFileData*>::const_iterator cached = fFileMap.find(fileID);
  const bool isCached = (cached != fFileMap.end());

  WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition << " seg:"
                   << segment << " file* " << (isCached ? "" : "not ") << "found." << endl;)

  if (!isCached)
  {
    // Not cached yet: resolve the segment file name, then open and register it.
    char name[FILE_NAME_SIZE];

    if (fFileOp->getFileName(fid, name, root, partition, segment) != NO_ERROR)
      return NULL;

    filename = name;
    return getFileData_(fileID, filename, mode, size, colDataType, colWidth, useTmpSuffix, dctnry,
                        isReadOnly);
  }

  // Already open: report the cached file name and hand back the existing entry.
  CompFileData* existing = cached->second;
  filename = existing->fFileName;
  return existing;
}
|
|
|
|
CompFileData* ChunkManager::getFileDataByName(const std::string& filename, const FID& fid, uint16_t root,
                                              uint32_t partition, uint16_t segment, const char* mode,
                                              int size, const CalpontSystemCatalog::ColDataType colDataType,
                                              int colWidth, bool useTmpSuffix, bool dctnry) const
{
  FileID fileID(fid, root, partition, segment);
  const map<FileID, CompFileData*>::const_iterator cached = fFileMap.find(fileID);
  const bool isCached = (cached != fFileMap.end());

  WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition << " seg:"
                   << segment << " file* " << (isCached ? "" : "not ") << "found." << endl;)

  // Cache miss: open the file under the caller-supplied name and register it.
  if (!isCached)
    return getFileData_(fileID, filename, mode, size, colDataType, colWidth, useTmpSuffix, dctnry);

  // Cache hit: reuse the entry already held for this (fid, root, partition, segment).
  return cached->second;
}
|
|
|
|
|
|
//------------------------------------------------------------------------------
// Create a CompFileData for fileID, open `filename` with the given mode, read
// and validate the compression header (control section + pointer section),
// and register the result in fFileMap and fFilePtrMap.
//
// Returns the new CompFileData, or NULL if the file cannot be opened, the
// header cannot be read, or the header fails verification. On every failure
// path the partially built CompFileData is deleted.
//------------------------------------------------------------------------------
CompFileData* ChunkManager::getFileData_(const FileID& fileID, const string& filename, const char* mode,
                                         int size, const CalpontSystemCatalog::ColDataType colDataType,
                                         int colWidth, bool useTmpSuffix, bool dctnry, bool isReadOnly) const
{
  CompFileData* fileData = new CompFileData(fileID, fileID.fFid, colDataType, colWidth, isReadOnly);
  fileData->fFileName = filename;

  if (openFile(fileData, mode, colWidth, useTmpSuffix, __LINE__) != NO_ERROR)
  {
    WE_COMP_DBG(cout << "Failed to open " << fileData->fFileName << " ." << endl;)
    delete fileData;
    return NULL;
  }

  fileData->fDctnryCol = dctnry;
  // The trace previously streamed `name`, which is not declared in this
  // function and broke the build whenever WE_COMP_DBG was enabled.
  WE_COMP_DBG(cout << "open file* " << filename << endl;)

  // get the control data in header.
  if (readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData,
               COMPRESSED_FILE_HEADER_UNIT, __LINE__) != NO_ERROR)
  {
    WE_COMP_DBG(cout << "Failed to read control header." << endl;)
    delete fileData;
    return NULL;
  }

  // make sure the header is valid
  if (compress::CompressInterface::verifyHdr(fileData->fFileHeader.fControlData) != 0)
  {
    WE_COMP_DBG(cout << "Invalid header." << endl;)
    delete fileData;
    return NULL;
  }

  // The pointer section is whatever follows the first header unit.
  const int32_t headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData);
  const int32_t ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;

  // Save segment file compression type.
  fileData->fCompressionType =
      compress::CompressInterface::getCompressionType(fileData->fFileHeader.fControlData);

  if (ptrSecSize > COMPRESSED_FILE_HEADER_UNIT)
  {
    // >8K header, dictionary width > 128
    // The larger buffer is handed to fLongPtrSectData, which owns it from here on.
    fileData->fFileHeader.fPtrSection = new char[ptrSecSize];
    fileData->fFileHeader.fLongPtrSectData.reset(fileData->fFileHeader.fPtrSection);
  }

  // read in the pointer section in header
  if (readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fPtrSection, ptrSecSize,
               __LINE__) != NO_ERROR)
  {
    WE_COMP_DBG(cout << "Failed to read pointer header." << endl;)
    delete fileData;
    return NULL;
  }

  // Register under both keys so the entry can be found by file id and by file pointer.
  fFileMap.insert(make_pair(fileID, fileData));
  fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData));
  return fileData;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Return new IDBDataFile* for specified dictionary OID, root, partition, segment, and
|
|
// width. A new segment file will be opened using the mode (mode) and I/O
|
|
// buffer size (size) that is given. Name of the resulting file is returned
|
|
// in filename.
|
|
// A corresponding CompFileData* is created and inserted into fFileMap and
|
|
// fFilePtrMap for later use.
|
|
//------------------------------------------------------------------------------
|
|
IDBDataFile* ChunkManager::createDctnryFile(const FID& fid, int64_t width, uint16_t root, uint32_t partition,
                                            uint16_t segment, const char* filename, const char* mode,
                                            int size, BRM::LBID_t lbid)
{
  FileID fileID(fid, root, partition, segment);
  CompFileData* newFile = new CompFileData(fileID, fid, CalpontSystemCatalog::VARCHAR, width);
  newFile->fFileName = filename;

  // @bug 5572 HDFS tmp file: the tmp-suffix flag is always false here.
  const int openRc = openFile(newFile, mode, width, false, __LINE__);

  if (openRc != NO_ERROR)
  {
    WE_COMP_DBG(cout << "Failed to open " << newFile->fFileName << " ." << endl;)
    delete newFile;
    return NULL;
  }

  newFile->fDctnryCol = true;
  WE_COMP_DBG(cout << "create file* " << filename << endl;)

  // Header size depends on the dictionary width; everything past the first
  // header unit is the pointer section.
  int hdrSize = calculateHeaderSize(width);
  int ptrSecSize = hdrSize - COMPRESSED_FILE_HEADER_UNIT;
  auto& header = newFile->fFileHeader;

  if (ptrSecSize > COMPRESSED_FILE_HEADER_UNIT)
  {
    // >8K header, dictionary width > 128
    // The larger buffer is handed to fLongPtrSectData, which owns it from here on.
    header.fPtrSection = new char[ptrSecSize];
    header.fLongPtrSectData.reset(header.fPtrSection);
  }

  // Dictionary store extent width == 0. See more details in function
  // `createDictStoreExtent`.
  compress::CompressInterface::initHdr(header.fControlData, header.fPtrSection,
                                       /*colWidth=*/0, newFile->fColDataType, fFileOp->compressionType(),
                                       hdrSize);
  compress::CompressInterface::setLBIDByIndex(header.fControlData, lbid, 0);

  // Remember the compression type used for this segment file.
  newFile->fCompressionType = fFileOp->compressionType();

  if (writeHeader(newFile, __LINE__) != NO_ERROR)
  {
    WE_COMP_DBG(cout << "Failed to write header." << endl;)
    delete newFile;
    return NULL;
  }

  //@Bug 4977 remove log file
  removeBackups(fTransId);

  // Register under both keys so the entry can be found by file id and by file pointer.
  fFileMap.insert(make_pair(fileID, newFile));
  fFilePtrMap.insert(make_pair(newFile->fFilePtr, newFile));
  return newFile->fFilePtr;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Read the block for the specified fbo, from pFile's applicable chunk, and
|
|
// into readBuf.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::readBlock(IDBDataFile* pFile, unsigned char* readBuf, uint64_t fbo)
{
  const auto fileIt = fFilePtrMap.find(pFile);

  // The file must already be registered with this manager.
  if (fileIt == fFilePtrMap.end())
  {
    logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
    return ERR_COMP_FILE_NOT_FOUND;
  }

  // quot = chunk id, rem = byte offset of the block inside that chunk.
  lldiv_t pos = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
  ChunkData* chunk = (fileIt->second)->findChunk(pos.quot);

  WE_COMP_DBG(cout << "fbo:" << fbo << " chunk id:" << pos.quot << " offset:" << pos.rem
                   << " chunkData*:" << chunk << endl;)

  // The chunk is not held uncompressed yet: load it from the file.
  if (chunk == NULL)
  {
    const int rc = fetchChunkFromFile(pFile, pos.quot, chunk);

    if (rc != NO_ERROR)
      return rc;
  }

  // Copy the one block at fbo out of the uncompressed chunk.
  memcpy(readBuf, chunk->fBufUnCompressed + pos.rem, BYTE_PER_BLOCK);
  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Write writeBuf to the block for the specified fbo, within pFile's applicable
|
|
// chunk.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::saveBlock(IDBDataFile* pFile, const unsigned char* writeBuf, uint64_t fbo)
{
  WE_COMP_DBG(cout << "save block fbo:" << fbo << endl;)
  const auto fileIt = fFilePtrMap.find(pFile);

  // The file must already be registered with this manager.
  if (fileIt == fFilePtrMap.end())
  {
    logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
    return ERR_COMP_FILE_NOT_FOUND;
  }

  CompFileData* compFile = fileIt->second;

  // quot = chunk id, rem = byte offset of the block inside that chunk.
  lldiv_t pos = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
  ChunkData* chunk = compFile->findChunk(pos.quot);

  // The chunk is not in memory yet: read it in first.
  if (chunk == NULL)
  {
    const int fetchRc = fetchChunkFromFile(pFile, pos.quot, chunk);

    if (fetchRc != NO_ERROR)
      return fetchRc;
  }

  WE_COMP_DBG(cout << "fbo:" << fbo << " chunk id:" << pos.quot << " offset:" << pos.rem << " saved @"
                   << (&(chunk->fBufUnCompressed) + pos.rem) << endl;)

  // Overwrite the block in the uncompressed chunk and flag the chunk as dirty.
  memcpy(chunk->fBufUnCompressed + pos.rem, writeBuf, BYTE_PER_BLOCK);
  chunk->fWriteToFile = true;

  // Only an insert that has just filled the chunk's last block flushes right away.
  if (!fIsInsert || (pos.rem != MAXOFFSET_PER_CHUNK))
    return NO_ERROR;

  int rc = writeChunkToFile(compFile, chunk);

  if (rc == NO_ERROR)
    rc = writeHeader(compFile, __LINE__);

  if (rc == NO_ERROR)
  {
    pFile->flush();

    //@Bug 4977 remove log file
    removeBackups(fTransId);
  }

  return rc;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Flush all pending chunks to their corresponding segment files.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::flushChunks(int rc, const std::map<FID, FID>& columOids)
|
|
{
|
|
// shall fail the the statement if failed here
|
|
WE_COMP_DBG(cout << "flushChunks." << endl;)
|
|
|
|
int k = fFilePtrMap.size();
|
|
std::map<FID, FID>::const_iterator it;
|
|
|
|
if ((rc == NO_ERROR) && fIsInsert)
|
|
{
|
|
while (k-- > 0 && rc == NO_ERROR)
|
|
{
|
|
map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.begin();
|
|
// sort the chunk list first
|
|
CompFileData* fileData = i->second;
|
|
it = columOids.find(fileData->fFid);
|
|
|
|
if (it != columOids.end())
|
|
{
|
|
list<ChunkData*>& chunkList = fileData->fChunkList;
|
|
chunkList.sort(chunkDataPtrLessCompare);
|
|
list<ChunkData*>::iterator j = chunkList.begin();
|
|
|
|
while (j != chunkList.end())
|
|
{
|
|
if ((rc = writeChunkToFile(fileData, *j)) != NO_ERROR)
|
|
break;
|
|
|
|
// write chunk to file removes the written chunk from the list
|
|
j = chunkList.begin();
|
|
}
|
|
|
|
if (rc != NO_ERROR)
|
|
break;
|
|
|
|
// finally update the header
|
|
if ((rc = writeHeader(fileData, __LINE__)) != NO_ERROR)
|
|
break;
|
|
|
|
//@Bug 4977 remove log file
|
|
removeBackups(fTransId);
|
|
|
|
// closeFile invalidates the iterator
|
|
closeFile(fileData);
|
|
}
|
|
}
|
|
}
|
|
else if (rc == NO_ERROR)
|
|
{
|
|
while (k-- > 0 && rc == NO_ERROR)
|
|
{
|
|
map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.begin();
|
|
// sort the chunk list first
|
|
CompFileData* fileData = i->second;
|
|
|
|
list<ChunkData*>& chunkList = fileData->fChunkList;
|
|
chunkList.sort(chunkDataPtrLessCompare);
|
|
list<ChunkData*>::iterator j = chunkList.begin();
|
|
|
|
while (j != chunkList.end())
|
|
{
|
|
if ((rc = writeChunkToFile(fileData, *j)) != NO_ERROR)
|
|
break;
|
|
|
|
// write chunk to file removes the written chunk from the list
|
|
j = chunkList.begin();
|
|
}
|
|
|
|
if (rc != NO_ERROR)
|
|
break;
|
|
|
|
// finally update the header
|
|
if (!fileData->fReadOnly && (rc = writeHeader(fileData, __LINE__)) != NO_ERROR)
|
|
break;
|
|
|
|
//@Bug 4977 remove log file
|
|
removeBackups(fTransId);
|
|
|
|
// closeFile invalidates the iterator
|
|
closeFile(fileData);
|
|
}
|
|
}
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
cleanUp(columOids);
|
|
return rc;
|
|
}
|
|
|
|
// fActiveChunks.clear();
|
|
// fFileMap.clear();
|
|
// fFilePtrMap.clear();
|
|
|
|
if (fDropFdCache)
|
|
{
|
|
cacheutils::dropPrimProcFdCache();
|
|
fDropFdCache = false;
|
|
}
|
|
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Load and uncompress the requested chunk (id) for the specified file (pFile),
|
|
// into chunkData.
|
|
// id is: (fbo*BYTE_PER_BLOCK)/UNCOMPRESSED_CHUNK_SIZE
|
|
// If the active chunk list is already full, then we flush the oldest pending
|
|
// chunk to disk, to make room for fetching the requested chunk.
|
|
// If the header ptr for the requested chunk is 0 (or has length 0), then
|
|
// chunkData is initialized with a new empty chunk.
|
|
//------------------------------------------------------------------------------
|
|
//------------------------------------------------------------------------------
// Load chunk `id` of the file registered under pFile into a newly allocated
// ChunkData, returned through chunkData.
//
// Steps:
//  1. If the active chunk list is full, write the oldest eligible chunk (and,
//     for writable files, the header) back to disk.
//  2. Allocate the ChunkData and append it to the file's chunk list and to
//     the active chunk list.
//  3. If the header pointers describe compressed data for this chunk, read and
//     uncompress it; otherwise initialize the buffer with empty values.
//
// Returns NO_ERROR on success, otherwise the error code of the failing step.
// NOTE(review): on an error after step 2 the new chunk stays in both lists.
//------------------------------------------------------------------------------
int ChunkManager::fetchChunkFromFile(IDBDataFile* pFile, int64_t id, ChunkData*& chunkData)
{
  // return value
  int rc = NO_ERROR;

  // remove the oldest one if the max active chunk number is reached.
  WE_COMP_DBG(cout << "fActiveChunks.size:" << fActiveChunks.size() << endl;)
  map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);

  if (fpIt == fFilePtrMap.end())
  {
    logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
    return ERR_COMP_FILE_NOT_FOUND;
  }

  CompFileData* fileData = fpIt->second;

  if (fActiveChunks.size() >= fMaxActiveChunkNum)
  {
    list<std::pair<FileID, ChunkData*> >::iterator lIt = fActiveChunks.begin();

    if (!fIsBulkLoad && !(fileData->fDctnryCol))
    {
      // Skip the leading chunks that belong to the file being fetched for.
      // The end() test must come first: previously the iterator was
      // dereferenced before it was compared against end(), which is undefined
      // behaviour once every active chunk belongs to this file.
      while ((lIt != fActiveChunks.end()) && (lIt->first == fileData->fFileID))
        ++lIt;
    }

    if (lIt != fActiveChunks.end())
    {
      map<FileID, CompFileData*>::iterator fIt = fFileMap.find(lIt->first);

      if (fIt == fFileMap.end())
      {
        logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
        return ERR_COMP_FILE_NOT_FOUND;
      }

      if ((rc = writeChunkToFile(fIt->second, lIt->second)) != NO_ERROR)
      {
        ostringstream oss;
        oss << "write inactive chunk to file failed:" << fIt->second->fFileName << "@" << __LINE__;
        logMessage(oss.str(), logging::LOG_TYPE_ERROR);
        return rc;
      }

      if (!fIt->second->fReadOnly && (rc = writeHeader(fIt->second, __LINE__)) != NO_ERROR)
      {
        // logged by writeHeader
        return rc;
      }

      //@Bug 4977 remove the log files
      removeBackups(fTransId);
    }
  }

#ifdef PROFILE
  Stats::startParseEvent(WE_STATS_COMPRESS_DCT_INIT_BUF);
#endif
  // get a new ChunkData object
  chunkData = new ChunkData(id);
  pFile = fileData->fFilePtr;  // update to get the reopened file ptr.
  fileData->fChunkList.push_back(chunkData);
  fActiveChunks.push_back(make_pair(fileData->fFileID, chunkData));

  // The pointer section is read as an array of 64-bit file offsets, one per
  // chunk boundary: chunk `id` spans [ptrs[id], ptrs[id + 1]).
  uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);

  if (ptrs[id] && ptrs[id + 1])  // compressed chunk data exists
  {
    // safety check
    if (ptrs[id] >= ptrs[id + 1])
    {
      logMessage(ERR_COMP_WRONG_PTR, logging::LOG_TYPE_ERROR, __LINE__);
      return ERR_COMP_WRONG_PTR;
    }

    // Validate the span in 64 bits before narrowing: a span larger than the
    // shared compression buffer would overflow fBufCompressed in the read
    // below (and could silently wrap when stored in an unsigned int).
    const uint64_t chunkSpan = ptrs[id + 1] - ptrs[id];

    if (chunkSpan > static_cast<uint64_t>(fMaxCompressedBufSize))
    {
      logMessage(ERR_COMP_WRONG_PTR, logging::LOG_TYPE_ERROR, __LINE__);
      return ERR_COMP_WRONG_PTR;
    }

    unsigned int chunkSize = static_cast<unsigned int>(chunkSpan);

    if ((rc = setFileOffset(pFile, fileData->fFileName, ptrs[id], __LINE__)) != NO_ERROR ||
        (rc = readFile(pFile, fileData->fFileName, fBufCompressed, chunkSize, __LINE__)) != NO_ERROR)
    {
      // logged by setFileOffset/readFile
      return rc;
    }

    // uncompress the read in buffer
    size_t dataLen = sizeof(chunkData->fBufUnCompressed);

    auto fCompressor = compress::getCompressorByType(fCompressorPool, fileData->fCompressionType);
    if (!fCompressor)
    {
      return ERR_COMP_WRONG_COMP_TYPE;
    }

    if (fCompressor->uncompressBlock((char*)fBufCompressed, chunkSize,
                                     (unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0)
    {
      if (fIsFix)
      {
        // Fix mode: instead of failing, replace the unreadable chunk with
        // empty values. The length is 512 blocks, or 256 for chunk 0 of a
        // file whose header reports fewer than 512 blocks.
        uint64_t blocks = 512;

        if (id == 0)
        {
          char* hdr = fileData->fFileHeader.fControlData;

          if (compress::CompressInterface::getBlockCount(hdr) < 512)
            blocks = 256;
        }

        dataLen = 8192 * blocks;

        // load the uncompressed buffer with empty values.
        char* buf = chunkData->fBufUnCompressed;
        chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;

        if (fileData->fDctnryCol)
          initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE);
        else
          initializeColumnChunk(buf, fileData);
      }
      else
      {
        logMessage(ERR_COMP_UNCOMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
        return ERR_COMP_UNCOMPRESS;
      }
    }

    //@bug 3313-Remove validation that incorrectly fails for long string store files
    // WE_COMP_DBG(cout << "chunk uncompressed to " << dataLen << endl;)
    //      if (dataLen < (id+1) * BYTE_PER_BLOCK)
    //      {
    //          logMessage(ERR_COMP_UNCOMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
    //          return ERR_COMP_UNCOMPRESS;
    //      }

    // dataLen is either the uncompressed length or the fix-mode length set above.
    chunkData->fLenUnCompressed = dataLen;
  }
  else  // new chunk
  {
    if (id == 0 && ptrs[id] == 0)  // if the 1st ptr is not set for new extent
    {
      // The first chunk starts right after the header.
      ptrs[0] = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData);
    }

    // load the uncompressed buffer with empty values.
    char* buf = chunkData->fBufUnCompressed;
    chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;

    if (fileData->fDctnryCol)
      initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE);
    else
      initializeColumnChunk(buf, fileData);
  }

#ifdef PROFILE
  Stats::stopParseEvent(WE_STATS_COMPRESS_DCT_INIT_BUF);
#endif

  return NO_ERROR;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Initialize a column based chunk with the applicable empty values.
|
|
//------------------------------------------------------------------------------
|
|
void ChunkManager::initializeColumnChunk(char* buf, CompFileData* fileData)
|
|
{
|
|
int size = UNCOMPRESSED_CHUNK_SIZE;
|
|
const uint8_t* emptyVal = fFileOp->getEmptyRowValue(fileData->fColDataType, fileData->fColWidth);
|
|
fFileOp->setEmptyBuf((unsigned char*)buf, size, emptyVal, fileData->fColWidth);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Initialize a dictionary based chunk with empty blocks.
|
|
//------------------------------------------------------------------------------
|
|
void ChunkManager::initializeDctnryChunk(char* buf, int size)
|
|
{
|
|
Dctnry* dctnry = dynamic_cast<Dctnry*>(fFileOp);
|
|
memset(buf, 0, size);
|
|
char* end = buf + size;
|
|
|
|
while (buf < end)
|
|
{
|
|
dctnry->copyDctnryHeader(buf);
|
|
buf += BYTE_PER_BLOCK;
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Compress and write the requested chunk (id) for fileData, to disk.
|
|
// id is: (fbo*BYTE_PER_BLOCK)/UNCOMPRESSED_CHUNK_SIZE
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::writeChunkToFile(CompFileData* fileData, int64_t id)
{
  // Resolve the id to the chunk held for this file and delegate to the
  // ChunkData* overload.
  if (ChunkData* chunk = fileData->findChunk(id))
    return writeChunkToFile(fileData, chunk);

  // The file holds no chunk with this id.
  logMessage(ERR_COMP_CHUNK_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
  return ERR_COMP_CHUNK_NOT_FOUND;
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Compress and write the given chunk for fileData to disk.
|
|
// If the chunk has been flagged for writing (fWriteToFile is true),
|
|
// then subsequent chunks in the file are shifted down as needed, if the com-
|
|
// pressed chunk will not fit in the currently available embedded free space.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::writeChunkToFile(CompFileData* fileData, ChunkData* chunkData)
|
|
{
|
|
WE_COMP_DBG(cout << "write chunk id=" << chunkData->fChunkId << " data "
|
|
<< ((chunkData->fWriteToFile) ? "changed" : "NOT changed") << endl;)
|
|
|
|
int rc = NO_ERROR; // return value
|
|
bool needReallocateChunks = false;
|
|
int64_t spaceAvl = 0;
|
|
|
|
if (chunkData->fWriteToFile)
|
|
{
|
|
#ifdef PROFILE
|
|
Stats::startParseEvent(WE_STATS_COMPRESS_DCT_COMPRESS);
|
|
#endif
|
|
// compress the chunk before writing it to file
|
|
fLenCompressed = fMaxCompressedBufSize;
|
|
|
|
auto fCompressor = compress::getCompressorByType(fCompressorPool, fileData->fCompressionType);
|
|
if (!fCompressor)
|
|
{
|
|
return ERR_COMP_WRONG_COMP_TYPE;
|
|
}
|
|
|
|
if (fCompressor->compressBlock((char*)chunkData->fBufUnCompressed, chunkData->fLenUnCompressed,
|
|
(unsigned char*)fBufCompressed, fLenCompressed) != 0)
|
|
{
|
|
logMessage(ERR_COMP_COMPRESS, logging::LOG_TYPE_ERROR, __LINE__);
|
|
return ERR_COMP_COMPRESS;
|
|
}
|
|
|
|
WE_COMP_DBG(cout << "Chunk compressed from " << chunkData->fLenUnCompressed << " to " << fLenCompressed;)
|
|
|
|
// Removed padding code here, will add padding for the last chunk.
|
|
// The existing chunks are already correctly aligned, use the padding to absort chunk
|
|
// size increase when update. This improves the performance with less chunk shifting.
|
|
|
|
#ifdef PROFILE
|
|
Stats::stopParseEvent(WE_STATS_COMPRESS_DCT_COMPRESS);
|
|
#endif
|
|
|
|
// need more work if the new compressed buffer is larger
|
|
uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);
|
|
ChunkId chunkId = chunkData->fChunkId;
|
|
|
|
if (ptrs[chunkId + 1] > 0)
|
|
spaceAvl = (ptrs[chunkId + 1] - ptrs[chunkId]);
|
|
|
|
WE_COMP_DBG(cout << ", available space:" << spaceAvl;)
|
|
|
|
bool lastChunk = true;
|
|
// usable chunkIds are 0 .. POINTERS_IN_HEADER-2
|
|
// [chunkId+0] is the start offset of current chunk.
|
|
// [chunkId+1] is the start offset of next chunk, the offset diff is current chunk size.
|
|
// [chunkId+2] is 0 or not indicates if the next chunk exists.
|
|
int headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData);
|
|
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
|
|
int64_t usablePtrIds = (ptrSecSize / sizeof(uint64_t)) - 2;
|
|
|
|
if (chunkId < usablePtrIds) // make sure [chunkId+2] has valid value
|
|
lastChunk = (ptrs[(chunkId + 2)] == 0);
|
|
|
|
WE_COMP_DBG(cout << ", last chunk:" << (lastChunk ? "true" : "false") << endl;)
|
|
|
|
if (spaceAvl < 0)
|
|
{
|
|
logMessage(ERR_COMP_WRONG_PTR, logging::LOG_TYPE_ERROR, __LINE__);
|
|
return ERR_COMP_WRONG_PTR;
|
|
}
|
|
|
|
if ((int64_t)fLenCompressed <= spaceAvl)
|
|
{
|
|
// There is enough sapce.
|
|
if ((rc = writeCompressedChunk(fileData, ptrs[chunkId], spaceAvl)) != NO_ERROR)
|
|
{
|
|
// log in writeCompressedChunk by setFileOffset and writeFile
|
|
return rc;
|
|
}
|
|
}
|
|
else if (lastChunk)
|
|
{
|
|
// add padding space if the chunk is written first time
|
|
if (fCompressor->padCompressedChunks((unsigned char*)fBufCompressed, fLenCompressed,
|
|
fMaxCompressedBufSize) != 0)
|
|
{
|
|
WE_COMP_DBG(cout << "Last chunk:" << chunkId << ", padding failed." << endl;)
|
|
|
|
logMessage(ERR_COMP_PAD_DATA, logging::LOG_TYPE_ERROR, __LINE__);
|
|
return ERR_COMP_PAD_DATA;
|
|
}
|
|
|
|
WE_COMP_DBG(cout << "Last chunk:" << chunkId << ", padded to " << fLenCompressed;)
|
|
|
|
// This is the last chunk, safe to write any length of data.
|
|
//@Bug 3888. Assign the error code
|
|
if ((rc = writeCompressedChunk(fileData, ptrs[chunkId], spaceAvl)) != NO_ERROR)
|
|
{
|
|
// log in writeCompressedChunk by setFileOffset and writeFile
|
|
return rc;
|
|
}
|
|
|
|
// Update the current chunk size.
|
|
ptrs[chunkId + 1] = ptrs[chunkId] + fLenCompressed;
|
|
}
|
|
else
|
|
{
|
|
needReallocateChunks = true;
|
|
}
|
|
}
|
|
|
|
if (!needReallocateChunks)
|
|
{
|
|
fActiveChunks.remove(make_pair(fileData->fFileID, chunkData));
|
|
fileData->fChunkList.remove(chunkData);
|
|
delete chunkData;
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << "Compressed data does not fit, caused a chunk shifting @line:" << __LINE__
|
|
<< " filename:" << fileData->fFileName << ", chunkId:" << chunkData->fChunkId
|
|
<< " data size:" << fLenCompressed << "/available:" << spaceAvl << " -- shifting ";
|
|
|
|
if ((rc = reallocateChunks(fileData)) == NO_ERROR)
|
|
{
|
|
oss << "SUCCESS";
|
|
logMessage(oss.str(), logging::LOG_TYPE_INFO);
|
|
}
|
|
else
|
|
{
|
|
oss << "FAILED";
|
|
logMessage(oss.str(), logging::LOG_TYPE_CRITICAL);
|
|
}
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Write the current compressed data in fBufCompressed to the specified segment
|
|
// file offset (offset) and file (fileData). For DML usage, "size" specifies
|
|
// how many bytes to backup, for error recovery. cpimport.bin does it's own
|
|
// backup and error recovery, so "size" is not applicable for bulk import usage.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::writeCompressedChunk(CompFileData* fileData, int64_t offset, int64_t size)
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
if (!fIsBulkLoad && !fIsHdfs)
|
|
{
|
|
// backup current chunk to chk file
|
|
string chkFileName(fileData->fFileName + ".chk");
|
|
string aDMLLogFileName;
|
|
unsigned char* buf = new unsigned char[size];
|
|
|
|
if (((rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, offset, __LINE__)) == NO_ERROR) &&
|
|
((rc = readFile(fileData->fFilePtr, fileData->fFileName, buf, size, __LINE__)) == NO_ERROR))
|
|
{
|
|
IDBDataFile* chkFilePtr = IDBDataFile::open(
|
|
IDBPolicy::getType(chkFileName.c_str(), IDBPolicy::WRITEENG), chkFileName.c_str(), "w+b", 0);
|
|
|
|
if (chkFilePtr)
|
|
{
|
|
rc = writeFile(chkFilePtr, chkFileName, buf, size, __LINE__);
|
|
delete chkFilePtr;
|
|
}
|
|
|
|
delete[] buf;
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
IDBPolicy::remove(chkFileName.c_str());
|
|
return rc;
|
|
}
|
|
|
|
// log the chunk information for recovery
|
|
rc = writeLog(fTransId, "chk", fileData->fFileName, aDMLLogFileName, size, offset);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "log " << fileData->fFileName << ".chk to DML logfile failed.";
|
|
logMessage(oss.str(), logging::LOG_TYPE_INFO);
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
// write out the compressed data + padding
|
|
if ((rc == NO_ERROR) && ((rc = writeCompressedChunk_(fileData, offset)) == NO_ERROR))
|
|
{
|
|
if ((fileData->fFilePtr)->flush() != 0) //@Bug3162.
|
|
{
|
|
rc = ERR_FILE_WRITE;
|
|
ostringstream oss;
|
|
oss << "Failed to flush " << fileData->fFileName << " @line: " << __LINE__;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// write out the compressed data + padding
|
|
rc = writeCompressedChunk_(fileData, offset);
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Actually write the current compressed data in fBufCompressed to the specified
|
|
// segment file offset (offset) and file (fileData)
|
|
//------------------------------------------------------------------------------
|
|
inline int ChunkManager::writeCompressedChunk_(CompFileData* fileData, int64_t offset)
|
|
{
|
|
int rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, offset, __LINE__);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
|
|
return writeFile(fileData->fFilePtr, fileData->fFileName, fBufCompressed, fLenCompressed, __LINE__);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Open the specified segment file (fileData) using the given mode.
|
|
// ln is the source code line number of the code invoking this operation
|
|
// (ex __LINE__); this is used for logging error messages.
|
|
//
|
|
// useTmpSuffix controls whether HDFS file is opened with USE_TMPFILE bit set.
|
|
// Typically set for bulk load, single insert, and batch insert, when adding
|
|
// rows to an "existing" file.
|
|
// Typically always set for DML update and delete.
|
|
//
|
|
// @bug 5572 - HDFS usage: add *.tmp file backup flag to API
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::openFile(CompFileData* fileData, const char* mode, int colWidth, bool useTmpSuffix,
|
|
int ln) const
|
|
{
|
|
int rc = NO_ERROR;
|
|
unsigned opts = IDBDataFile::USE_VBUF;
|
|
IDBPolicy::Contexts ctxt = IDBPolicy::WRITEENG;
|
|
|
|
if (fIsHdfs)
|
|
{
|
|
if (useTmpSuffix)
|
|
{
|
|
if (!fIsBulkLoad)
|
|
{
|
|
// keep a DML log for confirm or cleanup the .tmp file
|
|
string aDMLLogFileName;
|
|
|
|
if ((rc = writeLog(fTransId, "tmp", fileData->fFileName, aDMLLogFileName, 0)) != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to put " << fileData->fFileName << " into DML log.";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
opts |= IDBDataFile::USE_TMPFILE;
|
|
}
|
|
}
|
|
|
|
fileData->fFilePtr = IDBDataFile::open(IDBPolicy::getType(fileData->fFileName.c_str(), ctxt),
|
|
fileData->fFileName.c_str(), mode, opts, colWidth);
|
|
|
|
if (fileData->fFilePtr == NULL)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to open compressed data file " << fileData->fFileName << " @line: " << ln;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
rc = ERR_COMP_OPEN_FILE;
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Set the file offset for the specified segment file (fileData) using the
|
|
// given offset.
|
|
// ln is the source code line number of the code invoking this operation
|
|
// (ex __LINE__); this is used for logging error messages. Likewise, filename
|
|
// is used for logging any error message.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::setFileOffset(IDBDataFile* pFile, const string& fileName, off64_t offset, int ln) const
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
if (pFile->seek(offset, SEEK_SET) != 0)
|
|
rc = ERR_COMP_SET_OFFSET;
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to set offset in compressed data file " << fileName << " @line: " << ln
|
|
<< " offset:" << offset;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Read the requested number of bytes (size) from the specified file pFile.
|
|
// ln is the source code line number of the code invoking this operation
|
|
// (ex __LINE__); this is used for logging error messages. Likewise, filename
|
|
// is used for logging any error message.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::readFile(IDBDataFile* pFile, const string& fileName, void* buf, size_t size, int ln) const
|
|
{
|
|
size_t bytes = pFile->read(buf, size);
|
|
|
|
if (bytes != size)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to read from compressed data file " << fileName << " @line: " << ln
|
|
<< " read/expect:" << bytes << "/" << size;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_COMP_READ_FILE;
|
|
}
|
|
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Write the requested number of bytes (size) to the specified file pFile.
|
|
// ln is the source code line number of the code invoking this operation
|
|
// (ex __LINE__); this is used for logging error messages. Likewise, filename
|
|
// is used for logging any error message.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::writeFile(IDBDataFile* pFile, const string& fileName, void* buf, size_t size, int ln) const
|
|
{
|
|
size_t bytes = pFile->write(buf, size);
|
|
|
|
if (bytes != size)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to write to compressed data file " << fileName << " @line: " << ln
|
|
<< " written/expect:" << bytes << "/" << size;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_COMP_WRITE_FILE;
|
|
}
|
|
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Close the specified segment file (fileData), and remove the
|
|
// corresponding CompFileData reference from fFileMap and fFilePtrMap.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::closeFile(CompFileData* fileData)
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
WE_COMP_DBG(cout << "closing file:" << fileData->fFileName << endl;)
|
|
fFileMap.erase(fileData->fFileID);
|
|
fFilePtrMap.erase(fileData->fFilePtr);
|
|
|
|
if (fileData->fFilePtr)
|
|
delete fileData->fFilePtr;
|
|
|
|
delete fileData;
|
|
fileData = NULL;
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Write the chunk pointers headers for the specified file (fileData).
|
|
// ln is the source code line number of the code invoking this operation
|
|
// (ex __LINE__); this is used for logging error messages. For DML usage,
|
|
// backup for recovery is also performed. This step is skipped for cpimport.bin
|
|
// as bulk import performs its own backup and recovery operations.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::writeHeader(CompFileData* fileData, int ln)
|
|
{
|
|
int rc = NO_ERROR;
|
|
int headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData);
|
|
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
|
|
|
|
if (!fIsHdfs && !fIsBulkLoad)
|
|
{
|
|
// write a backup header
|
|
string hdrFileName(fileData->fFileName + ".hdr");
|
|
string aDMLLogFileName;
|
|
IDBDataFile* hdrFilePtr = IDBDataFile::open(IDBPolicy::getType(hdrFileName.c_str(), IDBPolicy::WRITEENG),
|
|
hdrFileName.c_str(), "w+b", 0, fileData->fColWidth);
|
|
|
|
if (hdrFilePtr)
|
|
{
|
|
rc = writeFile(hdrFilePtr, hdrFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT,
|
|
__LINE__);
|
|
|
|
if (rc == NO_ERROR)
|
|
rc = writeFile(hdrFilePtr, hdrFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__);
|
|
|
|
delete hdrFilePtr;
|
|
}
|
|
|
|
if (rc == NO_ERROR)
|
|
{
|
|
// log the header information for recovery
|
|
rc = writeLog(fTransId, "hdr", fileData->fFileName, aDMLLogFileName, headerSize);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "log " << fileData->fFileName << ".hdr to DML logfile failed.";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
}
|
|
|
|
if ((rc == NO_ERROR) && (rc = writeHeader_(fileData, ptrSecSize)) == NO_ERROR)
|
|
{
|
|
(fileData->fFilePtr)->flush();
|
|
}
|
|
}
|
|
else
|
|
{
|
|
IDBPolicy::remove(hdrFileName.c_str());
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if ((rc = writeHeader_(fileData, ptrSecSize)) == NO_ERROR)
|
|
{
|
|
(fileData->fFilePtr)->flush();
|
|
}
|
|
}
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "write header failed: " << fileData->fFileName << "call from line:" << ln;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Write the chunk pointers headers for the specified file (fileData).
|
|
// ln is the source code line number of the code invoking this operation
|
|
// (ex __LINE__); this is used for logging error messages. For DML usage,
|
|
// backup for recovery is also performed. This step is skipped for cpimport.bin
|
|
// as bulk import performs its own backup and recovery operations.
|
|
//------------------------------------------------------------------------------
|
|
inline int ChunkManager::writeHeader_(CompFileData* fileData, int ptrSecSize)
|
|
{
|
|
int rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, 0, __LINE__);
|
|
|
|
if (rc == NO_ERROR)
|
|
rc = writeFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData,
|
|
COMPRESSED_FILE_HEADER_UNIT, __LINE__);
|
|
|
|
if (rc == NO_ERROR)
|
|
rc = writeFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fPtrSection, ptrSecSize,
|
|
__LINE__);
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// For the specified segment file (pFile), read in an abbreviated/compressed
|
|
// chunk extent, uncompress, and expand to a full chunk for a full extent.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::expandAbbrevColumnExtent(IDBDataFile* pFile, const uint8_t* emptyVal, int width)
|
|
{
|
|
map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);
|
|
|
|
if (i == fFilePtrMap.end())
|
|
{
|
|
logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
|
|
return ERR_COMP_FILE_NOT_FOUND;
|
|
}
|
|
|
|
int rc = NO_ERROR;
|
|
// fetch the initial chunk if not already done.
|
|
ChunkData* chunkData = (i->second)->findChunk(0);
|
|
|
|
if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, 0, chunkData)) != NO_ERROR))
|
|
return rc;
|
|
|
|
// buf points to the end of existing data
|
|
char* buf = chunkData->fBufUnCompressed + chunkData->fLenUnCompressed;
|
|
int size = UNCOMPRESSED_CHUNK_SIZE - chunkData->fLenUnCompressed;
|
|
fFileOp->setEmptyBuf((unsigned char*)buf, size, emptyVal, width);
|
|
chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
|
|
chunkData->fWriteToFile = true;
|
|
//(writeChunkToFile(i->second, chunkData));
|
|
//(writeHeader(i->second, __LINE__));
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// For column segment file:
|
|
// Increment the block count stored in the chunk header used to track how many
|
|
// blocks are allocated to the corresponding segment file.
|
|
//------------------------------------------------------------------------------
|
|
// same here as for dict.
|
|
int ChunkManager::updateColumnExtent(IDBDataFile* pFile, int addBlockCount, int64_t lbid)
|
|
{
|
|
map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);
|
|
|
|
if (i == fFilePtrMap.end())
|
|
{
|
|
logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
|
|
return ERR_COMP_FILE_NOT_FOUND;
|
|
}
|
|
|
|
CompFileData* pFileData = i->second;
|
|
|
|
if (!pFileData)
|
|
{
|
|
logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
|
|
return ERR_COMP_FILE_NOT_FOUND;
|
|
}
|
|
|
|
int rc = NO_ERROR;
|
|
char* hdr = pFileData->fFileHeader.fControlData;
|
|
compress::CompressInterface::setBlockCount(hdr,
|
|
compress::CompressInterface::getBlockCount(hdr) + addBlockCount);
|
|
compress::CompressInterface::setLBIDByIndex(hdr, lbid, 1);
|
|
|
|
ChunkData* chunkData = (pFileData)->findChunk(0);
|
|
|
|
if (chunkData != NULL)
|
|
{
|
|
if ((rc = writeChunkToFile(pFileData, chunkData)) == NO_ERROR)
|
|
{
|
|
rc = writeHeader(pFileData, __LINE__);
|
|
|
|
if (rc == NO_ERROR)
|
|
{
|
|
//@Bug 4977 remove log files
|
|
removeBackups(fTransId);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << "write chunk to file failed when updateColumnExtent: " << pFileData->fFileName;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
}
|
|
}
|
|
|
|
pFile->flush();
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// For dictionary store segment file:
|
|
// Increment the block count stored in the chunk header used to track how many
|
|
// blocks are allocated to the corresponding segment file.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::updateDctnryExtent(IDBDataFile* pFile, int addBlockCount, BRM::LBID_t lbid)
|
|
{
|
|
map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.find(pFile);
|
|
|
|
if (i == fFilePtrMap.end())
|
|
{
|
|
logMessage(ERR_COMP_FILE_NOT_FOUND, logging::LOG_TYPE_ERROR, __LINE__);
|
|
return ERR_COMP_FILE_NOT_FOUND;
|
|
}
|
|
|
|
int rc = NO_ERROR;
|
|
// fetch the initial chunk if not already done.
|
|
ChunkData* chunkData = (i->second)->findChunk(0);
|
|
|
|
if ((chunkData == NULL) && ((rc = fetchChunkFromFile(pFile, 0, chunkData)) != NO_ERROR))
|
|
return rc; // logged by fetchChunkFromFile
|
|
|
|
char* hdr = i->second->fFileHeader.fControlData;
|
|
char* uncompressedBuf = chunkData->fBufUnCompressed;
|
|
int currentBlockCount = compress::CompressInterface::getBlockCount(hdr);
|
|
|
|
// Bug 3203, write out the compressed initial extent.
|
|
if (currentBlockCount == 0)
|
|
{
|
|
int initSize = NUM_BLOCKS_PER_INITIAL_EXTENT * BYTE_PER_BLOCK;
|
|
initializeDctnryChunk(uncompressedBuf, initSize);
|
|
chunkData->fWriteToFile = true;
|
|
|
|
if ((rc = writeChunkToFile(i->second, chunkData)) == NO_ERROR)
|
|
{
|
|
rc = writeHeader(i->second, __LINE__);
|
|
|
|
if (rc == NO_ERROR)
|
|
{
|
|
//@Bug 4977 remove the log file
|
|
removeBackups(fTransId);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << "write chunk to file failed when updateDctnryExtent: " << i->second->fFileName;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
}
|
|
}
|
|
else if (currentBlockCount == NUM_BLOCKS_PER_INITIAL_EXTENT)
|
|
{
|
|
int initSize = NUM_BLOCKS_PER_INITIAL_EXTENT * BYTE_PER_BLOCK;
|
|
int incrSize = UNCOMPRESSED_CHUNK_SIZE - initSize;
|
|
initializeDctnryChunk(uncompressedBuf + initSize, incrSize);
|
|
uint64_t* ptrs = reinterpret_cast<uint64_t*>(i->second->fFileHeader.fPtrSection);
|
|
ptrs[1] = 0; // the compressed chunk size is unknown
|
|
}
|
|
|
|
if (rc == NO_ERROR)
|
|
compress::CompressInterface::setBlockCount(
|
|
hdr, compress::CompressInterface::getBlockCount(hdr) + addBlockCount);
|
|
|
|
if (currentBlockCount)
|
|
{
|
|
// Append to the end.
|
|
uint64_t lbidCount = compress::CompressInterface::getLBIDCount(hdr);
|
|
compress::CompressInterface::setLBIDByIndex(hdr, lbid, lbidCount);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Close any open segment files, and free up memory.
|
|
//------------------------------------------------------------------------------
|
|
void ChunkManager::cleanUp(const std::map<FID, FID>& columOids)
|
|
{
|
|
WE_COMP_DBG(cout << "cleanUp with " << fActiveChunks.size() << " active chunk(s)." << endl;)
|
|
std::map<FID, FID>::const_iterator it;
|
|
map<IDBDataFile*, CompFileData*>::iterator i = fFilePtrMap.begin();
|
|
|
|
while (i != fFilePtrMap.end())
|
|
{
|
|
CompFileData* fileData = i->second;
|
|
|
|
it = columOids.find(fileData->fFid);
|
|
|
|
if (fIsInsert && it != columOids.end())
|
|
{
|
|
list<ChunkData*>& chunks = fileData->fChunkList;
|
|
|
|
for (list<ChunkData*>::iterator j = chunks.begin(); j != chunks.end(); ++j)
|
|
delete *j;
|
|
|
|
delete fileData->fFilePtr;
|
|
fFileMap.erase(fileData->fFileID);
|
|
fFilePtrMap.erase(i++);
|
|
|
|
delete fileData;
|
|
}
|
|
else if (!fIsInsert || (columOids.size() == 0))
|
|
{
|
|
list<ChunkData*>& chunks = fileData->fChunkList;
|
|
|
|
for (list<ChunkData*>::iterator j = chunks.begin(); j != chunks.end(); ++j)
|
|
delete *j;
|
|
|
|
delete fileData->fFilePtr;
|
|
fFileMap.erase(fileData->fFileID);
|
|
fFilePtrMap.erase(i++);
|
|
|
|
delete fileData;
|
|
}
|
|
else
|
|
{
|
|
i++;
|
|
}
|
|
}
|
|
|
|
if (fDropFdCache)
|
|
{
|
|
cacheutils::dropPrimProcFdCache();
|
|
fDropFdCache = false;
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Read "n" blocks from pFile starting at fbo, into readBuf.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::readBlocks(IDBDataFile* pFile, unsigned char* readBuf, uint64_t fbo, size_t n)
|
|
{
|
|
WE_COMP_DBG(cout << "backup blocks fbo:" << fbo << " num:" << n << " file:" << pFile << endl;)
|
|
|
|
// safety check
|
|
if (pFile == NULL || n < 1)
|
|
{
|
|
return -1;
|
|
}
|
|
|
|
map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
|
|
|
|
if (fpIt == fFilePtrMap.end())
|
|
{
|
|
return -1;
|
|
}
|
|
|
|
// the n blocks may cross more than one chunk
|
|
// find the chunk ID and offset of the 1st fbo
|
|
lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
|
|
int idx = offset.quot; // current chunk id
|
|
int rem = offset.rem; // offset in current chunk
|
|
int num = UNCOMPRESSED_CHUNK_SIZE - rem; // # of bytes available in current chunk
|
|
int left = n * BYTE_PER_BLOCK; // # of bytest to be read
|
|
// # of bytes to be read from current chunk
|
|
num = (left > num) ? num : left;
|
|
|
|
do
|
|
{
|
|
ChunkData* chunkData = (fpIt->second)->findChunk(idx);
|
|
|
|
WE_COMP_DBG(cout << "id:" << idx << " ofst:" << rem << " num:" << num << " left:" << left << endl;)
|
|
|
|
// chunk is not already uncompressed
|
|
if (chunkData == NULL)
|
|
{
|
|
if (fetchChunkFromFile(pFile, idx, chunkData) != NO_ERROR)
|
|
{
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
// copy the data at fbo to readBuf
|
|
memcpy(readBuf, chunkData->fBufUnCompressed + rem, num);
|
|
|
|
// prepare for the next read
|
|
readBuf += num;
|
|
rem = 0;
|
|
left -= num;
|
|
num = (left > UNCOMPRESSED_CHUNK_SIZE) ? UNCOMPRESSED_CHUNK_SIZE : left;
|
|
idx++;
|
|
} while (left > 0);
|
|
|
|
return n;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Write the a block (writeBuf) into the fbo block of the specified file.
|
|
// Updated chunk is not flushed to disk but left pending in the applicable
|
|
// CompFileData object.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::restoreBlock(IDBDataFile* pFile, const unsigned char* writeBuf, uint64_t fbo)
|
|
{
|
|
WE_COMP_DBG(cout << "restore blocks fbo:" << fbo << " file:" << pFile << endl;)
|
|
|
|
// safety check
|
|
if (pFile == NULL)
|
|
return -1;
|
|
|
|
map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
|
|
|
|
if (fpIt == fFilePtrMap.end())
|
|
return -1;
|
|
|
|
// the n blocks may cross more than one chunk
|
|
// find the chunk ID and offset of the 1st fbo
|
|
lldiv_t offset = lldiv(fbo * BYTE_PER_BLOCK, UNCOMPRESSED_CHUNK_SIZE);
|
|
ChunkData* chunkData = (fpIt->second)->findChunk(offset.quot);
|
|
WE_COMP_DBG(cout << "id:" << offset.quot << " ofst:" << offset.rem << endl;)
|
|
|
|
// chunk is not already uncompressed
|
|
if (chunkData == NULL)
|
|
{
|
|
if (fetchChunkFromFile(pFile, offset.quot, chunkData) != NO_ERROR)
|
|
return -1;
|
|
}
|
|
|
|
// copy the data to chunk buffer
|
|
memcpy(chunkData->fBufUnCompressed + offset.rem, writeBuf, BYTE_PER_BLOCK);
|
|
chunkData->fWriteToFile = true;
|
|
|
|
return BYTE_PER_BLOCK;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Get the allocated block count from the header, for the specified file (pFile)
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::getBlockCount(IDBDataFile* pFile)
|
|
{
|
|
map<IDBDataFile*, CompFileData*>::iterator fpIt = fFilePtrMap.find(pFile);
|
|
idbassert(fpIt != fFilePtrMap.end());
|
|
|
|
return compress::CompressInterface::getBlockCount(fpIt->second->fFileHeader.fControlData);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Set the FileOp pointer and dictionary flag
|
|
//------------------------------------------------------------------------------
|
|
void ChunkManager::fileOp(FileOp* fileOp)
|
|
{
|
|
fFileOp = fileOp;
|
|
|
|
if (fileOp)
|
|
{
|
|
setTransId(fileOp->getTransId());
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Calculate and return the size of the chunk pointer header for a column of the
|
|
// specified width.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::calculateHeaderSize(int width)
|
|
{
|
|
int headerUnits = 1;
|
|
|
|
// dictionary columns may need variable length header
|
|
if (width > 8)
|
|
{
|
|
int extentsPerFile = Config::getExtentsPerSegmentFile();
|
|
int rowsPerExtent = BRMWrapper::getInstance()->getExtentRows();
|
|
int rowsPerFile = rowsPerExtent * extentsPerFile;
|
|
int stringsPerBlock = 8180 / (width + 2); // 8180 = 8192 - 12
|
|
|
|
// BLOB is 1 string per block
|
|
if (stringsPerBlock == 0)
|
|
stringsPerBlock = 1;
|
|
|
|
int blocksNeeded = rowsPerFile / stringsPerBlock;
|
|
int blocksPerChunk = UNCOMPRESSED_CHUNK_SIZE / BYTE_PER_BLOCK;
|
|
lldiv_t chunks = lldiv(blocksNeeded, blocksPerChunk);
|
|
int chunksNeeded = chunks.quot + (chunks.rem ? 1 : 0); // round up
|
|
int ptrsNeeded = chunksNeeded + 1; // 1 more ptr for 0 ptr marking end
|
|
int ptrsIn4K = (4 * 1024) / sizeof(uint64_t);
|
|
lldiv_t hdrs = lldiv(ptrsNeeded, ptrsIn4K);
|
|
headerUnits = hdrs.quot + (hdrs.rem ? 1 : 0); // round up
|
|
|
|
// Always include odd number of 4K ptr headers, so that when we add the
|
|
// single 4K control header, the cumulative header space will be an even
|
|
// multiple of an 8K boundary.
|
|
if ((headerUnits % 2) == 0)
|
|
headerUnits++;
|
|
}
|
|
|
|
headerUnits++; // add the control data block
|
|
return (headerUnits * COMPRESSED_FILE_HEADER_UNIT);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Reallocate the chunks in a file to account for an expanding chunk that will
|
|
// not fit in the available embedded free space.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::reallocateChunks(CompFileData* fileData)
|
|
{
|
|
WE_COMP_DBG(cout << "reallocate chunks in " << fileData->fFileName << " (" << fileData->fFilePtr << ")"
|
|
<< endl;)
|
|
|
|
// return value
|
|
int rc = NO_ERROR;
|
|
|
|
// original file info
|
|
string origFileName = fileData->fFileName;
|
|
IDBDataFile* origFilePtr = fileData->fFilePtr;
|
|
origFilePtr->flush();
|
|
|
|
// back out the current pointers
|
|
int headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData);
|
|
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
|
|
compress::CompChunkPtrList origPtrs;
|
|
|
|
if (compress::CompressInterface::getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, origPtrs) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Chunk shifting failed, file:" << origFileName << " -- invalid header.";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_COMP_PARSE_HDRS;
|
|
}
|
|
|
|
// get the chunks already in memory
|
|
list<ChunkData*>& chunkList = fileData->fChunkList;
|
|
chunkList.sort(chunkDataPtrLessCompare);
|
|
list<ChunkData*>::iterator j = chunkList.begin();
|
|
int numOfChunks = origPtrs.size(); // number of chunks that contain user data
|
|
vector<ChunkData*> chunksTouched; // chunk data is being modified, and in memory
|
|
|
|
for (int i = 0; i < numOfChunks; i++)
|
|
chunksTouched.push_back(NULL);
|
|
|
|
// mark touched chunks
|
|
while (j != chunkList.end())
|
|
{
|
|
chunksTouched[(*j)->fChunkId] = *j;
|
|
j++;
|
|
}
|
|
|
|
// new file name and pointer
|
|
string rlcFileName(fileData->fFileName + ".rlc");
|
|
IDBDataFile* rlcFilePtr = IDBDataFile::open(IDBPolicy::getType(rlcFileName.c_str(), IDBPolicy::WRITEENG),
|
|
rlcFileName.c_str(), "w+b", 0, fileData->fColWidth);
|
|
|
|
if (!rlcFilePtr)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Chunk shifting failed, file:" << origFileName << " -- cannot open rlc file.";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_FILE_OPEN;
|
|
}
|
|
|
|
// log the recover information here
|
|
string aDMLLogFileName;
|
|
rc = writeLog(fTransId, "rlc", fileData->fFileName, aDMLLogFileName);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "log " << fileData->fFileName << ".rlc to DML logfile failed.";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
|
|
delete rlcFilePtr;
|
|
rlcFilePtr = NULL;
|
|
IDBPolicy::remove(rlcFileName.c_str());
|
|
return rc;
|
|
}
|
|
|
|
// !!! May conside to use mmap to speed up the copy !!!
|
|
// !!! copy the whole file and update the shifting part !!!
|
|
|
|
// store updated chunk pointers
|
|
uint64_t* ptrs = reinterpret_cast<uint64_t*>(fileData->fFileHeader.fPtrSection);
|
|
ptrs[0] = origPtrs[0].first; // the first chunk offset has no change.
|
|
|
|
// bug3913, file size 0 after reallocate.
|
|
// write the header, to be updated later, make sure there is someing in the file
|
|
if ((rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fControlData,
|
|
COMPRESSED_FILE_HEADER_UNIT, __LINE__)) == NO_ERROR)
|
|
rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__);
|
|
|
|
int k = 0;
|
|
|
|
for (; k < numOfChunks && rc == NO_ERROR; k++)
|
|
{
|
|
uint64_t chunkSize = 0; // size of current chunk
|
|
unsigned char* buf = NULL; // output buffer
|
|
|
|
// Find the current chunk size, and allocate the data -- buf point to the data.
|
|
if (chunksTouched[k] == NULL)
|
|
{
|
|
// Chunks not touched will be copied to new file without being uncompressed first.
|
|
// cout << "reallocateChunks: chunk has not been updated" << endl;
|
|
chunkSize = origPtrs[k].second;
|
|
|
|
// read disk data into compressed data buffer
|
|
buf = (unsigned char*)fBufCompressed;
|
|
|
|
if ((rc = setFileOffset(origFilePtr, origFileName, origPtrs[k].first, __LINE__)) != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "set file offset failed @line:" << __LINE__ << "with retCode:" << rc
|
|
<< " filename:" << origFileName;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
continue;
|
|
}
|
|
|
|
if ((rc = readFile(origFilePtr, origFileName, buf, chunkSize, __LINE__)) != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "readfile failed @line:" << __LINE__ << "with retCode:" << rc << " filename:" << origFileName;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
continue;
|
|
}
|
|
}
|
|
else // chunksTouched[k] != NULL
|
|
{
|
|
// chunk has been updated, and in memory.
|
|
// cout << "reallocateChunks: chunk has been updated" << endl;
|
|
ChunkData* chunkData = chunksTouched[k];
|
|
fLenCompressed = fMaxCompressedBufSize;
|
|
|
|
auto fCompressor = compress::getCompressorByType(fCompressorPool, fileData->fCompressionType);
|
|
if (!fCompressor)
|
|
{
|
|
return ERR_COMP_WRONG_COMP_TYPE;
|
|
}
|
|
|
|
if ((rc = fCompressor->compressBlock((char*)chunkData->fBufUnCompressed, chunkData->fLenUnCompressed,
|
|
(unsigned char*)fBufCompressed, fLenCompressed)) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Compress data failed @line:" << __LINE__ << "with retCode:" << rc
|
|
<< " filename:" << rlcFileName;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
|
|
rc = ERR_COMP_COMPRESS;
|
|
continue;
|
|
}
|
|
|
|
WE_COMP_DBG(cout << "Chunk compressed from " << chunkData->fLenUnCompressed << " to "
|
|
<< fLenCompressed;)
|
|
|
|
// shifting chunk, add padding space
|
|
if ((rc = fCompressor->padCompressedChunks((unsigned char*)fBufCompressed, fLenCompressed,
|
|
fMaxCompressedBufSize)) != 0)
|
|
{
|
|
WE_COMP_DBG(cout << ", but padding failed." << endl;)
|
|
ostringstream oss;
|
|
oss << "Compress data failed @line:" << __LINE__ << "with retCode:" << rc
|
|
<< " filename:" << rlcFileName;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
|
|
rc = ERR_COMP_PAD_DATA;
|
|
continue;
|
|
}
|
|
|
|
WE_COMP_DBG(cout << ", and padded to " << fLenCompressed;)
|
|
|
|
buf = (unsigned char*)fBufCompressed;
|
|
chunkSize = fLenCompressed;
|
|
}
|
|
|
|
// write is in sequence, no need to call setFileOffset
|
|
// cout << "reallocateChunks: writing to temp file " << rlcFileName << " with fileptr " << rlcFilePtr <<
|
|
// endl;
|
|
rc = writeFile(rlcFilePtr, rlcFileName, buf, chunkSize, __LINE__);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
// cout << "reallocateChunks: writing to temp file " << rlcFileName << " with fileptr " << rlcFilePtr <<
|
|
// " failed" << endl;
|
|
ostringstream oss;
|
|
oss << "write file failed @line:" << __LINE__ << "with retCode:" << rc << " filename:" << rlcFileName;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
continue;
|
|
}
|
|
|
|
// Update the current chunk size.
|
|
ptrs[k + 1] = ptrs[k] + chunkSize;
|
|
}
|
|
|
|
// up to now, everything OK: rc == NO_ERROR
|
|
// remove all written chunks from active chunk list.
|
|
j = chunkList.begin();
|
|
|
|
while (j != chunkList.end())
|
|
{
|
|
ChunkData* chunkData = *j;
|
|
fActiveChunks.remove(make_pair(fileData->fFileID, chunkData));
|
|
fileData->fChunkList.remove(chunkData);
|
|
delete chunkData;
|
|
|
|
j = chunkList.begin();
|
|
}
|
|
|
|
// finally update the header
|
|
if (rc == NO_ERROR)
|
|
rc = setFileOffset(rlcFilePtr, rlcFileName, 0, __LINE__);
|
|
|
|
if (rc == NO_ERROR)
|
|
rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fControlData, COMPRESSED_FILE_HEADER_UNIT,
|
|
__LINE__);
|
|
|
|
if (rc == NO_ERROR)
|
|
rc = writeFile(rlcFilePtr, rlcFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
struct timeval tv;
|
|
gettimeofday(&tv, 0);
|
|
struct tm ltm;
|
|
localtime_r(reinterpret_cast<time_t*>(&tv.tv_sec), <m);
|
|
char tmText[24];
|
|
// this snprintf call causes a compiler warning b/c buffer size is less
|
|
// then maximum string size.
|
|
#if defined(__GNUC__) && __GNUC__ >= 7
|
|
#pragma GCC diagnostic push
|
|
#pragma GCC diagnostic ignored "-Wformat-truncation="
|
|
snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld", ltm.tm_year + 1900, ltm.tm_mon + 1,
|
|
ltm.tm_mday, ltm.tm_hour, ltm.tm_min, ltm.tm_sec, tv.tv_usec);
|
|
#pragma GCC diagnostic pop
|
|
#else
|
|
snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld", ltm.tm_year + 1900, ltm.tm_mon + 1,
|
|
ltm.tm_mday, ltm.tm_hour, ltm.tm_min, ltm.tm_sec, tv.tv_usec);
|
|
#endif
|
|
string dbgFileName(rlcFileName + tmText);
|
|
|
|
ostringstream oss;
|
|
oss << "Chunk shifting failed, file:" << origFileName;
|
|
|
|
if (IDBPolicy::rename(rlcFileName.c_str(), dbgFileName.c_str()) == 0)
|
|
oss << ", rlc file is:" << dbgFileName;
|
|
|
|
// write out the header for debugging in case the header in rlc file is bad or not updated.
|
|
string rlcPtrFileName(dbgFileName + ".ptr");
|
|
|
|
IDBDataFile* rlcPtrFilePtr =
|
|
IDBDataFile::open(IDBPolicy::getType(rlcPtrFileName.c_str(), IDBPolicy::WRITEENG),
|
|
rlcPtrFileName.c_str(), "w+b", 0, fileData->fColWidth);
|
|
|
|
if (rlcPtrFilePtr &&
|
|
(writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fControlData,
|
|
COMPRESSED_FILE_HEADER_UNIT, __LINE__) == NO_ERROR) &&
|
|
(writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fPtrSection, ptrSecSize, __LINE__) ==
|
|
NO_ERROR))
|
|
{
|
|
oss << ", rlc file header in memory: " << rlcPtrFileName;
|
|
}
|
|
else
|
|
{
|
|
oss << ", possible incomplete rlc file header in memory: " << rlcPtrFileName;
|
|
}
|
|
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
|
|
closeFile(fileData);
|
|
delete rlcFilePtr;
|
|
rlcFilePtr = NULL;
|
|
|
|
if (rlcPtrFilePtr != NULL)
|
|
{
|
|
delete rlcPtrFilePtr;
|
|
rlcPtrFilePtr = NULL;
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
// update the file pointer map w/ new file pointer
|
|
// cout << "realloc1: remove ptr = " << fileData->fFilePtr << endl;
|
|
fFilePtrMap.erase(fileData->fFilePtr);
|
|
delete fileData->fFilePtr;
|
|
fileData->fFilePtr = NULL;
|
|
delete rlcFilePtr;
|
|
rlcFilePtr = NULL;
|
|
|
|
// put reallocated file size here for logging purpose.
|
|
uint64_t fileSize = 0;
|
|
|
|
if (rc == NO_ERROR)
|
|
{
|
|
|
|
// @bug3913, keep the original file until the new file is properly renamed.
|
|
// 1. check the new file size is NOT 0, matching ptr[k].
|
|
// 2. mv the current to be backup.
|
|
// 3. rename the rlc file.
|
|
// 4. check the file size again.
|
|
// 5. verify each chunk.
|
|
// 5. rm the bak file or mv bak file back.
|
|
|
|
// check the new file size using two methods mostly for curiosity on 0 size file.
|
|
// They can be removed because all chunks are to be verified after rename.
|
|
if (IDBPolicy::size(rlcFileName.c_str()) != (int64_t)ptrs[k])
|
|
{
|
|
ostringstream oss;
|
|
oss << "Incorrect file size, expect:" << ptrs[k] << ", stat:" << fileSize
|
|
<< ", filename:" << rlcFileName;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
rc = ERR_COMP_RENAME_FILE;
|
|
}
|
|
|
|
if (rc == NO_ERROR)
|
|
{
|
|
if (fIsHdfs)
|
|
rc = swapTmpFile(rlcFileName, fileData->fFileName + ".tmp");
|
|
else
|
|
rc = swapTmpFile(rlcFileName, fileData->fFileName);
|
|
}
|
|
|
|
if ((rc == NO_ERROR) && (rc = openFile(fileData, "r+b", fileData->fColWidth, true, __LINE__)) ==
|
|
NO_ERROR) // @bug 5572 HDFS tmp file
|
|
{
|
|
fileSize = fileData->fFilePtr->size();
|
|
|
|
if (fileSize == ptrs[k])
|
|
{
|
|
rc = verifyChunksAfterRealloc(fileData);
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << "Incorrect file size, expect:" << ptrs[k] << ", stat:" << fileSize
|
|
<< ", filename:" << fileData->fFileName;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
rc = ERR_COMP_RENAME_FILE;
|
|
}
|
|
|
|
if (rc == NO_ERROR)
|
|
{
|
|
fFilePtrMap.insert(make_pair(fileData->fFilePtr, fileData));
|
|
// cout << "realloc2: insert ptr = " << fileData->fFilePtr << endl;
|
|
// notify the PrimProc of unlinking original data file
|
|
fDropFdCache = true;
|
|
}
|
|
}
|
|
|
|
if (!fIsHdfs)
|
|
{
|
|
string bakFileName(fileData->fFileName + ".orig");
|
|
|
|
if (rc == NO_ERROR)
|
|
{
|
|
// unlink the original file (remove is portable)
|
|
if (fFs.remove(bakFileName.c_str()) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "remove backup file " << bakFileName << " failed: " << strerror(errno);
|
|
|
|
// not much we can do, log an info message for manual cleanup
|
|
logMessage(oss.str(), logging::LOG_TYPE_INFO);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// keep the bad file for debugging purpose
|
|
if (fFs.rename(fileData->fFileName.c_str(), rlcFileName.c_str()) == 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "data file after chunk shifting failed verification.";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
}
|
|
|
|
// roll back the bak file
|
|
if (fFs.rename(bakFileName.c_str(), fileData->fFileName.c_str()) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "rename " << bakFileName << " to " << fileData->fFileName << " failed: " << strerror(errno);
|
|
|
|
// must manually move it back
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!fIsHdfs)
|
|
{
|
|
if (rc == NO_ERROR)
|
|
{
|
|
// remove the log file
|
|
fFs.remove(aDMLLogFileName.c_str());
|
|
}
|
|
else
|
|
{
|
|
struct timeval tv;
|
|
gettimeofday(&tv, 0);
|
|
struct tm ltm;
|
|
localtime_r(reinterpret_cast<time_t*>(&tv.tv_sec), <m);
|
|
char tmText[24];
|
|
// this snprintf call causes a compiler warning b/c buffer size is less
|
|
// then maximum string size.
|
|
#if defined(__GNUC__) && __GNUC__ >= 7
|
|
#pragma GCC diagnostic push
|
|
#pragma GCC diagnostic ignored "-Wformat-truncation="
|
|
snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld", ltm.tm_year + 1900, ltm.tm_mon + 1,
|
|
ltm.tm_mday, ltm.tm_hour, ltm.tm_min, ltm.tm_sec, tv.tv_usec);
|
|
#pragma GCC diagnostic pop
|
|
#else
|
|
snprintf(tmText, sizeof(tmText), ".%04d%02d%02d%02d%02d%02d%06ld", ltm.tm_year + 1900, ltm.tm_mon + 1,
|
|
ltm.tm_mday, ltm.tm_hour, ltm.tm_min, ltm.tm_sec, tv.tv_usec);
|
|
#endif
|
|
string dbgFileName(rlcFileName + tmText);
|
|
|
|
ostringstream oss;
|
|
oss << "Chunk shifting failed, file:" << origFileName;
|
|
|
|
if (IDBPolicy::rename(rlcFileName.c_str(), dbgFileName.c_str()) == 0)
|
|
oss << ", rlc file is:" << dbgFileName;
|
|
|
|
// write out the header for debugging in case the header in rlc file is bad.
|
|
string rlcPtrFileName(dbgFileName + ".hdr");
|
|
IDBDataFile* rlcPtrFilePtr = IDBDataFile::open(
|
|
IDBPolicy::getType(rlcPtrFileName.c_str(), IDBPolicy::WRITEENG), rlcPtrFileName.c_str(), "w+b", 0);
|
|
|
|
if (rlcPtrFilePtr &&
|
|
(writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fControlData,
|
|
COMPRESSED_FILE_HEADER_UNIT, __LINE__) == NO_ERROR) &&
|
|
(writeFile(rlcPtrFilePtr, rlcPtrFileName, fileData->fFileHeader.fPtrSection, ptrSecSize,
|
|
__LINE__) == NO_ERROR))
|
|
{
|
|
oss << ", rlc file header in memory: " << rlcPtrFileName;
|
|
}
|
|
else
|
|
{
|
|
oss << ", possible incomplete rlc file header in memory: " << rlcPtrFileName;
|
|
}
|
|
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
|
|
closeFile(fileData);
|
|
|
|
if (rlcFilePtr != NULL)
|
|
{
|
|
delete rlcFilePtr;
|
|
rlcFilePtr = NULL;
|
|
}
|
|
|
|
if (rlcPtrFilePtr != NULL)
|
|
{
|
|
delete rlcPtrFilePtr;
|
|
rlcPtrFilePtr = NULL;
|
|
}
|
|
}
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Verify chunks can be uncompressed after a chunk shift.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::verifyChunksAfterRealloc(CompFileData* fileData)
|
|
{
|
|
int rc = NO_ERROR;
|
|
|
|
// read in the header
|
|
if ((rc = readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fControlData,
|
|
COMPRESSED_FILE_HEADER_UNIT, __LINE__)) != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to read control header from new " << fileData->fFileName << ", roll back";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
|
|
return rc;
|
|
}
|
|
|
|
// make sure the header is valid
|
|
if ((rc = compress::CompressInterface::verifyHdr(fileData->fFileHeader.fControlData)) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Invalid header in new " << fileData->fFileName << ", roll back";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
|
|
return rc;
|
|
}
|
|
|
|
int headerSize = compress::CompressInterface::getHdrSize(fileData->fFileHeader.fControlData);
|
|
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
|
|
|
|
// read in the pointer section in header
|
|
if ((rc = readFile(fileData->fFilePtr, fileData->fFileName, fileData->fFileHeader.fPtrSection, ptrSecSize,
|
|
__LINE__)) != NO_ERROR)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to read pointer header from new " << fileData->fFileName << "@" << __LINE__;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
// get pointer list
|
|
compress::CompChunkPtrList ptrs;
|
|
|
|
if (compress::CompressInterface::getPtrList(fileData->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to parse pointer list from new " << fileData->fFileName << "@" << __LINE__;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_COMP_PARSE_HDRS;
|
|
}
|
|
|
|
// now verify each chunk
|
|
ChunkData chunkData;
|
|
int numOfChunks = ptrs.size(); // number of chunks in the file
|
|
|
|
auto fCompressor = compress::getCompressorByType(fCompressorPool, fileData->fCompressionType);
|
|
if (!fCompressor)
|
|
{
|
|
return ERR_COMP_WRONG_COMP_TYPE;
|
|
}
|
|
|
|
for (int i = 0; i < numOfChunks && rc == NO_ERROR; i++)
|
|
{
|
|
unsigned int chunkSize = ptrs[i].second;
|
|
|
|
if ((rc = setFileOffset(fileData->fFilePtr, fileData->fFileName, ptrs[i].first, __LINE__)))
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to setFileOffset new " << fileData->fFileName << "@" << __LINE__;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
continue;
|
|
}
|
|
|
|
if ((rc = readFile(fileData->fFilePtr, fileData->fFileName, fBufCompressed, chunkSize, __LINE__)))
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to read chunk from new " << fileData->fFileName << "@" << __LINE__;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
continue;
|
|
}
|
|
|
|
// uncompress the read in buffer
|
|
size_t dataLen = sizeof(chunkData.fBufUnCompressed);
|
|
|
|
if (fCompressor->uncompressBlock((char*)fBufCompressed, chunkSize,
|
|
(unsigned char*)chunkData.fBufUnCompressed, dataLen) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to uncompress chunk new " << fileData->fFileName << "@" << __LINE__;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
rc = ERR_COMP_UNCOMPRESS;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Log an error message for the specified error code, and msg level.
|
|
//------------------------------------------------------------------------------
|
|
void ChunkManager::logMessage(int code, int level, int lineNum, int fromLine) const
|
|
{
|
|
ostringstream oss;
|
|
oss << ec.errorString(code) << " @line:" << lineNum;
|
|
|
|
if (fromLine != -1)
|
|
oss << " called from line:" << fromLine;
|
|
|
|
logMessage(oss.str(), level);
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Log the requested error message using the specified msg level.
|
|
//------------------------------------------------------------------------------
|
|
void ChunkManager::logMessage(const string& msg, int level) const
|
|
{
|
|
logging::Message::Args args;
|
|
args.add(msg);
|
|
|
|
fSysLogger->logMessage((logging::LOG_TYPE)level, logging::M0080, args,
|
|
// FIXME: store session id in class to pass on to LogginID...
|
|
logging::LoggingID(SUBSYSTEM_ID_WE, 0, fTransId));
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Replace the cdf file with the updated tmp file.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::swapTmpFile(const string& src, const string& dest)
|
|
{
|
|
// return value
|
|
int rc = NO_ERROR;
|
|
|
|
// if no change to the cdf, the tmp may not exist, no need to swap.
|
|
if (!fFs.exists(src.c_str()))
|
|
return rc;
|
|
|
|
ssize_t srcFileSize = IDBPolicy::size(src.c_str());
|
|
|
|
if (srcFileSize <= 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "swapTmpFile aborted. Source file size = " << srcFileSize;
|
|
logMessage(oss.str(), logging::LOG_TYPE_CRITICAL);
|
|
rc = ERR_COMP_RENAME_FILE;
|
|
|
|
return rc;
|
|
}
|
|
|
|
errno = 0;
|
|
// save the original file
|
|
string orig(dest + ".orig");
|
|
fFs.remove(orig.c_str()); // remove left overs
|
|
|
|
if (fFs.rename(dest.c_str(), orig.c_str()) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "rename " << dest << " to " << orig << " failed: " << strerror(errno);
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
rc = ERR_COMP_RENAME_FILE;
|
|
}
|
|
|
|
// rename the new file
|
|
if (rc == NO_ERROR && fFs.rename(src.c_str(), dest.c_str()) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "rename " << src << " to " << dest << " failed: " << strerror(errno);
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
rc = ERR_COMP_RENAME_FILE;
|
|
}
|
|
|
|
if (rc == NO_ERROR && fFs.remove(orig.c_str()) != 0)
|
|
rc = ERR_COMP_REMOVE_FILE;
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Construct a DML log file name based on transaction ID, etc.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::getDMLLogFileName(string& aDMLLogFileName, const TxnID& txnId) const
|
|
{
|
|
config::Config* config = config::Config::makeConfig();
|
|
string prefix = config->getConfig("SystemConfig", "DBRMRoot");
|
|
|
|
if (prefix.length() == 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "trans " << txnId << ":Need a valid DBRMRoot entry in Calpont configuation file";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_DML_LOG_NAME;
|
|
}
|
|
|
|
uint64_t pos = prefix.find_last_of("/");
|
|
|
|
if (pos != string::npos)
|
|
{
|
|
aDMLLogFileName = prefix.substr(0, pos + 1); // Get the file path
|
|
}
|
|
else
|
|
{
|
|
ostringstream oss;
|
|
oss << "trans " << txnId << ":Cannot find the dbrm directory (" << prefix.c_str()
|
|
<< ") for the DML log file";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_DML_LOG_NAME;
|
|
}
|
|
|
|
ostringstream oss;
|
|
oss << txnId << "_" << fLocalModuleId;
|
|
aDMLLogFileName += "DMLLog_" + oss.str();
|
|
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// clear the DML log file
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::startTransaction(const TxnID& txnId) const
|
|
{
|
|
// this step is for HDFS update/delete only.
|
|
if (!fIsHdfs || fIsBulkLoad)
|
|
return NO_ERROR;
|
|
|
|
// Construct the DML log file name
|
|
string aDMLLogFileName;
|
|
|
|
if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
|
|
return ERR_DML_LOG_NAME;
|
|
|
|
// truncate the existing file
|
|
boost::scoped_ptr<IDBDataFile> aDMLLogFile(IDBDataFile::open(
|
|
IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG), aDMLLogFileName.c_str(), "wb", 0));
|
|
|
|
if (!aDMLLogFile)
|
|
{
|
|
ostringstream oss;
|
|
oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened.";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_OPEN_DML_LOG;
|
|
}
|
|
|
|
return NO_ERROR;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Backup cdf file and replace the with the updated tmp file.
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::confirmTransaction(const TxnID& txnId) const
|
|
{
|
|
// return value
|
|
int rc = NO_ERROR;
|
|
|
|
// this step is for HDFS update/delete only.
|
|
if (!fIsHdfs || fIsBulkLoad)
|
|
return rc;
|
|
|
|
string aDMLLogFileName;
|
|
|
|
if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
|
|
return ERR_DML_LOG_NAME;
|
|
|
|
// Open log file
|
|
boost::scoped_ptr<IDBDataFile> aDMLLogFile;
|
|
aDMLLogFile.reset(IDBDataFile::open(IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
|
|
aDMLLogFileName.c_str(), "r", 0));
|
|
|
|
if (!aDMLLogFile)
|
|
{
|
|
ostringstream oss;
|
|
oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_OPEN_DML_LOG;
|
|
}
|
|
|
|
ssize_t logSize = fFs.size(aDMLLogFileName.c_str());
|
|
boost::scoped_array<char> buf(new char[logSize]);
|
|
|
|
if (aDMLLogFile->read(buf.get(), logSize) != logSize)
|
|
return ERR_FILE_READ;
|
|
|
|
std::istringstream strstream(string(buf.get(), logSize));
|
|
std::string backUpFileType;
|
|
std::string filename;
|
|
int64_t size;
|
|
int64_t offset;
|
|
ConfirmHdfsDbFile confirmHdfs;
|
|
|
|
while (strstream >> backUpFileType >> filename >> size >> offset)
|
|
{
|
|
std::string confirmErrMsg;
|
|
rc = confirmHdfs.confirmDbFileChange(backUpFileType, filename, confirmErrMsg);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
logMessage(confirmErrMsg, logging::LOG_TYPE_ERROR);
|
|
break;
|
|
}
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Finalize the chages
|
|
// if success, remove the orig
|
|
// otherwise, move the orig back to cdf
|
|
//------------------------------------------------------------------------------
|
|
int ChunkManager::endTransaction(const TxnID& txnId, bool success) const
|
|
{
|
|
// return value
|
|
int rc = NO_ERROR;
|
|
|
|
// this step is for HDFS update/delete only.
|
|
if (!fIsHdfs || fIsBulkLoad)
|
|
return rc;
|
|
|
|
string aDMLLogFileName;
|
|
|
|
if (getDMLLogFileName(aDMLLogFileName, txnId) != NO_ERROR)
|
|
return ERR_DML_LOG_NAME;
|
|
|
|
// Open log file
|
|
boost::scoped_ptr<IDBDataFile> aDMLLogFile;
|
|
aDMLLogFile.reset(IDBDataFile::open(IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG),
|
|
aDMLLogFileName.c_str(), "r", 0));
|
|
|
|
if (!aDMLLogFile)
|
|
{
|
|
ostringstream oss;
|
|
oss << "trans " << txnId << ":File " << aDMLLogFileName << " can't be opened";
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_OPEN_DML_LOG;
|
|
}
|
|
|
|
ssize_t logSize = fFs.size(aDMLLogFileName.c_str());
|
|
ssize_t logRead = 0;
|
|
boost::scoped_array<char> buf(new char[logSize]);
|
|
|
|
if ((logRead = aDMLLogFile->read(buf.get(), logSize)) != logSize)
|
|
{
|
|
ostringstream oss;
|
|
oss << "trans " << txnId << ":File " << aDMLLogFileName << " filed to read: " << logRead << "/"
|
|
<< logSize;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_FILE_READ;
|
|
}
|
|
|
|
std::istringstream strstream(string(buf.get(), logSize));
|
|
std::string backUpFileType;
|
|
std::string filename;
|
|
int64_t size;
|
|
int64_t offset;
|
|
ConfirmHdfsDbFile confirmHdfs;
|
|
|
|
while (strstream >> backUpFileType >> filename >> size >> offset)
|
|
{
|
|
std::string finalizeErrMsg;
|
|
rc = confirmHdfs.endDbFileChange(backUpFileType, filename, success, finalizeErrMsg);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
logMessage(finalizeErrMsg, logging::LOG_TYPE_ERROR);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// final clean up or recover
|
|
if (rc == NO_ERROR)
|
|
rc = fFs.remove(aDMLLogFileName.c_str());
|
|
|
|
return rc;
|
|
}
|
|
|
|
int ChunkManager::checkFixLastDictChunk(const FID& fid, uint16_t root, uint32_t partition, uint16_t segment)
|
|
{
|
|
int rc = 0;
|
|
// Find the file info
|
|
FileID fileID(fid, root, partition, segment);
|
|
map<FileID, CompFileData*>::const_iterator mit = fFileMap.find(fileID);
|
|
|
|
WE_COMP_DBG(cout << "getFileData: fid:" << fid << " root:" << root << " part:" << partition << " seg:"
|
|
<< segment << " file* " << ((mit != fFileMap.end()) ? "" : "not ") << "found." << endl;)
|
|
|
|
// Get CompFileData pointer for existing Dictionary store file mit->second is CompFileData
|
|
if (mit != fFileMap.end())
|
|
{
|
|
int headerSize = compress::CompressInterface::getHdrSize(mit->second->fFileHeader.fControlData);
|
|
int ptrSecSize = headerSize - COMPRESSED_FILE_HEADER_UNIT;
|
|
|
|
// get pointer list
|
|
compress::CompChunkPtrList ptrs;
|
|
|
|
if (compress::CompressInterface::getPtrList(mit->second->fFileHeader.fPtrSection, ptrSecSize, ptrs) != 0)
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to parse pointer list from new " << mit->second->fFileName << "@" << __LINE__;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return ERR_COMP_PARSE_HDRS;
|
|
}
|
|
|
|
// now verify last chunk
|
|
ChunkData* chunkData;
|
|
int numOfChunks = ptrs.size(); // number of chunks in the file
|
|
unsigned int chunkSize = ptrs[numOfChunks - 1].second;
|
|
|
|
if ((rc = setFileOffset(mit->second->fFilePtr, mit->second->fFileName, ptrs[numOfChunks - 1].first,
|
|
__LINE__)))
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to setFileOffset new " << mit->second->fFileName << "@" << __LINE__;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
if ((rc = readFile(mit->second->fFilePtr, mit->second->fFileName, fBufCompressed, chunkSize, __LINE__)))
|
|
{
|
|
ostringstream oss;
|
|
oss << "Failed to read chunk from new " << mit->second->fFileName << "@" << __LINE__;
|
|
logMessage(oss.str(), logging::LOG_TYPE_ERROR);
|
|
return rc;
|
|
}
|
|
|
|
// uncompress the read in buffer
|
|
chunkData = new ChunkData(numOfChunks - 1);
|
|
size_t dataLen = sizeof(chunkData->fBufUnCompressed);
|
|
|
|
auto fCompressor = compress::getCompressorByType(fCompressorPool, mit->second->fCompressionType);
|
|
if (!fCompressor)
|
|
{
|
|
return ERR_COMP_WRONG_COMP_TYPE;
|
|
}
|
|
|
|
if (fCompressor->uncompressBlock((char*)fBufCompressed, chunkSize,
|
|
(unsigned char*)chunkData->fBufUnCompressed, dataLen) != 0)
|
|
{
|
|
mit->second->fChunkList.push_back(chunkData);
|
|
fActiveChunks.push_back(make_pair(mit->second->fFileID, chunkData));
|
|
// replace this chunk with empty chunk
|
|
uint64_t blocks = 512;
|
|
|
|
if ((numOfChunks - 1) == 0)
|
|
{
|
|
char* hdr = mit->second->fFileHeader.fControlData;
|
|
|
|
if (compress::CompressInterface::getBlockCount(hdr) < 512)
|
|
blocks = 256;
|
|
}
|
|
|
|
dataLen = 8192 * blocks;
|
|
|
|
// load the uncompressed buffer with empty values.
|
|
char* buf = chunkData->fBufUnCompressed;
|
|
chunkData->fLenUnCompressed = UNCOMPRESSED_CHUNK_SIZE;
|
|
initializeDctnryChunk(buf, UNCOMPRESSED_CHUNK_SIZE);
|
|
chunkData->fLenUnCompressed = dataLen;
|
|
chunkData->fWriteToFile = true;
|
|
}
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
} // namespace WriteEngine
|
|
|