1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-04-18 21:44:02 +03:00

1804 lines
65 KiB
C++

/* Copyright (C) 2014 InfiniDB, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/******************************************************************************
* $Id: we_columninfo.cpp 4737 2013-08-14 20:45:46Z bwilkinson $
*
*******************************************************************************/
#include <cstdlib>
#include <sstream>
#include <unistd.h>
//#define NDEBUG
//#include <cassert>
#include <cctype>
#include "we_columninfo.h"
#include "we_log.h"
#include "we_stats.h"
#include "we_colopbulk.h"
#include "brmtypes.h"
#include "we_columnautoinc.h"
#include "we_dbrootextenttracker.h"
#include "we_brmreporter.h"
#include "we_tableinfo.h"
#include "IDBDataFile.h"
using namespace idbdatafile;
namespace
{
//------------------------------------------------------------------------------
// Do a fast ascii-hex-string to binary data conversion. This is done in-place.
// We take bytes 1 and 2 and put them back into byte 1; 3 and 4 into 2; etc.
// The length is adjusted by 1/2 and returned to the caller as the new length.
// If any invalid hex characters are present in the string (not 0-9,A-F, or
// a-f), then the string is considered invalid, and a null token will be used.
//------------------------------------------------------------------------------
unsigned int compactVarBinary(char* charTmpBuf, int fieldLength)
{
unsigned char* p = reinterpret_cast<unsigned char*>(charTmpBuf);
char* f = charTmpBuf;
char v = '\0';
for (int i = 0; i < fieldLength / 2; i++, p++)
{
// Store even number byte in high order 4 bits of next output byte
v = *f;
if (!isxdigit(v))
return WriteEngine::COLPOSPAIR_NULL_TOKEN_OFFSET;
if (v <= '9')
*p = v - '0';
else if (v <= 'F')
*p = v - 'A' + 10;
else // if (v <= 'f')
*p = v - 'a' + 10;
*p <<= 4;
f++;
// Store odd number byte in low order 4 bite of next output byte
v = *f;
if (!isxdigit(v))
return WriteEngine::COLPOSPAIR_NULL_TOKEN_OFFSET;
if (v <= '9')
*p |= v - '0';
else if (v <= 'F')
*p |= v - 'A' + 10;
else // if (v <= 'f')
*p |= v - 'a' + 10;
f++;
}
// Changed our mind and decided to have the read thread reject rows with
// incomplete (odd length) varbinary fields, so the following check is not
// necessary. We should only get to this function with an even fieldLength.
#if 0
// Handle case where input data field has "odd" byte length.
// Store last input byte in high order 4 bits of additional output byte,
// and leave the low order bits set to 0.
if ((fieldLength & 1) == 1)
{
v = *f;
if (!isxdigit(v))
return WriteEngine::COLPOSPAIR_NULL_TOKEN_OFFSET;
if (v <= '9')
*p = v - '0';
else if (v <= 'F')
*p = v - 'A' + 10;
else //if (v <= 'f')
*p = v - 'a' + 10;
*p <<= 4;
fieldLength++;
}
#endif
return (fieldLength / 2);
}
} // namespace
namespace WriteEngine
{
//------------------------------------------------------------------------------
// ColumnInfo constructor
//------------------------------------------------------------------------------
ColumnInfo::ColumnInfo(Log* logger, int idIn, const JobColumn& columnIn, DBRootExtentTracker* pDBRootExtTrk,
TableInfo* pTableInfo)
: id(idIn)
, lastProcessingTime(0)
,
#ifdef PROFILE
totalProcessingTime(0)
,
#endif
fColBufferMgr(0)
, availFileSize(0)
, fileSize(0)
, fLog(logger)
, fDelayedFileStartBlksSkipped(0)
, fSavedLbid(0)
, fLastUpdatedLbid(0)
, fSizeWrittenStart(0)
, fSizeWritten(0)
, fLastInputRowInCurrentExtent(0)
, fLoadingAbbreviatedExtent(false)
, fColExtInf(0)
, fMaxNumRowsPerSegFile(0)
, fStore(0)
, fAutoIncLastValue(0)
, fSaturatedRowCnt(0)
, fpTableInfo(pTableInfo)
, fAutoIncMgr(0)
, fDbRootExtTrk(pDBRootExtTrk)
, fColWidthFactor(1)
, fDelayedFileCreation(INITIAL_DBFILE_STAT_FILE_EXISTS)
, fRowsPerExtent(0)
{
column = columnIn;
fRowsPerExtent = BRMWrapper::getInstance()->getExtentRows();
// Allocate a ColExtInfBase object for those types that won't track
// min/max CasualPartition info; this is a stub class that won't do
// anything.
switch (column.weType)
{
case WriteEngine::WR_FLOAT:
case WriteEngine::WR_DOUBLE:
case WriteEngine::WR_VARBINARY: // treat like char dictionary for now
case WriteEngine::WR_TOKEN:
{
fColExtInf = new ColExtInfBase();
break;
}
case WriteEngine::WR_CHAR:
{
if (column.colType == COL_TYPE_DICT)
{
fColExtInf = new ColExtInfBase();
}
else
{
fColExtInf = new ColExtInf(column.mapOid, logger);
}
break;
}
case WriteEngine::WR_SHORT:
case WriteEngine::WR_BYTE:
case WriteEngine::WR_LONGLONG:
case WriteEngine::WR_MEDINT:
case WriteEngine::WR_INT:
case WriteEngine::WR_USHORT:
case WriteEngine::WR_UBYTE:
case WriteEngine::WR_ULONGLONG:
case WriteEngine::WR_UMEDINT:
case WriteEngine::WR_UINT:
case WriteEngine::WR_BINARY:
default:
{
fColExtInf = new ColExtInf(column.mapOid, logger);
break;
}
}
colOp.reset(new ColumnOpBulk(logger, column.compressionType));
fMaxNumRowsPerSegFile = fRowsPerExtent * Config::getExtentsPerSegmentFile();
// Create auto-increment object to manage auto-increment next-value
if (column.autoIncFlag)
{
fAutoIncMgr = new ColumnAutoIncIncremental(logger);
// formerly used ColumnAutoIncJob for Shared Everything
// fAutoIncMgr = new ColumnAutoIncJob(logger);
}
}
//------------------------------------------------------------------------------
// ColumnInfo destructor
//------------------------------------------------------------------------------
ColumnInfo::~ColumnInfo()
{
clearMemory();
// Closing dictionary file also updates the extent map; which we
// don't want to do if we are aborting the job. Besides, the
// application code should be closing the dictionary as needed,
// instead of relying on the destructor, so disabled this code.
// if(fStore != NULL)
//{
// fStore->closeDctnryStore();
// delete fStore;
//}
if (fColExtInf)
delete fColExtInf;
if (fAutoIncMgr)
delete fAutoIncMgr;
if (fDbRootExtTrk)
delete fDbRootExtTrk;
}
//------------------------------------------------------------------------------
// Clear memory consumed by this ColumnInfo object.
//------------------------------------------------------------------------------
void ColumnInfo::clearMemory()
{
if (fColBufferMgr)
{
delete fColBufferMgr;
fColBufferMgr = 0;
}
fDictBlocks.clear();
}
//------------------------------------------------------------------------------
// If at the start of the job, We have encountered a PM that has no DB file for
// this column, or whose HWM extent is disabled; then this function is called
// to setup delayed file creation.
// A starting DB file will be created if/when we determine that we have rows
// to be processed.
//------------------------------------------------------------------------------
void ColumnInfo::setupDelayedFileCreation(uint16_t dbRoot, uint32_t partition, uint16_t segment, HWM hwm,
bool bEmptyPM)
{
if (bEmptyPM)
fDelayedFileCreation = INITIAL_DBFILE_STAT_CREATE_FILE_ON_EMPTY;
else
fDelayedFileCreation = INITIAL_DBFILE_STAT_CREATE_FILE_ON_DISABLED;
fDelayedFileStartBlksSkipped = hwm;
fSavedLbid = INVALID_LBID;
colOp->initColumn(curCol);
colOp->setColParam(curCol, id, column.width, column.dataType, column.weType, column.mapOid,
column.compressionType, dbRoot, partition, segment);
colOp->findTypeHandler(column.width, column.dataType);
}
//------------------------------------------------------------------------------
// Create a DB file as part of delayed file creation. See setupDelayedFile-
// Creation for an explanation.
//------------------------------------------------------------------------------
int ColumnInfo::createDelayedFileIfNeeded(const std::string& tableName)
{
int rc = NO_ERROR;
// For optimization sake, we use a separate mutex (fDelayedFileCreateMutex)
// exclusively reserved to be used as the gatekeeper to this function.
// No sense in waiting for a fColMutex lock, when 99.99% of the time,
// all we need to do is check fDelayedFileCreation, see that it's value
// is INITIAL_DBFILE_STAT_FILE_EXISTS, and exit the function.
boost::mutex::scoped_lock lock(fDelayedFileCreateMutex);
if (fDelayedFileCreation == INITIAL_DBFILE_STAT_FILE_EXISTS)
return NO_ERROR;
// Don't try creating extent again if we are already in error state with a
// previous thread failing to create this extent.
if (fDelayedFileCreation == INITIAL_DBFILE_STAT_ERROR_STATE)
{
rc = ERR_FILE_CREATE;
std::ostringstream oss;
oss << "Previous attempt failed to create initial dbroot" << curCol.dataFile.fDbRoot
<< " extent for column file OID-" << column.mapOid << "; dbroot-" << curCol.dataFile.fDbRoot
<< "; partition-" << curCol.dataFile.fPartition;
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
// Once we get this far, we go ahead and acquire a fColMutex lock. The
// fDelayedFileCreateMutex lock might suffice, but better to explicitly
// lock fColMutex since we are modifying attributes that we typically
// change within the scope of a fColMutex lock.
boost::mutex::scoped_lock lock2(fColMutex);
uint16_t dbRoot = curCol.dataFile.fDbRoot;
uint32_t partition = curCol.dataFile.fPartition;
// We don't have a file on this PM, so we create an initial file
ColumnOpBulk tempColOp(fLog, column.compressionType);
bool createLeaveFileOpen = false;
IDBDataFile* createPFile = 0;
uint16_t createDbRoot = dbRoot;
uint32_t createPartition = partition;
uint16_t createSegment = 0;
std::string createSegFile;
HWM createHwm = 0; // output
BRM::LBID_t createStartLbid = 0; // output
bool createNewFile = true; // output
int createAllocSize = 0; // output
char* createHdrs = 0; // output
std::string allocErrMsg;
rc = fpTableInfo->allocateBRMColumnExtent(curCol.dataFile.fid, createDbRoot, createPartition, createSegment,
createStartLbid, createAllocSize, createHwm, allocErrMsg);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "Error creating initial dbroot" << dbRoot << " BRM extent for OID-" << column.mapOid << "; dbroot-"
<< dbRoot << "; partition-" << partition << "; " << ec.errorString(rc) << "; " << allocErrMsg;
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE;
return rc;
}
uint16_t segment = createSegment;
partition = createPartition; // update our partition variable in
// case extent was added to a different
// partition than we intended
BRM::LBID_t lbid = createStartLbid;
rc = tempColOp.extendColumn(curCol, createLeaveFileOpen, createHwm, createStartLbid, createAllocSize,
createDbRoot, createPartition, createSegment, createSegFile, createPFile,
createNewFile, createHdrs);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "Error adding initial dbroot" << dbRoot << " extent to column file OID-" << column.mapOid
<< "; dbroot-" << dbRoot << "; partition-" << partition << "; segment-" << segment << "; "
<< ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE;
return rc;
}
// We don't have a file on this PM (or HWM extent is disabled), so we
// create a new file to load
std::ostringstream oss1;
if (fDelayedFileCreation == INITIAL_DBFILE_STAT_CREATE_FILE_ON_EMPTY)
oss1 << "PM empty; Creating starting column extent";
else
oss1 << "HWM extent disabled; Creating starting column extent";
oss1 << " on DBRoot-" << createDbRoot << " for OID-" << column.mapOid << "; part-" << createPartition
<< "; seg-" << createSegment << "; hwm-" << createHwm << "; LBID-" << createStartLbid << "; file-"
<< createSegFile;
fLog->logMsg(oss1.str(), MSGLVL_INFO2);
// Create corresponding dictionary store file if applicable
if (column.colType == COL_TYPE_DICT)
{
std::ostringstream oss;
oss << "Creating starting dictionary extent on dbroot" << dbRoot << " (segment " << segment
<< ") for dictionary OID " << column.dctnry.dctnryOid;
fLog->logMsg(oss.str(), MSGLVL_INFO2);
BRM::LBID_t dLbid;
Dctnry* tempD = 0;
if (column.dctnry.fCompressionType != 0)
{
DctnryCompress1* tempD1;
tempD1 = new DctnryCompress1(column.dctnry.fCompressionType);
tempD1->setMaxActiveChunkNum(1);
tempD1->setBulkFlag(true);
tempD = tempD1;
}
else
{
tempD = new DctnryCompress0;
}
boost::scoped_ptr<Dctnry> refDctnry(tempD);
// MCOL-4328 Define a file owner uid and gid
refDctnry->setUIDGID(this);
rc = tempD->createDctnry(column.dctnry.dctnryOid, column.dctnryWidth, dbRoot, partition, segment, dLbid,
true); // creating the store file
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "Error creating initial dbroot" << dbRoot << " extent for dictionary file OID-"
<< column.dctnry.dctnryOid << "; dbroot-" << dbRoot << "; partition-" << partition << "; segment-"
<< segment << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE;
return rc;
}
rc = tempD->closeDctnry();
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "Error creating/closing initial dbroot" << dbRoot << " extent for dictionary file OID-"
<< column.dctnry.dctnryOid << "; partition-" << partition << "; segment-" << segment << "; "
<< ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE;
return rc;
}
} // end of dictionary column processing
// Check for special case: where we skip initial blk(s) at the start of
// the "very" 1st file on each PM.
// We are checking to see if the PM is empty, "and" if the partition is 0.
// The PM could be empty if all the existing files on the PM were dropped
// or disabled, but we don't want/need to do block skipping in this case;
// so we also check to see if the partition number is 0, denoting the 1st
// extent for the PM.
// (The reason we are skipping blocks in partition 0, is because import
// does this with the partition 0, segment 0 file created by DDL.
// We skip blocks on the other PMs, so that the 1st file created on each
// PM will employ the same block skipping.)
HWM hwm = 0;
if ((fDelayedFileCreation == INITIAL_DBFILE_STAT_CREATE_FILE_ON_EMPTY) && (partition == 0))
{
hwm = fDelayedFileStartBlksSkipped;
}
rc = setupInitialColumnExtent(dbRoot, partition, segment, tableName, lbid, hwm, hwm, false, true);
if (rc == NO_ERROR)
fDelayedFileCreation = INITIAL_DBFILE_STAT_FILE_EXISTS;
else
fDelayedFileCreation = INITIAL_DBFILE_STAT_ERROR_STATE;
return rc;
}
//------------------------------------------------------------------------------
// Add an extent for this column. The next segment file in the DBRoot,
// partition, segment number rotation will be selected for the extent.
//
// NOTE: no mutex lock is employed here. It is assumed that the calling
// application code is taking care of this, if it is needed.
//------------------------------------------------------------------------------
int ColumnInfo::extendColumn(bool saveLBIDForCP)
{
//..We assume the applicable file is already open, so...
// the HWM of the current segment file should be set to reference the
// last block in the current file (as specified in curCol.dataFile.pFile).
//
// Prior to adding compression, we used ftell() to set HWM, but that
// would not work for compressed data. Code now assumes that if we
// are adding an extent, that fSizeWritten is a multiple of blksize,
// which it should be. If we are adding an extent, fSizeWritten should
// point to the last byte of a full extent boundary.
HWM hwm = (fSizeWritten / BYTE_PER_BLOCK) - 1;
//..Save info about the current segment column file, and close that file.
addToSegFileList(curCol.dataFile, hwm);
// Close current segment column file prior to adding extent to next seg file
int rc = closeColumnFile(true, false);
if (rc != NO_ERROR)
{
std::ostringstream oss;
oss << "extendColumn: error closing extent in "
<< "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-" << hwm;
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
// Call Config::initConfigCache() to force the Config class
// to reload config cache "if" the config file has changed.
Config::initConfigCache();
bool bChangeFlag = Config::hasLocalDBRootListChanged();
// if (fLog->isDebug( DEBUG_1 ))
//{
// std::ostringstream oss;
// oss << "Checking DBRootListChangeFlag: " << bChangeFlag;
// fLog->logMsg( oss.str(), MSGLVL_INFO2 );
//}
if (bChangeFlag)
{
rc = ERR_BULK_DBROOT_CHANGE;
WErrorCodes ec;
std::ostringstream oss;
oss << "extendColumn: DBRoots changed; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
//..Declare variables used to advance to the next extent
uint16_t dbRootNext = 0;
uint32_t partitionNext = 0;
uint16_t segmentNext = 0;
HWM hwmNext = 0;
BRM::LBID_t startLbid;
//..When we finish an extent, we typically should be advancing to the next
// DBRoot to create a "new" extent. But "if" the user has moved a DBRoot
// from another PM to this PM, then we may have a partial extent that we
// need to fill up. Here's where we just fill out such partially filled
// extents with empty values, until we can get back to a "normal" full
// extent boundary case.
bool bAllocNewExtent = false;
while (!bAllocNewExtent)
{
//..If we have a DBRoot Tracker, then use that to determine next DBRoot
// to rotate to, else the old legacy BRM extent allocator will assign,
// if we pass in a dbroot of 0.
bAllocNewExtent = true;
if (fDbRootExtTrk)
{
bAllocNewExtent =
fDbRootExtTrk->nextSegFile(dbRootNext, partitionNext, segmentNext, hwmNext, startLbid);
}
// If our next extent is a partial extent, then fill out that extent
// to the next full extent boundary, and round up HWM accordingly.
if (!bAllocNewExtent)
{
rc = extendColumnOldExtent(dbRootNext, partitionNext, segmentNext, hwmNext);
if (rc != NO_ERROR)
return rc;
}
}
// Once we are back on a "normal" full extent boundary, we add a new extent
// to resume adding rows.
rc = extendColumnNewExtent(saveLBIDForCP, dbRootNext, partitionNext);
return rc;
}
//------------------------------------------------------------------------------
// Add a new extent to this column, at the specified DBRoot. Partition may be
// used if DBRoot is empty.
//------------------------------------------------------------------------------
int ColumnInfo::extendColumnNewExtent(bool saveLBIDForCP, uint16_t dbRootNew, uint32_t partitionNew)
{
//..Declare variables used to advance to the next extent
IDBDataFile* pFileNew = 0;
HWM hwmNew = 0;
bool newFile = false;
std::string segFileNew;
uint16_t segmentNew = 0;
BRM::LBID_t startLbid;
char hdr[compress::CompressInterface::HDR_BUF_LEN * 2];
// Extend the column by adding an extent to the next
// DBRoot, partition, and segment file in the rotation
int allocsize = 0;
std::string allocErrMsg;
int rc = fpTableInfo->allocateBRMColumnExtent(curCol.dataFile.fid, dbRootNew, partitionNew, segmentNew,
startLbid, allocsize, hwmNew, allocErrMsg);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "extendColumnNewExtent: error creating BRM extent after "
<< "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment;
oss << "; newDBRoot-" << dbRootNew << "; newpart-" << partitionNew << "; " << ec.errorString(rc) << "; "
<< allocErrMsg;
fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
return rc;
}
rc = colOp->extendColumn(curCol,
true, // leave file open
hwmNew, startLbid, allocsize, dbRootNew, partitionNew, segmentNew, segFileNew,
pFileNew, newFile, hdr);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "extendColumnNewExtent: error adding file extent after "
<< "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment;
oss << "; newDBRoot-" << dbRootNew << "; newpart-" << partitionNew << "; newseg-" << segmentNew
<< "; fbo-" << hwmNew << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
if (pFileNew)
colOp->closeFile(pFileNew); // clean up loose ends
return rc;
}
std::ostringstream oss;
oss << "Add column extent OID-" << curCol.dataFile.fid << "; DBRoot-" << dbRootNew << "; part-"
<< partitionNew << "; seg-" << segmentNew << "; hwm-" << hwmNew << "; LBID-" << startLbid << "; file-"
<< segFileNew;
fLog->logMsg(oss.str(), MSGLVL_INFO2);
// Update lbid.
fLastUpdatedLbid = startLbid;
// Save the LBID with our CP extent info, so that we can update extent map
if (saveLBIDForCP)
{
int rcLBID = fColExtInf->updateEntryLbid(startLbid);
// If error occurs, we log WARNING, but we don't fail the job.
if (rcLBID != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "updateEntryLbid failed for OID-" << curCol.dataFile.fid << "; LBID-" << startLbid
<< "; CasualPartition info may become invalid; " << ec.errorString(rcLBID);
fLog->logMsg(oss.str(), rcLBID, MSGLVL_WARNING);
}
}
//..Reset data members to reflect where we are in the newly
// opened column segment file. The file may be a new file, or we may
// be adding an extent to an existing column segment file.
curCol.dataFile.hwm = hwmNew;
curCol.dataFile.pFile = pFileNew;
curCol.dataFile.fPartition = partitionNew;
curCol.dataFile.fSegment = segmentNew;
curCol.dataFile.fDbRoot = dbRootNew;
curCol.dataFile.fSegFileName = segFileNew;
rc = resetFileOffsetsNewExtent(hdr);
if (rc != NO_ERROR)
{
std::ostringstream oss;
oss << "extendColumnNewExtent: error moving to new extent in "
<< "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-"
<< curCol.dataFile.hwm;
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
if (pFileNew)
closeColumnFile(false, true); // clean up loose ends
return rc;
}
if (fLog->isDebug(DEBUG_1))
{
std::ostringstream oss2;
oss2 << "Extent added to column OID-" << curCol.dataFile.fid << "; DBRoot-" << dbRootNew << "; part-"
<< partitionNew << "; seg-" << segmentNew << "; begByte-" << fSizeWritten << "; endByte-" << fileSize
<< "; freeBytes-" << availFileSize;
fLog->logMsg(oss2.str(), MSGLVL_INFO2);
}
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Fill out existing partial extent to extent boundary, so that we can resume
// inserting rows on an extent boundary basis. This use case should only take
// place when a DBRoot with a partial extent has been moved from one PM to
// another.
//------------------------------------------------------------------------------
int ColumnInfo::extendColumnOldExtent(uint16_t dbRootNext, uint32_t partitionNext, uint16_t segmentNext,
HWM hwmNext)
{
const unsigned int BLKS_PER_EXTENT = (fRowsPerExtent * column.width) / BYTE_PER_BLOCK;
HWM hwmNextExtentBoundary = hwmNext;
// Round up HWM to the end of the current extent
unsigned int nBlks = hwmNext + 1;
unsigned int nRem = nBlks % BLKS_PER_EXTENT;
if (nRem > 0)
hwmNextExtentBoundary = nBlks - nRem + BLKS_PER_EXTENT - 1;
else
hwmNextExtentBoundary = nBlks - 1;
std::ostringstream oss;
oss << "Padding partial extent to extent boundary in OID-" << curCol.dataFile.fid << "; DBRoot-"
<< dbRootNext << "; part-" << partitionNext << "; seg-" << segmentNext << "; oldhwm-" << hwmNext
<< "; newhwm-" << hwmNextExtentBoundary;
fLog->logMsg(oss.str(), MSGLVL_INFO2);
long long fileSizeBytes;
int rc = colOp->getFileSize(curCol.dataFile.fid, dbRootNext, partitionNext, segmentNext, fileSizeBytes);
if (rc != NO_ERROR)
{
std::ostringstream oss;
oss << "extendColumnOldExtent: error padding partial extent for "
<< "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-"
<< curCol.dataFile.hwm;
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
curCol.dataFile.pFile = 0;
curCol.dataFile.fDbRoot = dbRootNext;
curCol.dataFile.fPartition = partitionNext;
curCol.dataFile.fSegment = segmentNext;
curCol.dataFile.hwm = hwmNextExtentBoundary;
curCol.dataFile.fSegFileName.clear();
// See if we have an abbreviated extent that needs to be expanded on disk
if (fileSizeBytes == (long long)INITIAL_EXTENT_ROWS_TO_DISK * column.width)
{
std::string segFile;
// @bug 5572 - HDFS usage: incorporate *.tmp file backup flag
IDBDataFile* pFile = colOp->openFile(curCol, dbRootNext, partitionNext, segmentNext, segFile, true);
if (!pFile)
{
std::ostringstream oss;
rc = ERR_FILE_OPEN;
oss << "extendColumnOldExtent: error padding partial extent for "
<< "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-"
<< curCol.dataFile.hwm;
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
rc = colOp->expandAbbrevColumnExtent(pFile, dbRootNext, column.emptyVal, column.width, column.dataType);
if (rc != NO_ERROR)
{
std::ostringstream oss;
oss << "extendColumnOldExtent: error padding partial extent for "
<< "column OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; hwm-"
<< curCol.dataFile.hwm;
fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
colOp->closeFile(pFile);
return rc;
}
colOp->closeFile(pFile);
}
addToSegFileList(curCol.dataFile, hwmNextExtentBoundary);
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Either add or update the File object, so that it has the updated HWM.
// We will access this info to update the HWM in the ExtentMap at the end
// of the import.
// dmc-could optimize later by changing fSegFileUpdateList from a vector
// to a map or hashtable with a key consisting of partition and segment.
//------------------------------------------------------------------------------
void ColumnInfo::addToSegFileList(File& dataFile, HWM hwm)
{
bool foundFlag = false;
for (unsigned int i = 0; i < fSegFileUpdateList.size(); i++)
{
if ((fSegFileUpdateList[i].fPartition == dataFile.fPartition) &&
(fSegFileUpdateList[i].fSegment == dataFile.fSegment))
{
if (fLog->isDebug(DEBUG_1))
{
std::ostringstream oss3;
oss3 << "Updating HWM list"
"; column OID-"
<< dataFile.fid << "; DBRoot-" << dataFile.fDbRoot << "; part-" << dataFile.fPartition
<< "; seg-" << dataFile.fSegment << "; oldhwm-" << fSegFileUpdateList[i].hwm << "; newhwm-"
<< hwm;
fLog->logMsg(oss3.str(), MSGLVL_INFO2);
}
fSegFileUpdateList[i].hwm = hwm;
foundFlag = true;
break;
}
}
if (!foundFlag)
{
if (fLog->isDebug(DEBUG_1))
{
std::ostringstream oss3;
oss3 << "Adding to HWM list"
<< "; column OID-" << dataFile.fid << "; DBRoot-" << dataFile.fDbRoot << "; part-"
<< dataFile.fPartition << "; seg-" << dataFile.fSegment << "; hwm-" << hwm;
fLog->logMsg(oss3.str(), MSGLVL_INFO2);
}
dataFile.hwm = hwm;
fSegFileUpdateList.push_back(dataFile);
}
}
//------------------------------------------------------------------------------
// Reset file offset data member attributes when we start working on the next
// extent.
//------------------------------------------------------------------------------
int ColumnInfo::resetFileOffsetsNewExtent(const char* /*hdr*/)
{
setFileSize(curCol.dataFile.hwm, false);
long long byteOffset = (long long)curCol.dataFile.hwm * (long long)BYTE_PER_BLOCK;
fSizeWritten = byteOffset;
fSizeWrittenStart = fSizeWritten;
availFileSize = fileSize - fSizeWritten;
// If we are adding an extent as part of preliminary block skipping, then
// we won't have a ColumnBufferManager object yet, but that's okay, because
// we are only adding the empty extent at this point.
if (fColBufferMgr)
{
RETURN_ON_ERROR(fColBufferMgr->setDbFile(curCol.dataFile.pFile, curCol.dataFile.hwm, 0));
RETURN_ON_ERROR(colOp->setFileOffset(curCol.dataFile.pFile, byteOffset));
}
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Set current size of file in raw (uncompressed) bytes, given the specified
// hwm. abbrevFlag indicates whether this is a fixed size abbreviated extent.
// For unabbreviated extents the "logical" file size is calculated by rounding
// the hwm up to the nearest multiple of the extent size.
//------------------------------------------------------------------------------
void ColumnInfo::setFileSize(HWM hwm, int abbrevFlag)
{
// Must be an abbreviated extent if there is only 1 compressed chunk in
// the db file. Even a 1-byte column would have 2 4MB chunks for an 8M
// row column extent.
if (abbrevFlag)
{
fileSize = (INITIAL_EXTENT_ROWS_TO_DISK * curCol.colWidth);
}
else
{
const unsigned int ROWS_PER_EXTENT = fRowsPerExtent;
long long nRows = ((long long)(hwm + 1) * (long long)BYTE_PER_BLOCK) / (long long)curCol.colWidth;
long long nRem = nRows % ROWS_PER_EXTENT;
if (nRem == 0)
{
fileSize = nRows * curCol.colWidth;
}
else
{
fileSize = (nRows - nRem + ROWS_PER_EXTENT) * curCol.colWidth;
}
}
}
//------------------------------------------------------------------------------
// If we are dealing with the first extent in the first segment file for this
// column, and the segment file is still equal to 256K rows, then we set the
// fLoadingAbbreviatedExtent flag. This tells us (later on) that we are dealing
// with an abbreviated extent that still needs to be expanded and filled, before
// we start adding new extents.
//------------------------------------------------------------------------------
void ColumnInfo::setAbbrevExtentCheck()
{
// DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0))
{
if (fileSize == (INITIAL_EXTENT_ROWS_TO_DISK * curCol.colWidth))
{
fLoadingAbbreviatedExtent = true;
if (fLog->isDebug(DEBUG_1))
{
std::ostringstream oss;
oss << "Importing into abbreviated extent, column OID-" << curCol.dataFile.fid << "; DBRoot-"
<< curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-"
<< curCol.dataFile.fSegment << "; fileSize-" << fileSize;
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
}
}
}
//------------------------------------------------------------------------------
// If this is an abbreviated extent, we expand the extent to a full extent on
// disk, by initializing the necessary number of remaining blocks.
// bRetainFilePos flag controls whether the current file position is retained
// upon return from this function; else the file will be positioned at the end
// of the file.
//------------------------------------------------------------------------------
int ColumnInfo::expandAbbrevExtent(bool bRetainFilePos)
{
if (fLoadingAbbreviatedExtent)
{
off64_t oldOffset = 0;
if (bRetainFilePos)
{
oldOffset = curCol.dataFile.pFile->tell();
}
colOp->setFileOffset(curCol.dataFile.pFile, 0, SEEK_END);
std::ostringstream oss;
oss << "Expanding first extent to column OID-" << curCol.dataFile.fid << "; DBRoot-"
<< curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-"
<< curCol.dataFile.fSegment << "; file-" << curCol.dataFile.fSegFileName;
fLog->logMsg(oss.str(), MSGLVL_INFO2);
int rc = colOp->expandAbbrevExtent(curCol);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "expandAbbrevExtent: error expanding extent for "
<< "OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
return rc;
}
// Update available file size to reflect disk space added by expanding
// the extent.
long long fileSizeBeforeExpand = fileSize;
setFileSize((fileSizeBeforeExpand / BYTE_PER_BLOCK), false);
availFileSize += (fileSize - fileSizeBeforeExpand);
// Restore offset back to where we were before expanding the extent
if (bRetainFilePos)
{
rc = colOp->setFileOffset(curCol.dataFile.pFile, oldOffset, SEEK_SET);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "expandAbbrevExtent: error seeking to new extent for "
<< "OID-" << curCol.dataFile.fid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; "
<< ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
return rc;
}
}
// We only use abbreviated extents for the very first extent. So after
// expanding a col's abbreviated extent, we should disable this check.
fLoadingAbbreviatedExtent = false;
}
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Close the current Column file.
//------------------------------------------------------------------------------
int ColumnInfo::closeColumnFile(bool /*bCompletingExtent*/, bool /*bAbort*/)
{
if (curCol.dataFile.pFile)
{
colOp->closeFile(curCol.dataFile.pFile);
curCol.dataFile.pFile = 0;
}
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Initialize fLastInputRowInCurrentExtent used in detecting when a Read Buffer
// is crossing an extent boundary, so that we can accurately track the min/max
// for each extent as the Read buffers are parsed.
//------------------------------------------------------------------------------
void ColumnInfo::lastInputRowInExtentInit(bool bIsNewExtent)
{
// Reworked initial block skipping for compression:
const unsigned int ROWS_PER_EXTENT = fRowsPerExtent;
RID numRowsLeftInExtent = 0;
RID numRowsWritten = fSizeWritten / curCol.colWidth;
if ((numRowsWritten % ROWS_PER_EXTENT) != 0)
numRowsLeftInExtent = ROWS_PER_EXTENT - (numRowsWritten % ROWS_PER_EXTENT);
bool bRoomToAddToOriginalExtent = true;
if (fSizeWritten > 0)
{
// Handle edge case; if numRowsLeftInExtent comes out to be 0, then
// current extent is full. In this case we first bump up row count
// by a full extent before we subtract by 1 to get the last row number
// in extent.
if (numRowsLeftInExtent == 0)
{
numRowsLeftInExtent = ROWS_PER_EXTENT;
;
bRoomToAddToOriginalExtent = false;
}
}
else
{
// Starting new file with empty extent, so set row count to full extent
numRowsLeftInExtent = ROWS_PER_EXTENT;
}
fLastInputRowInCurrentExtent = numRowsLeftInExtent - 1;
// If we have a pre-existing extent that we are going to add rows to,
// then we need to add that extent to our ColExtInf object, so that we
// can update the CP min/max at the end of the bulk load job.
if (bRoomToAddToOriginalExtent)
{
fColExtInf->addFirstEntry(fLastInputRowInCurrentExtent, fSavedLbid, bIsNewExtent);
}
}
//------------------------------------------------------------------------------
// Increment fLastRIDInExtent to the end of the next extent.
//------------------------------------------------------------------------------
void ColumnInfo::lastInputRowInExtentInc()
{
fLastInputRowInCurrentExtent += fRowsPerExtent;
}
//------------------------------------------------------------------------------
// Parsing is complete for this column. Flush pending data. Close the current
// segment file, and corresponding dictionary store file (if applicable). Also
// clears memory taken up by this ColumnInfo object.
//------------------------------------------------------------------------------
int ColumnInfo::finishParsing()
{
int rc = NO_ERROR;
// Close the dctnry file handle.
if (fStore)
{
rc = closeDctnryStore(false);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "finishParsing: close dictionary file error with column " << column.colName << "; "
<< ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
}
// We don't need the mutex to protect against concurrent access by other
// threads, since by the time we get to this point, this is the last
// thread working on this column. But, we use the mutex to insure that
// we see the latest state that may have been set by another parsing thread
// working with the same column.
boost::mutex::scoped_lock lock(fColMutex);
// Force the flushing of remaining data in the output buffer
if (fColBufferMgr)
{
rc = fColBufferMgr->flush();
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "finishParsing: flush error with column " << column.colName << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
}
// Close the column file
rc = closeColumnFile(false, false);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "finishParsing: close column file error with column " << column.colName << "; "
<< ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
clearMemory();
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Store updated column information in BRMReporter for this column at EOJ;
// so that Extent Map CP information and HWM's can be updated.
// Bug2117-Src code from this function was factored over from we_tableinfo.cpp.
//
// We use mutex because this function is called by "one" of the parsing threads
// when parsing is complete for all the columns from this column's table.
// We use the mutex to insure that this parsing thread, which ends up being
// responsible for updating BRM for this column, is getting the most up to
// date values in fSegFileUpdateList, fSizeWritten, etc which may have been
// set by another parsing thread.
//------------------------------------------------------------------------------
void ColumnInfo::getBRMUpdateInfo(BRMReporter& brmReporter)
{
boost::mutex::scoped_lock lock(fColMutex);
// Useful for debugging
// printCPInfo(column);
int entriesAdded = getHWMInfoForBRM(brmReporter);
// If we added any rows (HWM update count > 0), then update corresponding CP
if (entriesAdded > 0)
getCPInfoForBRM(brmReporter);
}
//------------------------------------------------------------------------------
// Get updated Casual Partition (CP) information for BRM for this column at EOJ.
//------------------------------------------------------------------------------
void ColumnInfo::getCPInfoForBRM(BRMReporter& brmReporter)
{
fColExtInf->getCPInfoForBRM(column, brmReporter);
}
//------------------------------------------------------------------------------
// Get updated HWM information for BRM for this column at EOJ.
// Returns count of the number of HWM entries added to the BRMReporter.
//------------------------------------------------------------------------------
int ColumnInfo::getHWMInfoForBRM(BRMReporter& brmReporter)
{
//..If we wrote out any data to the last segment file, then
// update HWM for the current (last) segment file we were writing to.
// Bug1374 - Update HWM when data added to file
if (fSizeWritten > fSizeWrittenStart)
{
// Bug1372.
HWM hwm = (fSizeWritten - 1) / BYTE_PER_BLOCK;
addToSegFileList(curCol.dataFile, hwm);
}
int entriesAdded = 0;
//..Update HWM for each segment file we touched, including the last one
for (unsigned int iseg = 0; iseg < fSegFileUpdateList.size(); iseg++)
{
// Log for now; may control with debug flag later
// if (fLog->isDebug( DEBUG_1 ))
{
std::ostringstream oss;
oss << "Saving HWM update for OID-" << fSegFileUpdateList[iseg].fid << "; hwm-"
<< fSegFileUpdateList[iseg].hwm << "; DBRoot-" << fSegFileUpdateList[iseg].fDbRoot << "; partition-"
<< fSegFileUpdateList[iseg].fPartition << "; segment-" << fSegFileUpdateList[iseg].fSegment;
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
BRM::BulkSetHWMArg hwmArg;
hwmArg.oid = fSegFileUpdateList[iseg].fid;
hwmArg.partNum = fSegFileUpdateList[iseg].fPartition;
hwmArg.segNum = fSegFileUpdateList[iseg].fSegment;
hwmArg.hwm = fSegFileUpdateList[iseg].hwm;
brmReporter.addToHWMInfo(hwmArg);
// Save list of modified db column files
BRM::FileInfo aFile;
aFile.oid = fSegFileUpdateList[iseg].fid;
aFile.partitionNum = fSegFileUpdateList[iseg].fPartition;
aFile.segmentNum = fSegFileUpdateList[iseg].fSegment;
aFile.dbRoot = fSegFileUpdateList[iseg].fDbRoot;
aFile.compType = curCol.compressionType;
brmReporter.addToFileInfo(aFile);
// Save list of corresponding modified db dictionary store files
if (column.colType == COL_TYPE_DICT)
{
BRM::FileInfo dFile;
dFile.oid = column.dctnry.dctnryOid;
dFile.partitionNum = fSegFileUpdateList[iseg].fPartition;
dFile.segmentNum = fSegFileUpdateList[iseg].fSegment;
dFile.dbRoot = fSegFileUpdateList[iseg].fDbRoot;
dFile.compType = curCol.compressionType;
brmReporter.addToDctnryFileInfo(dFile);
}
entriesAdded++;
}
fSegFileUpdateList.clear(); // don't need vector anymore, so release memory
return entriesAdded;
}
// Returns last updated LBID.
BRM::LBID_t ColumnInfo::getLastUpdatedLBID() const
{
return fLastUpdatedLbid;
}
//------------------------------------------------------------------------------
// Setup initial extent we will begin loading at start of import.
// DBRoot, partition, segment, etc for the starting extent are specified.
// If block skipping is causing us to advance to the next extent, then we
// set things up to point to the last block in the current extent. When we
// start adding rows, we will automatically advance to the next extent.
//------------------------------------------------------------------------------
int ColumnInfo::setupInitialColumnExtent(uint16_t dbRoot, // dbroot of starting extent
uint32_t partition, // partition number of starting extent
uint16_t segment, // segment number of starting extent
const std::string& tblName, // name of table containing this column
BRM::LBID_t lbid, // starting LBID for starting extent
HWM oldHwm, // original HWM
HWM hwm, // new projected HWM after block skipping
bool bSkippedToNewExtent, // blk skipping to next extent
bool bIsNewExtent) // treat as new extent (for CP updates)
{
// Init the ColumnInfo object
colOp->initColumn(curCol);
colOp->setColParam(curCol, id, column.width, column.dataType, column.weType, column.mapOid,
column.compressionType, dbRoot, partition, segment);
colOp->findTypeHandler(column.width, column.dataType);
// Open the column file
if (!colOp->exists(column.mapOid, dbRoot, partition, segment))
{
std::ostringstream oss;
oss << "Column file does not exist for OID-" << column.mapOid << "; DBRoot-" << dbRoot << "; partition-"
<< partition << "; segment-" << segment;
fLog->logMsg(oss.str(), ERR_FILE_NOT_EXIST, MSGLVL_ERROR);
return ERR_FILE_NOT_EXIST;
}
std::string segFile;
bool useTmpSuffix = false;
if (!bIsNewExtent)
useTmpSuffix = true;
// @bug 5572 - HDFS usage: incorporate *.tmp file backup flag
int rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "Error opening column file for OID-" << column.mapOid << "; DBRoot-" << dbRoot << "; partition-"
<< partition << "; segment-" << segment << "; filename-" << segFile << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), ERR_FILE_OPEN, MSGLVL_ERROR);
return ERR_FILE_OPEN;
}
std::ostringstream oss1;
oss1 << "Initializing import: "
<< "Table-" << tblName << "; Col-" << column.colName;
if (curCol.compressionType)
oss1 << " (compressed)";
oss1 << "; OID-" << column.mapOid << "; hwm-" << hwm;
if (bSkippedToNewExtent)
oss1 << " (full; load into next extent)";
oss1 << "; file-" << curCol.dataFile.fSegFileName;
fLog->logMsg(oss1.str(), MSGLVL_INFO2);
if (column.colType == COL_TYPE_DICT)
{
RETURN_ON_ERROR(openDctnryStore(true));
}
fSavedLbid = lbid;
fLastUpdatedLbid = lbid;
if (bSkippedToNewExtent)
oldHwm = hwm;
rc = setupInitialColumnFile(oldHwm, hwm);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "Error reading/positioning column file for OID-" << column.mapOid << "; DBRoot-" << dbRoot
<< "; partition-" << partition << "; segment-" << segment << "; filename-" << segFile << "; "
<< ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
return rc;
}
// Reworked initial block skipping for compression:
// Block skipping is causing us to wrap up this extent. We consider
// the current extent to be full, so we "pretend" to fill out the
// last block by adding 8192 bytes to the bytes written count.
// This will help trigger the addition of a new extent when we
// try to store the first section of rows to the db.
if (bSkippedToNewExtent)
{
updateBytesWrittenCounts(BYTE_PER_BLOCK);
fSizeWrittenStart = fSizeWritten;
}
// Reworked initial block skipping for compression:
// This initializes CP stats for first extent regardless of whether
// we end up adding rows to this extent, or initial block skipping
// ultimately causes us to start with a new extent.
lastInputRowInExtentInit(bIsNewExtent);
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Prepare the initial column segment file for import.
//------------------------------------------------------------------------------
int ColumnInfo::setupInitialColumnFile(HWM oldHwm, HWM hwm)
{
// Initialize the output buffer manager for the column.
if (column.colType == COL_TYPE_DICT)
{
fColBufferMgr = new ColumnBufferManagerDctnry(this, 8, fLog, 0);
}
else
{
fColBufferMgr = new ColumnBufferManager(this, column.width, fLog, 0);
}
RETURN_ON_ERROR(fColBufferMgr->setDbFile(curCol.dataFile.pFile, hwm, 0));
RETURN_ON_ERROR(colOp->getFileSize(curCol.dataFile.pFile, fileSize));
// See if dealing with abbreviated extent that will need expanding.
// This only applies to the first extent of the first segment file.
setAbbrevExtentCheck();
// If we are dealing with initial extent, see if block skipping has
// exceeded disk allocation, in which case we expand to a full extent.
if (isAbbrevExtent())
{
unsigned int numBlksForFirstExtent = (INITIAL_EXTENT_ROWS_TO_DISK * column.width) / BYTE_PER_BLOCK;
if (((oldHwm + 1) <= numBlksForFirstExtent) && ((hwm + 1) > numBlksForFirstExtent))
{
RETURN_ON_ERROR(expandAbbrevExtent(false));
}
}
// Seek till the HWM lbid.
// Store the current allocated file size in availFileSize.
long long byteOffset = (long long)hwm * (long long)BYTE_PER_BLOCK;
RETURN_ON_ERROR(colOp->setFileOffset(curCol.dataFile.pFile, byteOffset));
fSizeWritten = byteOffset;
fSizeWrittenStart = fSizeWritten;
availFileSize = fileSize - fSizeWritten;
if (fLog->isDebug(DEBUG_1))
{
std::ostringstream oss;
oss << "Init raw data offsets in column file OID-" << curCol.dataFile.fid << "; DBRoot-"
<< curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-"
<< curCol.dataFile.fSegment << "; begByte-" << fSizeWritten << "; endByte-" << fileSize
<< "; freeBytes-" << availFileSize;
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Update the number of bytes in the file, and the free space still remaining.
//------------------------------------------------------------------------------
void ColumnInfo::updateBytesWrittenCounts(unsigned int numBytesWritten)
{
availFileSize = availFileSize - numBytesWritten;
fSizeWritten = fSizeWritten + numBytesWritten;
}
//------------------------------------------------------------------------------
// Tell whether the current column segment file being managed by ColumnInfo,
// has filled up all its extents with data.
//------------------------------------------------------------------------------
bool ColumnInfo::isFileComplete() const
{
if ((fSizeWritten / column.width) >= fMaxNumRowsPerSegFile)
return true;
return false;
}
//------------------------------------------------------------------------------
// Initialize last used auto-increment value from the current "next"
// auto-increment value taken from the system catalog (or BRM).
//------------------------------------------------------------------------------
int ColumnInfo::initAutoInc(const std::string& fullTableName)
{
int rc = fAutoIncMgr->init(fullTableName, this);
return rc;
}
//------------------------------------------------------------------------------
// Reserves the requested number of auto-increment numbers (autoIncCount).
// The starting value of the reserved block of numbers is returned in nextValue.
//------------------------------------------------------------------------------
int ColumnInfo::reserveAutoIncNums(uint32_t autoIncCount, uint64_t& nextValue)
{
int rc = fAutoIncMgr->reserveNextRange(autoIncCount, nextValue);
return rc;
}
//------------------------------------------------------------------------------
// Finished using auto-increment. Current value can be committed back to the
// system catalog (or BRM).
//------------------------------------------------------------------------------
int ColumnInfo::finishAutoInc()
{
int rc = fAutoIncMgr->finish();
return rc;
}
//------------------------------------------------------------------------------
// Get current dbroot, partition, segment, and HWM for this column.
//
// We use mutex because this function is called by "one" of the parsing threads
// when parsing is complete for all the columns from this column's table.
// We use the mutex to insure that this parsing thread, which ends up being
// responsible for wrapping up this column, is getting the most up to
// date values for dbroot, partition, segment, and HWM which may have been
// set by another parsing thread.
//------------------------------------------------------------------------------
void ColumnInfo::getSegFileInfo(DBRootExtentInfo& fileInfo)
{
boost::mutex::scoped_lock lock(fColMutex);
fileInfo.fDbRoot = curCol.dataFile.fDbRoot;
fileInfo.fPartition = curCol.dataFile.fPartition;
fileInfo.fSegment = curCol.dataFile.fSegment;
if (fSizeWritten > 0)
fileInfo.fLocalHwm = (fSizeWritten - 1) / BYTE_PER_BLOCK;
else
fileInfo.fLocalHwm = 0;
}
//------------------------------------------------------------------------------
// Open a new or existing Dictionary store file based on the DBRoot,
// partition, and segment settings in curCol.dataFile.
//------------------------------------------------------------------------------
int ColumnInfo::openDctnryStore(bool bMustExist)
{
int rc = NO_ERROR;
if (column.dctnry.fCompressionType != 0)
{
DctnryCompress1* dctnryCompress1 = new DctnryCompress1(column.dctnry.fCompressionType);
dctnryCompress1->setMaxActiveChunkNum(1);
dctnryCompress1->setBulkFlag(true);
fStore = dctnryCompress1;
}
else
{
fStore = new DctnryCompress0;
}
fStore->setLogger(fLog);
fStore->setColWidth(column.dctnryWidth);
fStore->setUIDGID(this);
if (column.fWithDefault)
fStore->setDefault(column.fDefaultChr);
fStore->setImportDataMode(fpTableInfo->getImportDataMode());
// If we are in the process of adding an extent to this column,
// and the extent we are adding is the first extent for the
// relevant column segment file, then the corresponding dictionary
// store file will not exist, in which case we must create
// the store file, else we open the applicable store file.
if ((bMustExist) || (colOp->exists(column.dctnry.dctnryOid, curCol.dataFile.fDbRoot,
curCol.dataFile.fPartition, curCol.dataFile.fSegment)))
{
// Save HWM chunk (for compressed files) if this seg file calls for it
// @bug 5572 - HDFS usage: incorporate *.tmp file backup flag
bool useTmpSuffixDctnry = false;
RETURN_ON_ERROR(saveDctnryStoreHWMChunk(useTmpSuffixDctnry));
// @bug 5572 - HDFS usage: incorporate *.tmp file backup flag
rc = fStore->openDctnry(column.dctnry.dctnryOid, curCol.dataFile.fDbRoot, curCol.dataFile.fPartition,
curCol.dataFile.fSegment, useTmpSuffixDctnry);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "openDctnryStore: error opening existing store file for "
<< "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; tmpFlag-"
<< useTmpSuffixDctnry << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
// Ignore return code from closing file; already in error state
closeDctnryStore(true); // clean up loose ends
return rc;
}
if (INVALID_LBID != fStore->getCurLbid())
fDictBlocks.push_back(fStore->getCurLbid());
std::ostringstream oss;
oss << "Opening existing store file for " << column.colName << "; OID-" << column.dctnry.dctnryOid
<< "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-"
<< curCol.dataFile.fSegment << "; hwm-" << fStore->getHWM() << "; file-" << fStore->getFileName();
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
else
{
BRM::LBID_t startLbid;
rc = fStore->createDctnry(column.dctnry.dctnryOid,
column.dctnryWidth, //@bug 3313 - pass string col width
curCol.dataFile.fDbRoot, curCol.dataFile.fPartition, curCol.dataFile.fSegment,
startLbid);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "openDctnryStore: error creating new store file for "
<< "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
// Ignore return code from closing file; already in error state
closeDctnryStore(true); // clean up loose ends
return rc;
}
rc = fStore->openDctnry(column.dctnry.dctnryOid, curCol.dataFile.fDbRoot, curCol.dataFile.fPartition,
curCol.dataFile.fSegment, false);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "openDctnryStore: error opening new store file for "
<< "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
// Ignore return code from closing file; already in error state
closeDctnryStore(true); // clean up loose ends
return rc;
}
std::ostringstream oss;
oss << "Opening new store file for " << column.colName << "; OID-" << column.dctnry.dctnryOid
<< "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-" << curCol.dataFile.fPartition << "; seg-"
<< curCol.dataFile.fSegment << "; file-" << fStore->getFileName();
fLog->logMsg(oss.str(), MSGLVL_INFO2);
}
return rc;
}
//------------------------------------------------------------------------------
// Close the current Dictionary store file.
//------------------------------------------------------------------------------
int ColumnInfo::closeDctnryStore(bool bAbort)
{
int rc = NO_ERROR;
if (fStore)
{
if (bAbort)
rc = fStore->closeDctnryOnly();
else
rc = fStore->closeDctnry();
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "closeDctnryStore: error closing store file for "
<< "OID-" << column.dctnry.dctnryOid << "; file-" << fStore->getFileName() << "; "
<< ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_ERROR);
}
delete fStore;
fStore = 0;
}
return rc;
}
//--------------------------------------------------------------------------------------
// Update dictionary store file with string column parquet data, and return the assigned
// tokens (tokenbuf) to be stored in the corresponding column token file.
//--------------------------------------------------------------------------------------
int ColumnInfo::updateDctnryStoreParquet(std::shared_ptr<arrow::Array> columnData, int tokenPos,
const int totalRow, char* tokenBuf)
{
long long truncCount = 0;
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_WAIT_TO_PARSE_DCT);
#endif
boost::mutex::scoped_lock lock(fDictionaryMutex);
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_WAIT_TO_PARSE_DCT);
#endif
int rc = fStore->insertDctnryParquet(columnData, tokenPos, totalRow, id, tokenBuf, truncCount, column.cs, column.weType);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "updateDctnryStore: error adding rows to store file for "
<< "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
return rc;
}
incSaturatedCnt(truncCount);
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Update dictionary store file with specified strings, and return the assigned
// tokens (tokenbuf) to be stored in the corresponding column token file.
//------------------------------------------------------------------------------
int ColumnInfo::updateDctnryStore(char* buf, ColPosPair** pos, const int totalRow, char* tokenBuf)
{
long long truncCount = 0; // No. of rows with truncated values
// If this is a VARBINARY column; convert the ascii hex string into binary
// data and fix the length (it's now only half as long).
// Should be safe to modify pos and buf arrays outside a mutex, as no other
// thread should be accessing the strings from the same buffer, for this
// column.
// This only applies to default text mode. This step is bypassed for
// binary imports, because in that case, the data is already true binary.
if (((curCol.colType == WR_VARBINARY) || (curCol.colType == WR_BLOB && fpTableInfo->readFromSTDIN())) &&
(fpTableInfo->getImportDataMode() == IMPORT_DATA_TEXT))
{
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_COMPACT_VARBINARY);
#endif
for (int i = 0; i < totalRow; i++)
{
pos[i][id].offset = compactVarBinary(buf + pos[i][id].start, pos[i][id].offset);
}
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_COMPACT_VARBINARY);
#endif
}
#ifdef PROFILE
Stats::startParseEvent(WE_STATS_WAIT_TO_PARSE_DCT);
#endif
boost::mutex::scoped_lock lock(fDictionaryMutex);
#ifdef PROFILE
Stats::stopParseEvent(WE_STATS_WAIT_TO_PARSE_DCT);
#endif
int rc = fStore->insertDctnry(buf, pos, totalRow, id, tokenBuf, truncCount, column.cs, column.weType);
if (rc != NO_ERROR)
{
WErrorCodes ec;
std::ostringstream oss;
oss << "updateDctnryStore: error adding rows to store file for "
<< "OID-" << column.dctnry.dctnryOid << "; DBRoot-" << curCol.dataFile.fDbRoot << "; part-"
<< curCol.dataFile.fPartition << "; seg-" << curCol.dataFile.fSegment << "; " << ec.errorString(rc);
fLog->logMsg(oss.str(), rc, MSGLVL_CRITICAL);
fpTableInfo->fBRMReporter.addToErrMsgEntry(oss.str());
return rc;
}
incSaturatedCnt(truncCount);
return NO_ERROR;
}
//------------------------------------------------------------------------------
// No action necessary for uncompressed dictionary files
//------------------------------------------------------------------------------
// @bug 5572 - HDFS usage: add flag used to control *.tmp file usage
int ColumnInfo::saveDctnryStoreHWMChunk(bool& needBackup)
{
needBackup = false;
return NO_ERROR;
}
//------------------------------------------------------------------------------
// Truncate specified dictionary store file for this column.
// Only applies to compressed columns.
//------------------------------------------------------------------------------
int ColumnInfo::truncateDctnryStore(OID /*dctnryOid*/, uint16_t /*root*/, uint32_t /*pNum*/,
uint16_t /*sNum*/) const
{
return NO_ERROR;
}
//------------------------------------------------------------------------------
// utility to convert a Status enumeration to a string
//------------------------------------------------------------------------------
/* static */
void ColumnInfo::convertStatusToString(WriteEngine::Status status, std::string& statusString)
{
static std::string statusStringParseComplete("PARSE_COMPLETE");
static std::string statusStringReadComplete("READ_COMPLETE");
static std::string statusStringReadProgress("READ_PROGRESS");
static std::string statusStringNew("NEW");
static std::string statusStringErr("ERR");
static std::string statusStringUnknown("OTHER");
switch (status)
{
case PARSE_COMPLETE: statusString = statusStringParseComplete; break;
case READ_COMPLETE: statusString = statusStringReadComplete; break;
case READ_PROGRESS: statusString = statusStringReadProgress; break;
case NEW: statusString = statusStringNew; break;
case ERR: statusString = statusStringErr; break;
default: statusString = statusStringUnknown; break;
}
}
} // namespace WriteEngine