/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*
 * $Id: we_bulkrollbackfilecompressed.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
 */

#include "we_bulkrollbackfilecompressed.h"

#include <sstream>
#include <boost/scoped_array.hpp>
#include <boost/filesystem/path.hpp>

#include "we_define.h"
#include "we_fileop.h"
#include "we_bulkrollbackmgr.h"
#include "we_convertor.h"
#include "messageids.h"
#include "IDBDataFile.h"
#include "IDBPolicy.h"
using namespace idbdatafile;
using namespace compress;
using namespace execplan;

namespace
{
const char* DATA_DIR_SUFFIX = "_data";
}

namespace WriteEngine
{
//------------------------------------------------------------------------------
// BulkRollbackFileCompressed constructor
//------------------------------------------------------------------------------
BulkRollbackFileCompressed::BulkRollbackFileCompressed(BulkRollbackMgr* mgr) : BulkRollbackFile(mgr)
{
  compress::initializeCompressorPool(fCompressorPool);
}

//------------------------------------------------------------------------------
// BulkRollbackFileCompressed destructor
//------------------------------------------------------------------------------
BulkRollbackFileCompressed::~BulkRollbackFileCompressed()
{
}

//------------------------------------------------------------------------------
// Truncate the specified database segment file to the extent specified by
// the given file offset.  Also updates the header(s) as well.
//
// columnOID      - OID of segment file to be truncated
// dbRoot         - DBRoot of segment file to be truncated
// partNum        - Partition number of segment file to be truncated
// segNum         - Segment number of segment file to be truncated
// fileSizeBlocks - Number of raw data blocks to be left in the file.
//                  Remainder of file is to be truncated.
//------------------------------------------------------------------------------
void BulkRollbackFileCompressed::truncateSegmentFile(OID columnOID, uint32_t dbRoot, uint32_t partNum,
                                                     uint32_t segNum, long long fileSizeBlocks)
{
  std::ostringstream msgText1;
  msgText1 << "Truncating compressed column file"
              ": dbRoot-"
           << dbRoot << "; part#-" << partNum << "; seg#-" << segNum << "; rawTotBlks-" << fileSizeBlocks;
  fMgr->logAMessage(logging::LOG_TYPE_INFO, logging::M0075, columnOID, msgText1.str());

  std::string segFile;
  IDBDataFile* pFile = fDbFile.openFile(columnOID, dbRoot, partNum, segNum, segFile);

  if (pFile == 0)
  {
    std::ostringstream oss;
    oss << "Error opening compressed column segment file to rollback "
           "extents from DB for"
        << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
        << segNum;

    throw WeException(oss.str(), ERR_FILE_OPEN);
  }

  // Read and parse the header pointers
  char hdrs[CompressInterface::HDR_BUF_LEN * 2];
  ;
  CompChunkPtrList chunkPtrs;
  std::string errMsg;
  int rc = loadColumnHdrPtrs(pFile, hdrs, chunkPtrs, errMsg);

  if (rc != NO_ERROR)
  {
    std::ostringstream oss;
    oss << "Error reading compressed column ptr headers from DB for"
        << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
        << segNum << "; " << errMsg;

    fDbFile.closeFile(pFile);
    throw WeException(oss.str(), rc);
  }

  // Locate the chunk containing the last block we intend to keep
  unsigned int blockOffset = fileSizeBlocks - 1;
  unsigned int chunkIndex = 0;
  unsigned int blkOffsetInChunk = 0;

  auto fCompressor =
      compress::getCompressorByType(fCompressorPool, compress::CompressInterface::getCompressionType(hdrs));
  if (!fCompressor)
  {
    std::ostringstream oss;
    oss << "Error, wrong compression type for segment file"
        << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
        << segNum << ";";
    throw WeException(oss.str(), ERR_COMP_WRONG_COMP_TYPE);
  }

  fCompressor->locateBlock(blockOffset, chunkIndex, blkOffsetInChunk);

  // Truncate the extra extents that are to be aborted
  if (chunkIndex < chunkPtrs.size())
  {
    long long fileSizeBytes = chunkPtrs[chunkIndex].first + chunkPtrs[chunkIndex].second;

    std::ostringstream msgText2;
    msgText2 << "Compressed column file"
                ": dbRoot-"
             << dbRoot << "; part#-" << partNum << "; seg#-" << segNum << "; truncated to " << fileSizeBytes
             << " bytes";
    fMgr->logAMessage(logging::LOG_TYPE_INFO, logging::M0075, columnOID, msgText2.str());

    // Drop off any trailing pointers (that point beyond the last block)
    compress::CompressInterface::setBlockCount(hdrs, fileSizeBlocks);
    std::vector<uint64_t> ptrs;

    for (unsigned i = 0; i <= chunkIndex; i++)
    {
      ptrs.push_back(chunkPtrs[i].first);
    }

    ptrs.push_back(chunkPtrs[chunkIndex].first + chunkPtrs[chunkIndex].second);
    compress::CompressInterface::storePtrs(ptrs, hdrs);

    rc = fDbFile.writeHeaders(pFile, hdrs);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error writing compressed column headers to DB for"
          << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
          << segNum << "; " << ec.errorString(rc);

      fDbFile.closeFile(pFile);
      throw WeException(oss.str(), rc);
    }

    // Finally, we truncate the data base column segment file
    rc = fDbFile.truncateFile(pFile, fileSizeBytes);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error truncating compressed column extents from DB for"
          << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
          << segNum << "; " << ec.errorString(rc);

      fDbFile.closeFile(pFile);
      throw WeException(oss.str(), rc);
    }
  }  // end of (chunkIndex < chunkPtrs.size())

  fDbFile.closeFile(pFile);
}

//------------------------------------------------------------------------------
// Reinitialize a column segment extent (in the db file) to empty values,
// following the HWM.  Remaining extents in the file are truncated.
// Also updates the header(s) as well.
//
// columnOID      - OID of segment file to be reinitialized
// dbRoot         - DBRoot of segment file to be reinitialized
// partNum        - Partition number of segment file to be reinitialized
// segNum         - Segment number of segment file to be reinitialized
// startOffsetBlk - File offset (after the HWM block), at which the file is
//                  to be reinitialized.  Value is in raw data blocks.
// nBlocks        - Number of blocks to be reinitialized
// colType        - Data type of the applicable column
// colWidth       - Width in bytes, of the applicable column
// restoreHwmChk  - Specifies whether HWM chunk is to be restored.
//------------------------------------------------------------------------------
void BulkRollbackFileCompressed::reInitTruncColumnExtent(OID columnOID, uint32_t dbRoot, uint32_t partNum,
                                                         uint32_t segNum, long long startOffsetBlk,
                                                         int nBlocks,
                                                         CalpontSystemCatalog::ColDataType colType,
                                                         uint32_t colWidth, bool restoreHwmChk)
{
  long long startOffset = startOffsetBlk * BYTE_PER_BLOCK;

  std::ostringstream msgText1;
  msgText1 << "Reinit HWM compressed column extent in db file"
           << ": dbRoot-" << dbRoot << "; part#-" << partNum << "; seg#-" << segNum << "; rawOffset(bytes)-"
           << startOffset << "; rawFreeBlks-" << nBlocks;
  fMgr->logAMessage(logging::LOG_TYPE_INFO, logging::M0075, columnOID, msgText1.str());

  std::string segFile;
  IDBDataFile* pFile = fDbFile.openFile(columnOID, dbRoot, partNum, segNum, segFile);

  if (pFile == 0)
  {
    std::ostringstream oss;
    oss << "Error opening compressed column segment file to rollback "
           "extents from DB for"
        << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
        << segNum;

    throw WeException(oss.str(), ERR_FILE_OPEN);
  }

  // Read and parse the header pointers
  char hdrs[CompressInterface::HDR_BUF_LEN * 2];
  CompChunkPtrList chunkPtrs;
  std::string errMsg;
  int rc = loadColumnHdrPtrs(pFile, hdrs, chunkPtrs, errMsg);

  if (rc != NO_ERROR)
  {
    std::ostringstream oss;
    oss << "Error reading compressed column ptr headers from DB for"
        << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
        << segNum << "; " << errMsg;

    fDbFile.closeFile(pFile);
    throw WeException(oss.str(), rc);
  }

  // Locate the chunk containing the last block we intend to keep
  unsigned int blockOffset = startOffsetBlk - 1;
  unsigned int chunkIndex = 0;
  unsigned int blkOffsetInChunk = 0;

  auto fCompressor =
      compress::getCompressorByType(fCompressorPool, compress::CompressInterface::getCompressionType(hdrs));
  if (!fCompressor)
  {
    std::ostringstream oss;
    oss << "Error, wrong compression type for segment file"
        << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
        << segNum << ";";
    throw WeException(oss.str(), ERR_COMP_WRONG_COMP_TYPE);
  }

  fCompressor->locateBlock(blockOffset, chunkIndex, blkOffsetInChunk);

  if (chunkIndex < chunkPtrs.size())
  {
    // Read backup copy of HWM chunk and restore it's contents
    uint64_t restoredChunkLen = 0;
    uint64_t restoredFileSize = 0;

    if (restoreHwmChk)
    {
      rc = restoreHWMChunk(pFile, columnOID, partNum, segNum, chunkPtrs[chunkIndex].first, restoredChunkLen,
                           restoredFileSize, errMsg);

      if (rc != NO_ERROR)
      {
        std::ostringstream oss;
        oss << "Error restoring HWM chunk for"
            << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
            << segNum << "; blkoff-" << blockOffset << "; " << errMsg;

        fDbFile.closeFile(pFile);
        throw WeException(oss.str(), rc);
      }
    }
    else
    {
      restoredChunkLen = chunkPtrs[chunkIndex].second;

      // leave truncated to last chunk if no extra blocks needed
      if (nBlocks == 0)
        restoredFileSize = chunkPtrs[chunkIndex].first + chunkPtrs[chunkIndex].second;
      else
        restoredFileSize = (chunkPtrs[chunkIndex].first + chunkPtrs[chunkIndex].second) +
                           (uint64_t)(nBlocks * BYTE_PER_BLOCK);
    }

    // nBlocks is based on full extents, but if database file only has an
    // abbreviated extent, then we reset nBlocks to reflect the size of a
    // file with a single abbreviated extent.
    // (Only the 1st extent in part0, seg0 employs an abbreviated extent.)
    bool bAbbreviatedExtent = false;

    // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
    if ((partNum == 0) && (segNum == 0))
    {
      long long nBytesInAbbrevExtent = INITIAL_EXTENT_ROWS_TO_DISK * colWidth;

      if (startOffset <= nBytesInAbbrevExtent)
      {
        nBlocks = (nBytesInAbbrevExtent - startOffset) / BYTE_PER_BLOCK;
        bAbbreviatedExtent = true;
      }
    }

    long long fileSizeBytes = restoredFileSize;

    std::ostringstream msgText2;
    msgText2 << "HWM compressed column file"
                ": dbRoot-"
             << dbRoot << "; part#-" << partNum << "; seg#-" << segNum;

    if (bAbbreviatedExtent)  // log adjusted nBlock count for abbrev extent
      msgText2 << "; rawFreeBlks-" << nBlocks << " (abbrev)";

    msgText2 << "; restoredChunk-" << restoredChunkLen << " bytes";

    if (!restoreHwmChk)
      msgText2 << " (no change)";

    msgText2 << "; truncated to " << fileSizeBytes << " bytes";
    fMgr->logAMessage(logging::LOG_TYPE_INFO, logging::M0075, columnOID, msgText2.str());

    // Initialize the remainder of the extent after the HWM chunk.
    // Just doing an ftruncate() reinits the file to 0's, which may or may
    // not actually reserve disk space if ftruncate is growing the file.
    // So reinit the blocks by calling reInitPartialColumnExtent() to help
    // avoid disk fragmentation.  Be careful not to init > 1 extent, be-
    // cause that is the limit on what that function was intended to do.
    const unsigned BLKS_PER_EXTENT = (BRMWrapper::getInstance()->getExtentRows() * colWidth) / BYTE_PER_BLOCK;
    long long nBlocksToInit =
        (fileSizeBytes - (chunkPtrs[chunkIndex].first + restoredChunkLen)) / BYTE_PER_BLOCK;

    if (nBlocksToInit > BLKS_PER_EXTENT)
      nBlocksToInit = BLKS_PER_EXTENT;  // don't init > 1 full extent

    if (nBlocksToInit > 0)
    {
      const uint8_t* emptyVal = fDbFile.getEmptyRowValue(colType, colWidth);
      rc = fDbFile.reInitPartialColumnExtent(pFile, (chunkPtrs[chunkIndex].first + restoredChunkLen),
                                             nBlocksToInit, emptyVal, colWidth);

      if (rc != NO_ERROR)
      {
        WErrorCodes ec;
        std::ostringstream oss;
        oss << "Error clearing HWM column extent from DB for"
               ": OID-"
            << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-" << segNum
            << "; " << ec.errorString(rc);

        fDbFile.closeFile(pFile);
        throw WeException(oss.str(), rc);
      }
    }

    // Drop off any trailing pointers (that point beyond the last block).
    // Watch for the special case where we are restoring a db file as an
    // empty file (chunkindex=0 and restoredChunkLen=0); in this case we
    // just restore the first pointer (set to 8192).
    compress::CompressInterface::setBlockCount(hdrs, (startOffsetBlk + nBlocks));
    std::vector<uint64_t> newPtrs;

    if ((chunkIndex > 0) || (restoredChunkLen > 0))
    {
      for (unsigned int i = 0; i <= chunkIndex; i++)
      {
        newPtrs.push_back(chunkPtrs[i].first);
      }
    }

    newPtrs.push_back(chunkPtrs[chunkIndex].first + restoredChunkLen);
    compress::CompressInterface::storePtrs(newPtrs, hdrs);

    rc = fDbFile.writeHeaders(pFile, hdrs);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error writing compressed column headers to DB for"
          << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
          << segNum << "; " << ec.errorString(rc);

      fDbFile.closeFile(pFile);
      throw WeException(oss.str(), rc);
    }

    // Finally, we truncate the data base column segment file
    rc = fDbFile.truncateFile(pFile, fileSizeBytes);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error truncating compressed column extents from DB for"
          << ": OID-" << columnOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
          << segNum << "; " << ec.errorString(rc);

      fDbFile.closeFile(pFile);
      throw WeException(oss.str(), rc);
    }
  }  // end of (chunkIndex < chunkPtrs.size())

  fDbFile.closeFile(pFile);
}

//------------------------------------------------------------------------------
// Load header pointer data for compressed column file.
//
// pFile     - FILE ptr to be used in reading header data.
// hdrs      - (out) Raw header data.
// chunkPtrs - (out) Chunk ptrs extracted from raw header data.
// errMsg    - (out) Error message if applicable.
//------------------------------------------------------------------------------
int BulkRollbackFileCompressed::loadColumnHdrPtrs(IDBDataFile* pFile, char* hdrs, CompChunkPtrList& chunkPtrs,
                                                  std::string& errMsg) const
{
  // Read the header pointers
  int rc = fDbFile.readHeaders(pFile, hdrs);

  if (rc != NO_ERROR)
  {
    WErrorCodes ec;
    std::ostringstream oss;
    oss << "Header read error: " << ec.errorString(rc);
    errMsg = oss.str();

    return rc;
  }

  // Parse the header pointers
  int rc1 = compress::CompressInterface::getPtrList(hdrs, chunkPtrs);

  if (rc1 != 0)
  {
    rc = ERR_METADATABKUP_COMP_PARSE_HDRS;

    WErrorCodes ec;
    std::ostringstream oss;
    oss << "Header parsing error (" << rc1 << "): " << ec.errorString(rc);
    errMsg = oss.str();

    return rc;
  }

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// Reinitialize a dictionary segment extent (in the db file) to empty blocks,
// following the HWM.  Remaining extents in the file are truncated.
// Also updates the header(s) as well.
//
// dStoreOID      - OID of segment store file to be reinitialized
// dbRoot         - DBRoot of segment file to be reinitialized
// partNum        - Partition number of segment file to be reinitialized
// segNum         - Segment number of segment file to be reinitialized
// startOffsetBlk - Starting block (after the HWM block), at which the file is
//                  to be reinitialized.  Value is in raw data blocks.
// nBlocks        - Number of blocks to be reinitialized
//------------------------------------------------------------------------------
void BulkRollbackFileCompressed::reInitTruncDctnryExtent(OID dStoreOID, uint32_t dbRoot, uint32_t partNum,
                                                         uint32_t segNum, long long startOffsetBlk,
                                                         int nBlocks)
{
  long long startOffset = startOffsetBlk * BYTE_PER_BLOCK;

  std::ostringstream msgText1;
  msgText1 << "Reinit HWM compressed dictionary store extent in db file"
              ": dbRoot-"
           << dbRoot << "; part#-" << partNum << "; seg#-" << segNum << "; rawOffset(bytes)-" << startOffset
           << "; rawFreeBlks-" << nBlocks;
  fMgr->logAMessage(logging::LOG_TYPE_INFO, logging::M0075, dStoreOID, msgText1.str());

  std::string segFile;
  IDBDataFile* pFile = fDbFile.openFile(dStoreOID, dbRoot, partNum, segNum, segFile);

  if (pFile == 0)
  {
    std::ostringstream oss;
    oss << "Error opening compressed dictionary store segment file to "
           "rollback extents from DB for"
        << ": OID-" << dStoreOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
        << segNum;

    throw WeException(oss.str(), ERR_FILE_OPEN);
  }

  char controlHdr[CompressInterface::HDR_BUF_LEN];
  CompChunkPtrList chunkPtrs;
  uint64_t ptrHdrSize;
  std::string errMsg;
  int rc = loadDctnryHdrPtrs(pFile, controlHdr, chunkPtrs, ptrHdrSize, errMsg);

  if (rc != NO_ERROR)
  {
    std::ostringstream oss;
    oss << "Error reading compressed dctnry ptr headers from DB for"
        << ": OID-" << dStoreOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
        << segNum << "; " << errMsg;

    fDbFile.closeFile(pFile);
    throw WeException(oss.str(), rc);
  }

  // Locate the chunk containing the last block we intend to keep
  unsigned int blockOffset = startOffsetBlk - 1;
  unsigned int chunkIndex = 0;
  unsigned int blkOffsetInChunk = 0;

  auto fCompressor = compress::getCompressorByType(
      fCompressorPool, compress::CompressInterface::getCompressionType(controlHdr));
  if (!fCompressor)
  {
    std::ostringstream oss;
    oss << "Error, wrong compression type for segment file"
        << ": OID-" << dStoreOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
        << segNum << ";";
    throw WeException(oss.str(), ERR_COMP_WRONG_COMP_TYPE);
  }

  fCompressor->locateBlock(blockOffset, chunkIndex, blkOffsetInChunk);

  if (chunkIndex < chunkPtrs.size())
  {
    // Read backup copy of HWM chunk and restore it's contents
    uint64_t restoredChunkLen = 0;
    uint64_t restoredFileSize = 0;
    rc = restoreHWMChunk(pFile, dStoreOID, partNum, segNum, chunkPtrs[chunkIndex].first, restoredChunkLen,
                         restoredFileSize, errMsg);

    if (rc == ERR_FILE_NOT_EXIST)
    {
      std::ostringstream msgText3;
      msgText3 << "No restore needed to Compressed dictionary file"
               << ": dbRoot-" << dbRoot << "; part#-" << partNum << "; seg#-" << segNum;
      fMgr->logAMessage(logging::LOG_TYPE_INFO, logging::M0075, dStoreOID, msgText3.str());

      fDbFile.closeFile(pFile);
      return;
    }

    if (rc != NO_ERROR)
    {
      std::ostringstream oss;
      oss << "Error restoring HWM chunk for"
          << ": OID-" << dStoreOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
          << segNum << "; blkoff-" << blockOffset << "; " << errMsg;

      fDbFile.closeFile(pFile);
      throw WeException(oss.str(), rc);
    }

    // nBlocks is based on full extents, but if database file only has an
    // abbreviated extent, then we reset nBlocks to reflect the file size.
    // (Unlike column files which only employ an abbreviated extent for the
    // 1st extent in part0, seg0, all store files start with abbrev extent)
    bool bAbbreviatedExtent = false;
    const uint32_t PSEUDO_COL_WIDTH = 8;  // simulated col width for dctnry
    long long nBytesInAbbrevExtent = INITIAL_EXTENT_ROWS_TO_DISK * PSEUDO_COL_WIDTH;

    if (startOffset <= nBytesInAbbrevExtent)
    {
      nBlocks = (nBytesInAbbrevExtent - startOffset) / BYTE_PER_BLOCK;
      bAbbreviatedExtent = true;
    }

    long long fileSizeBytes = restoredFileSize;

    std::ostringstream msgText2;
    msgText2 << "HWM compressed dictionary file"
                ": dbRoot-"
             << dbRoot << "; part#-" << partNum << "; seg#-" << segNum;

    if (bAbbreviatedExtent)  // log adjusted nBlock count for abbrev extent
      msgText2 << "; rawFreeBlks-" << nBlocks << " (abbrev)";

    msgText2 << "; restoredChunk-" << restoredChunkLen << " bytes"
             << "; truncated to " << fileSizeBytes << " bytes";
    fMgr->logAMessage(logging::LOG_TYPE_INFO, logging::M0075, dStoreOID, msgText2.str());

    // Initialize the remainder of the extent after the HWM chunk
    // Just doing an ftruncate() reinits the file to 0's, which may or may
    // not actually reserve disk space if ftruncate is growing the file.
    // So reinit the blocks by calling reInitPartialDctnryExtent() to help
    // avoid disk fragmentation.  Be careful not to init > 1 extent, be-
    // cause that is the limit on what that function was intended to do.
    const unsigned BLKS_PER_EXTENT =
        (BRMWrapper::getInstance()->getExtentRows() * PSEUDO_COL_WIDTH) / BYTE_PER_BLOCK;
    long long nBlocksToInit =
        (fileSizeBytes - (chunkPtrs[chunkIndex].first + restoredChunkLen)) / BYTE_PER_BLOCK;

    if (nBlocksToInit > BLKS_PER_EXTENT)
      nBlocksToInit = BLKS_PER_EXTENT;  // don't init > 1 full extent

    if (nBlocksToInit > 0)
    {
      rc = fDbFile.reInitPartialDctnryExtent(pFile, (chunkPtrs[chunkIndex].first + restoredChunkLen),
                                             nBlocksToInit, fDctnryHdr, DCTNRY_HEADER_SIZE);

      if (rc != NO_ERROR)
      {
        WErrorCodes ec;
        std::ostringstream oss;
        oss << "Error clearing HWM dictionary store extent from DB for"
               ": OID-"
            << dStoreOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-" << segNum
            << "; " << ec.errorString(rc);

        fDbFile.closeFile(pFile);
        throw WeException(oss.str(), rc);
      }
    }

    // Drop off any trailing pointers (that point beyond the last block).
    // Watch for the special case where we are restoring a db file as an
    // empty file (chunkindex=0 and restoredChunkLen=0); in this case we
    // just restore the first pointer (set to 8192).
    compress::CompressInterface::setBlockCount(controlHdr, (startOffsetBlk + nBlocks));
    std::vector<uint64_t> newPtrs;

    if ((chunkIndex > 0) || (restoredChunkLen > 0))
    {
      for (unsigned int i = 0; i <= chunkIndex; i++)
      {
        newPtrs.push_back(chunkPtrs[i].first);
      }
    }

    newPtrs.push_back(chunkPtrs[chunkIndex].first + restoredChunkLen);
    char* pointerHdr = new char[ptrHdrSize];
    compress::CompressInterface::storePtrs(newPtrs, pointerHdr, ptrHdrSize);

    rc = fDbFile.writeHeaders(pFile, controlHdr, pointerHdr, ptrHdrSize);
    delete[] pointerHdr;

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error writing compressed dictionary headers to DB for"
          << ": OID-" << dStoreOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
          << segNum << "; " << ec.errorString(rc);

      fDbFile.closeFile(pFile);
      throw WeException(oss.str(), rc);
    }

    // Finally, we truncate the data base dictionary store segment file
    rc = fDbFile.truncateFile(pFile, fileSizeBytes);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error truncating compressed dictionary store extents "
             "from DB file for"
          << ": OID-" << dStoreOID << "; DbRoot-" << dbRoot << "; partition-" << partNum << "; segment-"
          << segNum << "; " << ec.errorString(rc);

      fDbFile.closeFile(pFile);
      throw WeException(oss.str(), rc);
    }
  }  // end of (chunkIndex < chunkPtrs.size())

  fDbFile.closeFile(pFile);
}

//------------------------------------------------------------------------------
// Load header pointer data for compressed dictionary file.
//
// pFile     - FILE ptr to be used in reading header data.
// controlHdr- (out) Raw data from control header.
// chunkPtrs - (out) Chunk ptrs extracted from raw header data.
// ptrHdrSize- (out) Size of pointer header.
// errMsg    - (out) Error message if applicable.
//------------------------------------------------------------------------------
int BulkRollbackFileCompressed::loadDctnryHdrPtrs(IDBDataFile* pFile, char* controlHdr,
                                                  CompChunkPtrList& chunkPtrs, uint64_t& ptrHdrSize,
                                                  std::string& errMsg) const
{
  int rc = fDbFile.readFile(pFile, (unsigned char*)controlHdr, CompressInterface::HDR_BUF_LEN);

  if (rc != NO_ERROR)
  {
    WErrorCodes ec;
    std::ostringstream oss;
    oss << "Control header read error: " << ec.errorString(rc);
    errMsg = oss.str();

    return rc;
  }

  int rc1 = compress::CompressInterface::verifyHdr(controlHdr);

  if (rc1 != 0)
  {
    rc = ERR_METADATABKUP_COMP_VERIFY_HDRS;

    WErrorCodes ec;
    std::ostringstream oss;
    oss << "Control header verify error (" << rc1 << "): " << ec.errorString(rc);
    errMsg = oss.str();

    return rc;
  }

  uint64_t hdrSize = compress::CompressInterface::getHdrSize(controlHdr);
  ptrHdrSize = hdrSize - CompressInterface::HDR_BUF_LEN;
  char* pointerHdr = new char[ptrHdrSize];

  rc = fDbFile.readFile(pFile, (unsigned char*)pointerHdr, ptrHdrSize);

  if (rc != NO_ERROR)
  {
    WErrorCodes ec;
    std::ostringstream oss;
    oss << "Pointer header read error: " << ec.errorString(rc);
    errMsg = oss.str();
    delete[] pointerHdr;

    return rc;
  }

  // Parse the header pointers
  rc1 = compress::CompressInterface::getPtrList(pointerHdr, ptrHdrSize, chunkPtrs);
  delete[] pointerHdr;

  if (rc1 != 0)
  {
    rc = ERR_METADATABKUP_COMP_PARSE_HDRS;

    WErrorCodes ec;
    std::ostringstream oss;
    oss << "Pointer header parsing error (" << rc1 << "): " << ec.errorString(rc);
    errMsg = oss.str();

    return rc;
  }

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// Restore the HWM chunk back to the contents saved in the backup file.
//
// pFile                          - FILE* to segment file being reinitialized
// columnOID                      - OID of segment file to be reinitialized
// partNum                        - Partition num of seg file to reinitialize
// segNum                         - Segment num of seg file to reinitialize
// fileOffsetByteForRestoredChunk - Offset to pFile where restored chunk is to
//                                  be written
// restoredChunkLen (out)         - Length of restored chunk (in bytes)
// restoredFileSize (out)         - Size of file (in bytes) when backup was made
// errMsg (out)                   - Error msg if error returned
//------------------------------------------------------------------------------
int BulkRollbackFileCompressed::restoreHWMChunk(IDBDataFile* pFile, OID columnOID, uint32_t partNum,
                                                uint32_t segNum, uint64_t fileOffsetByteForRestoredChunk,
                                                uint64_t& restoredChunkLen, uint64_t& restoredFileSize,
                                                std::string& errMsg)
{
  restoredChunkLen = 0;
  restoredFileSize = 0;

  // Open the backup HWM chunk file
  std::ostringstream oss;
  oss << "/" << columnOID << ".p" << partNum << ".s" << segNum;
  std::string bulkRollbackSubPath(fMgr->getMetaFileName());
  bulkRollbackSubPath += DATA_DIR_SUFFIX;
  bulkRollbackSubPath += oss.str();

  if (!IDBPolicy::exists(bulkRollbackSubPath.c_str()))
  {
    std::ostringstream oss;
    oss << "Backup file does not exist: " << bulkRollbackSubPath;
    errMsg = oss.str();

    return ERR_FILE_NOT_EXIST;
  }

  IDBDataFile* backupFile =
      IDBDataFile::open(IDBPolicy::getType(bulkRollbackSubPath.c_str(), IDBPolicy::WRITEENG),
                        bulkRollbackSubPath.c_str(), "rb", 0, pFile->colWidth());

  if (!backupFile)
  {
    int errrc = errno;

    std::string eMsg;
    Convertor::mapErrnoToString(errrc, eMsg);
    std::ostringstream oss;
    oss << "Error opening backup file " << bulkRollbackSubPath << "; " << eMsg;
    errMsg = oss.str();

    return ERR_METADATABKUP_COMP_OPEN_BULK_BKUP;
  }

  // Read the chunk length and file size
  uint64_t sizeHdr[2];
  size_t bytesRead = readFillBuffer(backupFile, (char*)sizeHdr, sizeof(uint64_t) * 2);

  if (bytesRead != sizeof(uint64_t) * 2)
  {
    int errrc = errno;

    std::string eMsg;
    Convertor::mapErrnoToString(errrc, eMsg);
    std::ostringstream oss;
    oss << "Error reading chunk length from backup file " << bulkRollbackSubPath << "; " << eMsg;
    errMsg = oss.str();

    delete backupFile;
    return ERR_METADATABKUP_COMP_READ_BULK_BKUP;
  }

  restoredChunkLen = sizeHdr[0];
  restoredFileSize = sizeHdr[1];

  // Position the destination offset in the DB file
  int rc = fDbFile.setFileOffset(pFile, fileOffsetByteForRestoredChunk, SEEK_SET);

  if (rc != NO_ERROR)
  {
    WErrorCodes ec;
    std::ostringstream oss;
    oss << "Error setting column file offset"
        << "; offset-" << fileOffsetByteForRestoredChunk << "; " << ec.errorString(rc);
    errMsg = oss.str();

    delete backupFile;
    return rc;
  }

  // Copy backup version of chunk back to DB, unless chunk length is 0
  // in which case we have nothing to copy.
  if (restoredChunkLen > 0)
  {
    // Read the HWM chunk to be restored
    unsigned char* chunk = new unsigned char[restoredChunkLen];
    boost::scoped_array<unsigned char> scopedChunk(chunk);
    bytesRead = readFillBuffer(backupFile, (char*)chunk, restoredChunkLen);

    if (bytesRead != restoredChunkLen)
    {
      int errrc = errno;

      std::string eMsg;
      Convertor::mapErrnoToString(errrc, eMsg);
      std::ostringstream oss;
      oss << "Error reading chunk data from backup file " << bulkRollbackSubPath << "; size-"
          << restoredChunkLen << ": " << eMsg;
      errMsg = oss.str();

      delete backupFile;
      return ERR_METADATABKUP_COMP_READ_BULK_BKUP;
    }

    // Write/restore the HWM chunk to the applicable database file
    rc = fDbFile.writeFile(pFile, chunk, restoredChunkLen);

    if (rc != NO_ERROR)
    {
      WErrorCodes ec;
      std::ostringstream oss;
      oss << "Error writing to column file"
          << "; offset-" << fileOffsetByteForRestoredChunk << "; bytes-" << restoredChunkLen << "; "
          << ec.errorString(rc);
      errMsg = oss.str();

      delete backupFile;
      return rc;
    }
  }

  delete backupFile;

  return NO_ERROR;
}

//------------------------------------------------------------------------------
// Return true/false depending on whether the applicable backup chunk file can
// be found to restore a backed up compressed chunk back into a db file.  If
// the backup file is not found, we assume that it's because one was not created
// and thus not needed.
//------------------------------------------------------------------------------
bool BulkRollbackFileCompressed::doWeReInitExtent(OID columnOID, uint32_t dbRoot, uint32_t partNum,
                                                  uint32_t segNum) const
{
  std::ostringstream oss;
  oss << "/" << columnOID << ".p" << partNum << ".s" << segNum;
  std::string bulkRollbackSubPath(fMgr->getMetaFileName());
  bulkRollbackSubPath += DATA_DIR_SUFFIX;
  bulkRollbackSubPath += oss.str();

  if (!IDBPolicy::exists(bulkRollbackSubPath.c_str()))
  {
    return false;
  }

  return true;
}

//------------------------------------------------------------------------------
// Read requested number of bytes from the specified pFile into "buffer".
// Added this function as part of hdfs port, because IDBDataFile::read()
// may not return all the requested data in the first call to read().
//------------------------------------------------------------------------------
size_t BulkRollbackFileCompressed::readFillBuffer(IDBDataFile* pFile, char* buffer, size_t bytesReq) const
{
  char* pBuf = buffer;
  ssize_t nBytes;
  size_t bytesToRead = bytesReq;
  size_t totalBytesRead = 0;

  while (1)
  {
    nBytes = pFile->read(pBuf, bytesToRead);

    if (nBytes > 0)
      totalBytesRead += nBytes;
    else
      break;

    if ((size_t)nBytes == bytesToRead)
      break;

    pBuf += nBytes;
    bytesToRead = bytesToRead - (size_t)nBytes;
  }

  return totalBytesRead;
}
}  // namespace WriteEngine