mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-20 09:07:44 +03:00
1971 lines
63 KiB
C++
1971 lines
63 KiB
C++
/* Copyright (C) 2014 InfiniDB, Inc.
|
|
|
|
This program is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU General Public License
|
|
as published by the Free Software Foundation; version 2 of
|
|
the License.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
MA 02110-1301, USA. */
|
|
|
|
// $Id: we_colop.cpp 4740 2013-08-15 22:26:46Z chao $
|
|
|
|
/** @file */
|
|
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <vector>
|
|
#include <map>
|
|
#include <boost/scoped_ptr.hpp>
|
|
|
|
using namespace std;
|
|
|
|
#include "we_colop.h"
|
|
#include "we_log.h"
|
|
#include "we_dbfileop.h"
|
|
#include "we_dctnrycompress.h"
|
|
#include "we_colopcompress.h"
|
|
#include "idbcompress.h"
|
|
#include "writeengine.h"
|
|
#include "cacheutils.h"
|
|
#include "we_fileop.h"
|
|
|
|
using namespace execplan;
|
|
|
|
#include "dataconvert.h"
|
|
#include "IDBDataFile.h"
|
|
#include "IDBPolicy.h"
|
|
|
|
using namespace idbdatafile;
|
|
|
|
#include "mcs_decimal.h"
|
|
|
|
namespace WriteEngine
|
|
{
|
|
// Small aggregate pairing a reference column's local HWM with its extent
// count. NOTE(review): not referenced anywhere in this chunk — presumably
// used by the fill/reference-column logic further down the file; verify.
struct RefcolInfo
{
  int localHwm;        // local high-water-mark (block offset) of the reference column
  unsigned numExtents; // number of extents the reference column spans
};
|
|
|
|
/**
|
|
* Constructor
|
|
*/
|
|
/**
 * Default constructor: performs no initialization beyond what the base
 * classes and member defaults provide.
 */
ColumnOp::ColumnOp()
{
  // memset(m_workBlock.data, 0, BYTE_PER_BLOCK);
}
|
|
/**
 * Construct with an externally-owned logger: adopt the logger's debug level
 * and register the logger for subsequent operations.
 * NOTE(review): 'logger' is dereferenced without a null check — callers are
 * presumably required to pass a valid Log*; confirm at call sites.
 */
ColumnOp::ColumnOp(Log* logger)
{
  setDebugLevel(logger->getDebugLevel());
  setLogger(logger);
}
|
|
|
|
/**
|
|
* Default Destructor
|
|
*/
|
|
/**
 * Destructor: nothing to release here; file handles are managed explicitly
 * via closeColumnFile()/clearColumn().
 */
ColumnOp::~ColumnOp()
{
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Allocate Row ID
|
|
* PARAMETERS:
|
|
* tableFid - the file id for table bitmap file
|
|
* totalRow - the total number of rows need to be allocated
|
|
* useStartingExtent - Indicates whether rows can be added to an existing
|
|
* starting extent
|
|
* newExtents - where to write extents allocated for newColStructList, etc, structures.
|
|
* RETURN:
|
|
* NO_ERROR if success
|
|
* rowIdArray - allocation of the row id left here
|
|
***********************************************************/
|
|
// Allocates 'totalRow' row IDs for an insert, filling 'rowIdArray'.
// Out-params: hwm/newHwm (high-water-marks of the starting/new extent),
// newExtent (a new extent was created), newFile (a new segment file was
// created), rowsLeft (rows that landed in the new extent), and optionally
// newExtents (LBIDs of extents allocated for newColStructList).
// Returns NO_ERROR on success, an error code on failure, or 1 if not all
// rows could be allocated.
int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent, Column& column, uint64_t totalRow,
                         RID* rowIdArray, HWM& hwm, bool& newExtent, uint64_t& rowsLeft, HWM& newHwm,
                         bool& newFile, ColStructList& newColStructList,
                         DctnryStructList& newDctnryStructList,
                         std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers,
                         bool insertSelect, bool isBatchInsert, OID tableOid, bool isFirstBatchPm,
                         std::vector<BRM::LBID_t>* newExtents)
{
  // MultiFiles per OID: always append the rows to the end for now.
  // See if the current HWM block might be in an abbreviated extent that
  // needs to be expanded, if we end up adding enough rows.
  bool bCheckAbbrevExtent = false;
  uint64_t numBlksPerInitialExtent = INITIAL_EXTENT_ROWS_TO_DISK / BYTE_PER_BLOCK * column.colWidth;
  int counter = 0;                                         // next free slot in rowIdArray
  uint64_t totalRowPerBlock = BYTE_PER_BLOCK / column.colWidth;
  uint64_t extentRows = BRMWrapper::getInstance()->getExtentRows();

  if (useStartingExtent)
  {
    // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
    if ((column.dataFile.fPartition == 0) && (column.dataFile.fSegment == 0) &&
        ((hwm + 1) <= numBlksPerInitialExtent))
      bCheckAbbrevExtent = abbreviatedExtent(column.dataFile.pFile, column.colWidth);

    // The current existed rows upto hwm
    uint64_t currentRows = totalRowPerBlock * hwm;
    uint64_t numExtentsFilled = currentRows / extentRows;
    // NOTE(review): this looks suspicious — as written, rowsAvailable
    // underflows whenever numExtentsFilled > 0. The intent appears to be
    // extentRows - (currentRows % extentRows); verify against upstream.
    uint64_t rowsAvailable = extentRows - (numExtentsFilled * extentRows);
    rowsLeft = totalRow < rowsAvailable ? 0 : totalRow - rowsAvailable;
  }
  else
  {
    rowsLeft = totalRow;
  }

  newExtent = false;
  uint32_t j = 0, i = 0, rowsallocated = 0;
  int rc = 0;
  newFile = false;
  Column newCol;
  unsigned char buf[BYTE_PER_BLOCK];  // scratch buffer holding one block
  unsigned char* curVal;              // cursor over row slots within buf
  const uint8_t* emptyVal = getEmptyRowValue(column.colDataType, column.colWidth);

  if (useStartingExtent)
  {
    // ZZ. For insert select, skip the hwm block and start inserting from the next block
    // to avoid self insert issue.
    // For batch insert: if not first batch, use the saved last rid to start adding rows.
    if (!insertSelect || !isFirstBatchPm)
    {
      //..Search the HWM block for empty rows
      rc = readBlock(column.dataFile.pFile, buf, hwm);

      if (rc != NO_ERROR)
        return rc;

      // Scan each row slot in the HWM block; empty slots become new row IDs.
      for (j = 0, curVal = buf; j < totalRowPerBlock; j++, curVal += column.colWidth)
      {
        if (isEmptyRow((uint64_t*)curVal, emptyVal, column.colWidth))
        {
          rowIdArray[counter] = getRowId(hwm, column.colWidth, j);
          rowsallocated++;
          counter++;

          if (rowsallocated >= totalRow)
            break;
        }
      }
    }
  }

  if (rowsallocated < totalRow)
  {
    if (useStartingExtent)
    {
      //..Search remaining blks in current extent (after HWM) for empty rows
      // Need go to next block
      // need check whether this block is the last block for this extent
      while (((totalRowPerBlock * (hwm + 1)) % extentRows) > 0)
      {
        hwm++;

        // Expand abbreviated initial extent on disk if needed.
        if (bCheckAbbrevExtent)
        {
          if ((hwm + 1) > numBlksPerInitialExtent)
          {
            RETURN_ON_ERROR(expandAbbrevExtent(column));
            bCheckAbbrevExtent = false;  // only needs expanding once
          }
        }

        rc = readBlock(column.dataFile.pFile, buf, hwm);

        // MCOL-498 add a block.
        // DRRTUY TODO Given there is no more hwm pre-allocated we
        // could extend the file once to accomodate all records.
        if (rc != NO_ERROR)
        {
          if (rc == ERR_FILE_EOF)
          {
            // Past EOF: materialize a fresh all-empty block at 'hwm'.
            setEmptyBuf(buf, BYTE_PER_BLOCK, emptyVal, column.colWidth);
            RETURN_ON_ERROR(saveBlock(column.dataFile.pFile, buf, hwm));
          }
          else
          {
            return rc;
          }
        }

        // Scan this block's row slots for empty rows.
        for (j = 0, curVal = buf; j < totalRowPerBlock; j++, curVal += column.colWidth)
        {
          if (isEmptyRow((uint64_t*)curVal, emptyVal, column.colWidth))
          {
            rowIdArray[counter] = getRowId(hwm, column.colWidth, j);
            rowsallocated++;
            counter++;

            if (rowsallocated >= totalRow)
              break;
          }
        }

        if (rowsallocated >= totalRow)
          break;
      }
    }

    // Nothing fit in the starting extent on the first batch: drop the
    // table meta data accumulated so far for this table.
    if ((rowsallocated == 0) && isFirstBatchPm)
    {
      TableMetaData::removeTableMetaData(tableOid);
    }

    // Check if a new extent is needed
    if (rowsallocated < totalRow)
    {
      // Create another extent
      uint16_t dbRoot;
      uint32_t partition = 0;
      uint16_t segment;
      IDBDataFile* pFile = NULL;
      std::string segFile;
      rowsLeft = 0;
      int allocSize = 0;
      newExtent = true;

      if ((column.dataFile.fid < 3000) || (!isBatchInsert))  // systables or single insert
      {
        dbRoot = column.dataFile.fDbRoot;
      }
      else
      {
        // Find out where the rest rows go
        BRM::LBID_t startLbid;
        // need to put in a loop until newExtent is true
        newExtent =
            dbRootExtentTrackers[column.colNo]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
        TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);

        while (!newExtent)
        {
          /* partially filled extent encountered due to user moved dbroot. Set hwm to the end of the extent.
             If compressed, fill the rest with empty values.
          */
          unsigned int BLKS_PER_EXTENT = 0;
          unsigned int nBlks = 0;
          unsigned int nRem = 0;
          FileOp fileOp;
          long long fileSizeBytes = 0;

          for (i = 0; i < dbRootExtentTrackers.size(); i++)
          {
            uint32_t colNo = column.colNo;
            // column.colNo's tracker was already advanced above; advance the rest.
            if (i != colNo)
              dbRootExtentTrackers[i]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);

            // Round up HWM to the end of the current extent
            BLKS_PER_EXTENT =
                (BRMWrapper::getInstance()->getExtentRows() * newColStructList[i].colWidth) / BYTE_PER_BLOCK;
            nBlks = newHwm + 1;
            nRem = nBlks % BLKS_PER_EXTENT;

            if (nRem > 0)
              newHwm = nBlks - nRem + BLKS_PER_EXTENT - 1;
            else
              newHwm = nBlks - 1;

            // save it to set in the end
            ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(newColStructList[i].dataOid);
            ColExtInfo aExt;
            aExt.dbRoot = dbRoot;
            aExt.partNum = partition;
            aExt.segNum = segment;
            aExt.hwm = newHwm;
            aExt.isNewExt = false;
            aExt.current = false;
            aColExtsInfo.push_back(aExt);

            if (newColStructList[i].fCompressionType > 0)
            {
              // Compressed column: pad the partially-filled extent with
              // empty chunks up to the rounded-up HWM.
              string errorInfo;
              rc = fileOp.fillCompColumnExtentEmptyChunks(
                  newColStructList[i].dataOid, newColStructList[i].colWidth, emptyVal, dbRoot, partition,
                  segment, newColStructList[i].colDataType, newHwm, segFile, errorInfo);

              if (rc != NO_ERROR)
                return rc;
            }
            //@Bug 4758. Check whether this is a abbreviated extent
            else if (newColStructList[i].fCompressionType == 0)
            {
              rc = fileOp.getFileSize(newColStructList[i].dataOid, dbRoot, partition, segment, fileSizeBytes);

              if (rc != NO_ERROR)
                return rc;

              // File size equals exactly one abbreviated initial extent:
              // expand it to a full extent before moving on.
              if (fileSizeBytes == (long long)INITIAL_EXTENT_ROWS_TO_DISK * newColStructList[i].colWidth)
              {
                IDBDataFile* pFile =
                    fileOp.openFile(newColStructList[i].dataOid, dbRoot, partition, segment, segFile);

                if (!pFile)
                {
                  rc = ERR_FILE_OPEN;
                  return rc;
                }

                rc = fileOp.expandAbbrevColumnExtent(pFile, dbRoot, emptyVal, newColStructList[i].colWidth,
                                                     newColStructList[i].colDataType);

                // set hwm for this extent.
                fileOp.closeFile(pFile);

                if (rc != NO_ERROR)
                  return rc;
              }
            }

            tableMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
          }

          // Try the next segment file; loop again if it is also partially filled.
          newExtent =
              dbRootExtentTrackers[column.colNo]->nextSegFile(dbRoot, partition, segment, newHwm, startLbid);
        }
      }

      std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;

      if (newExtent)
      {
        // extend all columns together
        std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
        BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;

        for (i = 0; i < newColStructList.size(); i++)
        {
          createStripeColumnExtentsArgIn.oid = newColStructList[i].dataOid;
          createStripeColumnExtentsArgIn.width = newColStructList[i].colWidth;
          createStripeColumnExtentsArgIn.colDataType = newColStructList[i].colDataType;
          cols.push_back(createStripeColumnExtentsArgIn);
        }

        rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partition, segment, extents);
        // NOTE(review): 'extents' is indexed before 'rc' is checked — if the
        // call failed and left 'extents' empty this is out-of-bounds; the rc
        // check likely should come first. Verify against upstream.
        newHwm = extents[column.colNo].startBlkOffset;

        if (rc != NO_ERROR)
          return rc;

        // Create column files
        vector<BRM::LBID_t> lbids;
        vector<CalpontSystemCatalog::ColDataType> colDataTypes;

        for (i = 0; i < extents.size(); i++)
        {
          setColParam(newCol, 0, newColStructList[i].colWidth, newColStructList[i].colDataType,
                      newColStructList[i].colType, newColStructList[i].dataOid,
                      newColStructList[i].fCompressionType, dbRoot, partition, segment);

          compressionType(newColStructList[i].fCompressionType);
          rc = extendColumn(newCol, false, extents[i].startBlkOffset, extents[i].startLbid,
                            extents[i].allocSize, dbRoot, partition, segment, segFile, pFile, newFile);

          if (rc != NO_ERROR)
            return rc;

          // Record where the new extent landed in the caller's structures.
          newColStructList[i].fColPartition = partition;
          newColStructList[i].fColSegment = segment;
          newColStructList[i].fColDbRoot = dbRoot;
          newDctnryStructList[i].fColPartition = partition;
          newDctnryStructList[i].fColSegment = segment;
          newDctnryStructList[i].fColDbRoot = dbRoot;
          lbids.push_back(extents[i].startLbid);

          if (newExtents)
          {
            (*newExtents).push_back(extents[i].startLbid);
          }

          colDataTypes.push_back(newColStructList[i].colDataType);
        }

        // mark the extents to updating
        rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);

        if (rc != NO_ERROR)
          return rc;

        // create corresponding dictionary files
        if (newFile)
        {
          boost::scoped_ptr<WriteEngineWrapper> we(new WriteEngineWrapper());
          we->setTransId(txnid);
          we->setBulkFlag(true);
          std::map<FID, FID> columnOids;

          for (i = 0; i < newDctnryStructList.size(); i++)
          {
            if (newDctnryStructList[i].dctnryOid > 0)
            {
              rc = we->createDctnry(txnid, newDctnryStructList[i].dctnryOid, newDctnryStructList[i].colWidth,
                                    dbRoot, partition, segment, newDctnryStructList[i].fCompressionType);

              if (rc != NO_ERROR)
                return rc;

              columnOids[newDctnryStructList[i].dctnryOid] = newDctnryStructList[i].dctnryOid;
            }
          }

          we->flushDataFiles(rc, txnid, columnOids);
        }
      }

      // save the extent info for batch insert
      if (isBatchInsert && newExtent)
      {
        TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);

        for (i = 0; i < newColStructList.size(); i++)
        {
          ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(newColStructList[i].dataOid);
          ColExtsInfo::iterator it = aColExtsInfo.begin();

          // Look for an existing entry for this (dbroot, partition, segment).
          while (it != aColExtsInfo.end())
          {
            if ((it->dbRoot == newColStructList[i].fColDbRoot) &&
                (it->partNum == newColStructList[i].fColPartition) &&
                (it->segNum == newColStructList[i].fColSegment))
              break;

            it++;
          }

          // NOTE(review): unlike the dictionary loop below, this appends a new
          // entry unconditionally, without checking (it == aColExtsInfo.end()).
          ColExtInfo aExt;
          aExt.dbRoot = newColStructList[i].fColDbRoot;
          aExt.partNum = newColStructList[i].fColPartition;
          aExt.segNum = newColStructList[i].fColSegment;
          aExt.hwm = extents[i].startBlkOffset;
          aExt.isNewExt = true;
          aExt.current = true;
          aExt.isDict = false;
          aColExtsInfo.push_back(aExt);
          tableMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
        }

        for (i = 0; i < newDctnryStructList.size(); i++)
        {
          if (newDctnryStructList[i].dctnryOid > 0)
          {
            ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(newDctnryStructList[i].dctnryOid);
            ColExtsInfo::iterator it = aColExtsInfo.begin();

            while (it != aColExtsInfo.end())
            {
              if ((it->dbRoot == newDctnryStructList[i].fColDbRoot) &&
                  (it->partNum == newDctnryStructList[i].fColPartition) &&
                  (it->segNum == newDctnryStructList[i].fColSegment))
                break;

              it++;
            }

            if (it == aColExtsInfo.end())  // add this one to the list
            {
              ColExtInfo aExt;
              aExt.dbRoot = newDctnryStructList[i].fColDbRoot;
              aExt.partNum = newDctnryStructList[i].fColPartition;
              aExt.segNum = newDctnryStructList[i].fColSegment;
              aExt.compType = newDctnryStructList[i].fCompressionType;
              aExt.isDict = true;
              aColExtsInfo.push_back(aExt);
            }

            tableMetaData->setColExtsInfo(newDctnryStructList[i].dctnryOid, aColExtsInfo);
          }
        }
      }

      // Open the column's segment file in the newly chosen location.
      setColParam(newCol, 0, column.colWidth, column.colDataType, column.colType, column.dataFile.fid,
                  column.compressionType, dbRoot, partition, segment);
      rc = openColumnFile(newCol, segFile, false);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
        return rc;

      //@Bug 3164 update compressed extent
      updateColumnExtent(newCol.dataFile.pFile, allocSize, /*lbid=*/0);
      //..Search first block of new extent for empty rows
      rc = readBlock(newCol.dataFile.pFile, buf, newHwm);

      // MCOL-498 add a block.
      // DRRTUY TODO Given there is no more hwm pre-allocated we
      // could extend the file once to accomodate all records.
      if (rc != NO_ERROR)
      {
        if (rc == ERR_FILE_EOF)
        {
          setEmptyBuf(buf, BYTE_PER_BLOCK, emptyVal, newCol.colWidth);
          RETURN_ON_ERROR(saveBlock(newCol.dataFile.pFile, buf, newHwm));
        }
        else
        {
          return rc;
        }
      }

      for (j = 0, curVal = buf; j < totalRowPerBlock; j++, curVal += column.colWidth)
      {
        // In a freshly created extent every slot should be empty, so this
        // check is expected to always pass; kept for safety.
        if (isEmptyRow((uint64_t*)curVal, emptyVal,
                       column.colWidth))
        {
          rowIdArray[counter] = getRowId(newHwm, column.colWidth, j);
          rowsallocated++;
          rowsLeft++;
          counter++;

          if (rowsallocated >= totalRow)
          {
            break;
          }
        }
      }

      if (rowsallocated < totalRow)
      {
        //..Search remaining blks in new extent for empty rows
        newHwm++;

        while (((totalRowPerBlock * newHwm) % extentRows) > 0)
        {
          rc = readBlock(newCol.dataFile.pFile, buf, newHwm);

          // MCOL-498 add a block.
          // DRRTUY TODO Given there is no more hwm pre-allocated we
          // could extend the file once to accomodate all records.
          if (rc != NO_ERROR)
          {
            if (rc == ERR_FILE_EOF)
            {
              setEmptyBuf(buf, BYTE_PER_BLOCK, emptyVal, newCol.colWidth);
              RETURN_ON_ERROR(saveBlock(newCol.dataFile.pFile, buf, newHwm));
            }
            else
            {
              return rc;
            }
          }

          for (j = 0, curVal = buf; j < totalRowPerBlock; j++, curVal += column.colWidth)
          {
            if (isEmptyRow((uint64_t*)curVal, emptyVal, column.colWidth))
            {
              rowIdArray[counter] = getRowId(newHwm, newCol.colWidth, j);
              rowsallocated++;
              rowsLeft++;
              counter++;

              if (rowsallocated >= totalRow)
              {
                break;
              }
            }
          }

          if ((rowsallocated < totalRow))
          {
            newHwm++;
          }
          else
            break;
        }
      }
    }
  }

  // Could not satisfy the request even after creating a new extent.
  // NOTE(review): returns the bare constant 1 rather than a named error code.
  if (rowsallocated < totalRow)
  {
    return 1;
  }

  // Everything fit in the starting extent: no rows spill into a new one.
  if (!newExtent)
    rowsLeft = 0;

  return NO_ERROR;
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Clear a column
|
|
* PARAMETERS:
|
|
* column - column
|
|
* RETURN:
|
|
* none
|
|
***********************************************************/
|
|
/***********************************************************
 * DESCRIPTION:
 *    Reset a column: optionally flush pending writes, close its
 *    segment file, and restore the struct to its default state.
 * PARAMETERS:
 *    column  - column to clear
 *    isFlush - when true, flush the open data file before closing
 * RETURN:
 *    none
 ***********************************************************/
void ColumnOp::clearColumn(Column& column, bool isFlush) const
{
  // Push buffered writes to disk before the handle goes away.
  if (isFlush && column.dataFile.pFile)
  {
    column.dataFile.pFile->flush();
  }

  closeColumnFile(column);
  setColParam(column);  // restore default column parameters
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Close column's file
|
|
* PARAMETERS:
|
|
* column - column
|
|
* RETURN:
|
|
* none
|
|
***********************************************************/
|
|
/***********************************************************
 * DESCRIPTION:
 *    Close a column's open segment file (if any) and null out
 *    the stored file handle.
 * PARAMETERS:
 *    column - column whose data file is closed
 * RETURN:
 *    none
 ***********************************************************/
void ColumnOp::closeColumnFile(Column& column) const
{
  IDBDataFile*& fileHandle = column.dataFile.pFile;

  if (fileHandle)
    closeFile(fileHandle);

  fileHandle = NULL;  // mark the column as having no open file
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Create a column and its' related files
|
|
* PARAMETERS:
|
|
* column - column
|
|
* colNo - column number
|
|
* colWidth - column width
|
|
* colType - column type
|
|
* dataFid - the file id for column data file
|
|
* dbRoot - DBRoot where file is to be located
|
|
* partition-Starting partition number for segment file path
|
|
* RETURN:
|
|
* NO_ERROR if success
|
|
* ERR_FILE_EXIST if file exists
|
|
* ERR_FILE_CREATE if something wrong in creating the file
|
|
***********************************************************/
|
|
/***********************************************************
 * DESCRIPTION:
 *    Create a column and its related data file.
 * PARAMETERS:
 *    column     - column (in/out; its parameters and file location are set)
 *    colNo      - column number
 *    colWidth   - column width
 *    colDataType- column data type
 *    colType    - column type
 *    dataFid    - the file id for column data file
 *    dbRoot     - DBRoot where file is to be located
 *    partition  - starting partition number for segment file path
 * RETURN:
 *    NO_ERROR if success; otherwise the error code from createFile
 *    (e.g. ERR_FILE_EXIST, ERR_FILE_CREATE)
 ***********************************************************/
int ColumnOp::createColumn(Column& column, int colNo, int colWidth,
                           CalpontSystemCatalog::ColDataType colDataType, ColType colType, FID dataFid,
                           uint16_t dbRoot, uint32_t partition)
{
  // setColParam() overwrites compressionType; remember it so it can be
  // restored after the parameters are set.
  const int savedCompressionType = column.compressionType;
  setColParam(column, colNo, colWidth, colDataType, colType);

  const uint8_t* emptyVal = getEmptyRowValue(colDataType, colWidth);
  const int adjustedWidth = getCorrectRowWidth(colDataType, colWidth);

  // Pin down where the new segment file lives (always segment 0).
  column.dataFile.fid = dataFid;
  column.dataFile.fDbRoot = dbRoot;
  column.dataFile.fPartition = partition;
  column.dataFile.fSegment = 0;
  column.compressionType = savedCompressionType;

  int allocSize = 0;
  const int rc = createFile(column.dataFile.fid, allocSize, dbRoot, partition, colDataType, emptyVal,
                            adjustedWidth);

  return rc;  // NO_ERROR on success, createFile's error code otherwise
}
|
|
|
|
/* BUG931
|
|
* @brief Fills up a column with null/default values in all non-empty rows as the reference column. The
|
|
* reference column would typically be another column of the same table.
|
|
*
|
|
* @param
|
|
*
|
|
* @return
|
|
*/
|
|
int ColumnOp::fillColumn(const TxnID& txnid, Column& column, Column& refCol, void* defaultVal, Dctnry* dctnry,
|
|
ColumnOp* refColOp, const OID dictOid, const int dictColWidth,
|
|
const string defaultValStr, bool autoincrement)
|
|
{
|
|
unsigned char refColBuf[BYTE_PER_BLOCK]; // Refernce column buffer
|
|
unsigned char colBuf[BYTE_PER_BLOCK];
|
|
bool dirty = false;
|
|
HWM colHwm = 0;
|
|
RID maxRowId = 0;
|
|
int size = sizeof(Token);
|
|
|
|
long long startColFbo = 0;
|
|
long long startRefColFbo = 0;
|
|
|
|
int refBufOffset = 0;
|
|
int colBufOffset = 0;
|
|
uint64_t nexValNeeded = 0;
|
|
uint64_t nextVal;
|
|
uint32_t partition;
|
|
uint16_t segment;
|
|
HWM lastRefHwm;
|
|
int rc = 0;
|
|
std::string segFile, errorMsg;
|
|
BRM::LBID_t startLbid;
|
|
bool newFile = true;
|
|
int allocSize = 0;
|
|
/*
|
|
boost::scoped_ptr<Dctnry> dctnry;
|
|
if (m_compressionType == 0)
|
|
dctnry.reset(new DctnryCompress0);
|
|
else
|
|
dctnry.reset(new DctnryCompress1);
|
|
|
|
boost::scoped_ptr<ColumnOp> refColOp;
|
|
if (refCol.compressionType != 0)
|
|
refColOp.reset(new ColumnOpCompress1);
|
|
else
|
|
refColOp.reset(new ColumnOpCompress0);
|
|
*/
|
|
// get dbroots from config
|
|
Config config;
|
|
config.initConfigCache();
|
|
std::vector<uint16_t> rootList;
|
|
config.getRootIdList(rootList);
|
|
const uint8_t* emptyVal = getEmptyRowValue(column.colDataType, column.colWidth);
|
|
// Set TypeHandler to get empty value ptr for the ref column
|
|
findTypeHandler(refCol.colWidth, refCol.colDataType);
|
|
const uint8_t* refEmptyVal = getEmptyRowValue(refCol.colDataType, refCol.colWidth);
|
|
findTypeHandler(column.colWidth, column.colDataType);
|
|
// find the dbroots which have rows for refrence column
|
|
unsigned int i = 0, k = 0;
|
|
|
|
for (i = 0; i < rootList.size(); i++)
|
|
{
|
|
std::vector<struct BRM::EMEntry> refEntries;
|
|
rc = BRMWrapper::getInstance()->getExtents_dbroot(refCol.dataFile.fid, refEntries, rootList[i]);
|
|
std::vector<struct BRM::EMEntry>::const_iterator iter = refEntries.begin();
|
|
|
|
while (iter != refEntries.end())
|
|
{
|
|
// fill in for the new column for each extent in the reference column
|
|
// organize the extents into a file
|
|
std::vector<struct BRM::EMEntry> fileExtents;
|
|
fileExtents.push_back(refEntries[0]);
|
|
|
|
// cout << "Get extent for ref oid:dbroot:part:seg = " <<
|
|
// refCol.dataFile.fid<<":"<<rootList[0]<<":"<<refEntries[0].partitionNum
|
|
//<<":"<<refEntries[0].segmentNum<<endl;
|
|
for (k = 1; k < refEntries.size(); k++)
|
|
{
|
|
if ((refEntries[0].partitionNum == refEntries[k].partitionNum) &&
|
|
(refEntries[0].segmentNum == refEntries[k].segmentNum)) // already the same dbroot
|
|
{
|
|
fileExtents.push_back(refEntries[k]);
|
|
}
|
|
}
|
|
|
|
// Process this file
|
|
lastRefHwm = fileExtents[0].HWM;
|
|
|
|
for (k = 1; k < fileExtents.size(); k++)
|
|
{
|
|
// Find the hwm of this file
|
|
if (fileExtents[k].HWM > lastRefHwm)
|
|
lastRefHwm = fileExtents[k].HWM;
|
|
}
|
|
|
|
// create extents for the new column
|
|
// If we are processing the first extent in the first segment
|
|
// file, we check to see if we have enough rows (256K) to re-
|
|
// quire just create the initial abbrev extent for the new column.
|
|
std::vector<struct BRM::EMEntry> newEntries;
|
|
|
|
if ((refEntries.size() == 1) && (refEntries[0].partitionNum == 0) && (refEntries[0].segmentNum == 0))
|
|
{
|
|
//@Bug3565 use ref colwidth to calculate.
|
|
unsigned int numBlksForFirstExtent = (INITIAL_EXTENT_ROWS_TO_DISK / BYTE_PER_BLOCK) * refCol.colWidth;
|
|
|
|
if ((lastRefHwm + 1) < numBlksForFirstExtent)
|
|
{
|
|
rc = createColumn(column, 0, column.colWidth, column.colDataType, WriteEngine::WR_CHAR,
|
|
column.dataFile.fid, rootList[i], 0);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
|
|
// cout << "createColumn for oid " << column.dataFile.fid << endl;
|
|
BRM::EMEntry aEntry;
|
|
aEntry.partitionNum = partition = 0;
|
|
aEntry.segmentNum = segment = 0;
|
|
aEntry.dbRoot = rootList[i];
|
|
newEntries.push_back(aEntry);
|
|
|
|
if (dictOid > 3000) // Create dictionary file if needed
|
|
{
|
|
rc = dctnry->createDctnry(dictOid, dictColWidth, rootList[i], partition, segment, startLbid,
|
|
newFile);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
|
|
//@Bug 5652.
|
|
std::map<FID, FID> oids1;
|
|
oids1[dictOid] = dictOid;
|
|
dctnry->flushFile(rc, oids1);
|
|
dctnry->closeDctnry();
|
|
|
|
// tokenize default value if needed
|
|
if (defaultValStr.length() > 0)
|
|
{
|
|
DctnryStruct dctnryStruct;
|
|
dctnryStruct.dctnryOid = dictOid;
|
|
dctnryStruct.columnOid = column.dataFile.fid;
|
|
dctnryStruct.fColPartition = partition;
|
|
dctnryStruct.fColSegment = segment;
|
|
dctnryStruct.fColDbRoot = rootList[i];
|
|
dctnryStruct.colWidth = dictColWidth;
|
|
dctnryStruct.fCompressionType = column.compressionType;
|
|
DctnryTuple dctnryTuple;
|
|
dctnryTuple.sigValue = (unsigned char*)defaultValStr.c_str();
|
|
dctnryTuple.sigSize = defaultValStr.length();
|
|
|
|
rc = dctnry->openDctnry(dctnryStruct.dctnryOid, dctnryStruct.fColDbRoot,
|
|
dctnryStruct.fColPartition, dctnryStruct.fColSegment,
|
|
false); // @bug 5572 HDFS tmp file
|
|
rc = dctnry->updateDctnry(dctnryTuple.sigValue, dctnryTuple.sigSize, dctnryTuple.token);
|
|
|
|
if (dctnryStruct.fCompressionType > 0)
|
|
dctnry->closeDctnry(false);
|
|
else
|
|
dctnry->closeDctnry(true);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
|
|
memcpy(defaultVal, &dctnryTuple.token, size);
|
|
//@Bug 5652.
|
|
std::map<FID, FID> oids1;
|
|
oids1[dictOid] = dictOid;
|
|
dctnry->flushFile(rc, oids1);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (newEntries.size() == 0)
|
|
{
|
|
for (k = 0; k < fileExtents.size(); k++)
|
|
{
|
|
uint16_t dbroot = rootList[i];
|
|
partition = fileExtents[k].partitionNum;
|
|
segment = fileExtents[k].segmentNum;
|
|
|
|
if (k == 0)
|
|
{
|
|
rc = addExtent(column, dbroot, partition, segment, segFile, startLbid, newFile, allocSize);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc; // Clean up will be done throgh DDLProc
|
|
|
|
// cout << "extendColumn for oid " << column.dataFile.fid << endl;
|
|
BRM::EMEntry aEntry;
|
|
aEntry.partitionNum = partition;
|
|
aEntry.segmentNum = segment;
|
|
aEntry.dbRoot = rootList[i];
|
|
newEntries.push_back(aEntry);
|
|
|
|
if ((dictOid > 3000) && newFile) // Create dictionary file if needed
|
|
{
|
|
rc = dctnry->createDctnry(dictOid, dictColWidth, rootList[i], partition, segment, startLbid,
|
|
newFile);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
|
|
//@Bug 5652.
|
|
std::map<FID, FID> oids1;
|
|
oids1[dictOid] = dictOid;
|
|
dctnry->flushFile(rc, oids1);
|
|
dctnry->closeDctnry();
|
|
|
|
// tokenize default value if needed
|
|
if (defaultValStr.length() > 0)
|
|
{
|
|
DctnryStruct dctnryStruct;
|
|
dctnryStruct.dctnryOid = dictOid;
|
|
dctnryStruct.columnOid = column.dataFile.fid;
|
|
dctnryStruct.fColPartition = partition;
|
|
dctnryStruct.fColSegment = segment;
|
|
dctnryStruct.fColDbRoot = rootList[i];
|
|
dctnryStruct.colWidth = dictColWidth;
|
|
dctnryStruct.fCompressionType = column.compressionType;
|
|
DctnryTuple dctnryTuple;
|
|
dctnryTuple.sigValue = (unsigned char*)defaultValStr.c_str();
|
|
// WriteEngineWrapper wrapper;
|
|
dctnryTuple.sigSize = defaultValStr.length();
|
|
// rc = wrapper.tokenize(txnid, dctnryStruct, dctnryTuple);
|
|
rc = dctnry->openDctnry(dctnryStruct.dctnryOid, dctnryStruct.fColDbRoot,
|
|
dctnryStruct.fColPartition, dctnryStruct.fColSegment,
|
|
false); // @bug 5572 HDFS tmp file
|
|
rc = dctnry->updateDctnry(dctnryTuple.sigValue, dctnryTuple.sigSize, dctnryTuple.token);
|
|
|
|
if (dctnryStruct.fCompressionType > 0)
|
|
dctnry->closeDctnry(false);
|
|
else
|
|
dctnry->closeDctnry(true);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
|
|
memcpy(defaultVal, &dctnryTuple.token, size);
|
|
//@Bug 5652.
|
|
std::map<FID, FID> oids1;
|
|
oids1[dictOid] = dictOid;
|
|
dctnry->flushFile(rc, oids1);
|
|
}
|
|
}
|
|
}
|
|
else // just add a extent to the file
|
|
{
|
|
rc = addExtent(column, dbroot, partition, segment, segFile, startLbid, newFile, allocSize);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc; // Clean up will be done throgh DDLProc
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fill the new file with values
|
|
// Open new column file and reference column file
|
|
column.dataFile.fDbRoot = rootList[i];
|
|
column.dataFile.fPartition = newEntries[0].partitionNum;
|
|
column.dataFile.fSegment = newEntries[0].segmentNum;
|
|
RETURN_ON_ERROR(openColumnFile(column, segFile, false)); // @bug 5572 HDFS tmp file
|
|
// cout << "Processing new col file " << segFile << endl;
|
|
refCol.dataFile.fDbRoot = rootList[i];
|
|
refCol.dataFile.fPartition = newEntries[0].partitionNum;
|
|
refCol.dataFile.fSegment = newEntries[0].segmentNum;
|
|
std::string segFileRef;
|
|
RETURN_ON_ERROR(refColOp->openColumnFile(refCol, segFileRef, false)); // @bug 5572 HDFS tmp file
|
|
// cout << "Processing ref file " << segFileRef << " and hwm is " << lastRefHwm << endl;
|
|
RETURN_ON_ERROR(refColOp->readBlock(refCol.dataFile.pFile, refColBuf, lastRefHwm));
|
|
|
|
refBufOffset = BYTE_PER_BLOCK - refCol.colWidth;
|
|
maxRowId = (lastRefHwm * BYTE_PER_BLOCK) / refCol.colWidth; // Local maxRowId
|
|
|
|
while (refBufOffset > 0)
|
|
{
|
|
if (memcmp(&refColBuf[refBufOffset], refEmptyVal, refCol.colWidth) != 0)
|
|
{
|
|
maxRowId = maxRowId + (refBufOffset / refCol.colWidth);
|
|
break;
|
|
}
|
|
|
|
refBufOffset -= refCol.colWidth;
|
|
}
|
|
|
|
// Compute local hwm for the new column
|
|
colHwm = (maxRowId * column.colWidth) / BYTE_PER_BLOCK;
|
|
// cout << " new col hwm is " << colHwm << endl;
|
|
startRefColFbo = 0;
|
|
startColFbo = 0;
|
|
// Initizliaing to BYTE_PER_BLOCK to force read the first time
|
|
refBufOffset = BYTE_PER_BLOCK;
|
|
colBufOffset = BYTE_PER_BLOCK;
|
|
dirty = false;
|
|
ExtCPInfo cpInfo(column.colDataType, column.colWidth);
|
|
|
|
if (autoincrement)
|
|
{
|
|
uint64_t nextValStart = 0;
|
|
|
|
while (startRefColFbo <= lastRefHwm || startColFbo <= colHwm)
|
|
{
|
|
// nexValNeeded = 0;
|
|
// cout << "current startRefColFbo:startColFbo:refBufOffset:colBufOffset = " << startRefColFbo
|
|
// <<":"<< startColFbo <<":"<<refBufOffset<<":"<<colBufOffset<< endl;
|
|
if ((refBufOffset + refCol.colWidth) > BYTE_PER_BLOCK)
|
|
{
|
|
// If current reference column block is fully processed get to the next one
|
|
// cout << "reading from ref " << endl;
|
|
RETURN_ON_ERROR(refColOp->readBlock(refCol.dataFile.pFile, refColBuf, startRefColFbo));
|
|
startRefColFbo++;
|
|
refBufOffset = 0;
|
|
nexValNeeded = 0;
|
|
}
|
|
|
|
if ((colBufOffset + column.colWidth) > BYTE_PER_BLOCK)
|
|
{
|
|
// Current block of the new colum is full. Write it if dirty and then get the next block
|
|
if (dirty)
|
|
{
|
|
// cout << " writing to new col " << endl;
|
|
RETURN_ON_ERROR(saveBlock(column.dataFile.pFile, colBuf, startColFbo - 1));
|
|
dirty = false;
|
|
}
|
|
|
|
// cout << "reading from new col " << endl;
|
|
RETURN_ON_ERROR(readBlock(column.dataFile.pFile, colBuf, startColFbo));
|
|
startColFbo++;
|
|
colBufOffset = 0;
|
|
}
|
|
|
|
if (nexValNeeded == 0)
|
|
{
|
|
int tmpBufOffset = 0;
|
|
|
|
while ((tmpBufOffset + refCol.colWidth) <= BYTE_PER_BLOCK)
|
|
|
|
{
|
|
if (memcmp(refColBuf + tmpBufOffset, refEmptyVal, refCol.colWidth) !=
|
|
0) // Find the number of nextVal needed.
|
|
{
|
|
nexValNeeded++;
|
|
// memcpy(colBuf + colBufOffset, defaultVal, column.colWidth);
|
|
// dirty = true;
|
|
}
|
|
|
|
tmpBufOffset += refCol.colWidth;
|
|
}
|
|
|
|
// reserve the next value, should have a AI sequence in controller from DDLProc
|
|
if (nexValNeeded > 0)
|
|
{
|
|
rc = BRMWrapper::getInstance()->getAutoIncrementRange(column.dataFile.fid, nexValNeeded,
|
|
nextVal, errorMsg);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
}
|
|
|
|
nextValStart = nextVal;
|
|
}
|
|
|
|
// write the values to column
|
|
|
|
// colBufOffset = 0; @Bug 5436 ref column coud have different column width
|
|
while (((refBufOffset + refCol.colWidth) <= BYTE_PER_BLOCK) &&
|
|
((colBufOffset + column.colWidth) <= BYTE_PER_BLOCK))
|
|
{
|
|
if (memcmp(refColBuf + refBufOffset, refEmptyVal, refCol.colWidth) !=
|
|
0) // Find the number of nextVal needed.
|
|
{
|
|
memcpy(defaultVal, &nextVal, 8);
|
|
nextVal++;
|
|
memcpy(colBuf + colBufOffset, defaultVal, column.colWidth);
|
|
dirty = true;
|
|
}
|
|
|
|
refBufOffset += refCol.colWidth;
|
|
colBufOffset += column.colWidth;
|
|
}
|
|
}
|
|
|
|
if (!cpInfo.isBinaryColumn())
|
|
{
|
|
cpInfo.fCPInfo.max = nextValStart + nexValNeeded - 1;
|
|
cpInfo.fCPInfo.min = nextValStart;
|
|
}
|
|
else
|
|
{
|
|
cpInfo.fCPInfo.bigMax = nextValStart + nexValNeeded - 1;
|
|
cpInfo.fCPInfo.bigMin = nextValStart;
|
|
}
|
|
cpInfo.fCPInfo.seqNum = 0;
|
|
}
|
|
else
|
|
{
|
|
while (startRefColFbo <= lastRefHwm || startColFbo <= colHwm)
|
|
{
|
|
if ((refBufOffset + refCol.colWidth) > BYTE_PER_BLOCK)
|
|
{
|
|
// If current reference column block is fully processed get to the next one
|
|
RETURN_ON_ERROR(refColOp->readBlock(refCol.dataFile.pFile, refColBuf, startRefColFbo));
|
|
startRefColFbo++;
|
|
refBufOffset = 0;
|
|
}
|
|
|
|
if ((colBufOffset + column.colWidth) > BYTE_PER_BLOCK)
|
|
{
|
|
// Current block of the new colum is full. Write it if dirty and then get the next block
|
|
if (dirty)
|
|
{
|
|
RETURN_ON_ERROR(saveBlock(column.dataFile.pFile, colBuf, startColFbo - 1));
|
|
dirty = false;
|
|
}
|
|
|
|
RETURN_ON_ERROR(readBlock(column.dataFile.pFile, colBuf, startColFbo));
|
|
startColFbo++;
|
|
colBufOffset = 0;
|
|
}
|
|
|
|
while (((refBufOffset + refCol.colWidth) <= BYTE_PER_BLOCK) &&
|
|
((colBufOffset + column.colWidth) <= BYTE_PER_BLOCK))
|
|
{
|
|
if (memcmp(refColBuf + refBufOffset, refEmptyVal, refCol.colWidth) != 0)
|
|
{
|
|
/*if (autoincrement)
|
|
{
|
|
memcpy(defaultVal, &nextVal, 8);
|
|
nextVal++;
|
|
} */
|
|
memcpy(colBuf + colBufOffset, defaultVal, column.colWidth);
|
|
dirty = true;
|
|
}
|
|
else if (column.compressionType != 0) //@Bug 3866, fill the empty row value for compressed chunk
|
|
{
|
|
memcpy(colBuf + colBufOffset, emptyVal, column.colWidth);
|
|
dirty = true;
|
|
}
|
|
|
|
refBufOffset += refCol.colWidth;
|
|
colBufOffset += column.colWidth;
|
|
}
|
|
}
|
|
|
|
cpInfo.toInvalid();
|
|
|
|
cpInfo.fCPInfo.seqNum = SEQNUM_MARK_INVALID;
|
|
}
|
|
|
|
if (dirty)
|
|
{
|
|
RETURN_ON_ERROR(saveBlock(column.dataFile.pFile, colBuf, startColFbo - 1));
|
|
dirty = false;
|
|
}
|
|
|
|
std::map<FID, FID> oids;
|
|
oids[column.dataFile.fid] = column.dataFile.fid;
|
|
oids[refCol.dataFile.fid] = refCol.dataFile.fid;
|
|
rc = flushFile(rc, oids);
|
|
closeColumnFile(column);
|
|
refColOp->closeColumnFile(refCol);
|
|
oids.clear();
|
|
|
|
// Mark extents invalid first
|
|
BRM::LBID_t startLbid;
|
|
rc = BRMWrapper::getInstance()->getStartLbid(column.dataFile.fid, column.dataFile.fPartition,
|
|
column.dataFile.fSegment, colHwm, startLbid);
|
|
|
|
if (autoincrement) //@Bug 4074. Mark it invalid first to set later
|
|
{
|
|
ExtCPInfo cpInfo1(column.colDataType, column.colWidth);
|
|
cpInfo1.toInvalid();
|
|
cpInfo1.fCPInfo.seqNum = SEQNUM_MARK_INVALID;
|
|
cpInfo1.fCPInfo.firstLbid = startLbid;
|
|
ExtCPInfoList cpinfoList1;
|
|
cpinfoList1.push_back(cpInfo1);
|
|
rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList1);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
}
|
|
|
|
ExtCPInfoList cpinfoList;
|
|
cpInfo.fCPInfo.firstLbid = startLbid;
|
|
cpinfoList.push_back(cpInfo);
|
|
rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
|
|
rc = BRMWrapper::getInstance()->setLocalHWM((OID)column.dataFile.fid, column.dataFile.fPartition,
|
|
column.dataFile.fSegment, colHwm);
|
|
|
|
if (rc != NO_ERROR)
|
|
return rc;
|
|
|
|
// erase the entries from this dbroot.
|
|
std::vector<struct BRM::EMEntry> refEntriesTrimed;
|
|
|
|
for (uint32_t m = 0; m < refEntries.size(); m++)
|
|
{
|
|
if ((refEntries[0].partitionNum != refEntries[m].partitionNum) ||
|
|
(refEntries[0].segmentNum != refEntries[m].segmentNum))
|
|
refEntriesTrimed.push_back(refEntries[m]);
|
|
}
|
|
|
|
refEntriesTrimed.swap(refEntries);
|
|
iter = refEntries.begin();
|
|
}
|
|
}
|
|
|
|
return rc;
|
|
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Create a table file
|
|
* PARAMETERS:
|
|
* tableFid - the file id for table bitmap file
|
|
* RETURN:
|
|
* NO_ERROR if success
|
|
* ERR_FILE_EXIST if file exists
|
|
* ERR_FILE_CREATE if something wrong in creating the file
|
|
***********************************************************/
|
|
int ColumnOp::createTable(/*const FID tableFid*/) const
|
|
{
|
|
// return createFile(tableFid, DEFAULT_TOTAL_BLOCK );
|
|
return NO_ERROR;
|
|
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Drop column related files
|
|
* PARAMETERS:
|
|
* dataFid - the file id for column data file
|
|
* bitmapFid - the file id for column bitmap file
|
|
* RETURN:
|
|
* NO_ERROR if success
|
|
* ERR_FILE_NOT_EXIST if file not exist
|
|
***********************************************************/
|
|
int ColumnOp::dropColumn(const FID dataFid)
|
|
{
|
|
return deleteFile(dataFid);
|
|
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Drop column and dictionary related files
|
|
* PARAMETERS:
|
|
* dataFids - the file oids for column and dictionary data file
|
|
* RETURN:
|
|
* NO_ERROR if success
|
|
* ERR_FILE_NOT_EXIST if file not exist
|
|
***********************************************************/
|
|
int ColumnOp::dropFiles(const std::vector<int32_t>& dataFids)
|
|
{
|
|
return deleteFiles(dataFids);
|
|
}
|
|
|
|
int ColumnOp::dropPartitions(const std::vector<OID>& dataFids,
|
|
const std::vector<BRM::PartitionInfo>& partitions)
|
|
{
|
|
return deletePartitions(dataFids, partitions);
|
|
}
|
|
|
|
int ColumnOp::deleteOIDsFromExtentMap(const std::vector<int32_t>& dataFids)
|
|
{
|
|
int rc = 0;
|
|
rc = BRMWrapper::getInstance()->deleteOIDsFromExtentMap(dataFids);
|
|
return rc;
|
|
}
|
|
|
|
/**************************************************************
|
|
* DESCRIPTION:
|
|
* Add an extent to the specified column OID and DBRoot.
|
|
* Partition and segment number (and HWM) of the segment file containing
|
|
* the new extent are returned.
|
|
* PARAMETERS:
|
|
* column - input column attributes like OID and column width.
|
|
* leaveFileOpen - indicates whether db file is to be left open upon return
|
|
* hwm - The HWM (or fbo) of the column segment file where the
|
|
* new extent begins.
|
|
* startLbid - The starting LBID for the new extent.
|
|
* allocSize - Number of blocks in new extent.
|
|
* dbRoot - The DBRoot of the file with the new extent.
|
|
* partition - The partition num of the file with the new extent.
|
|
* segment - The segment number of the file with the new extent.
|
|
* segFile - Name of the segment file to which the extent is added.
|
|
* pFile - FILE ptr to the file where the extent is added.
|
|
* newFile - Indicates if extent is added to new or existing file.
|
|
* hdrs - Contents of headers if file is compressed.
|
|
* RETURN:
|
|
* NO_ERROR if success
|
|
* other number if fail
|
|
**************************************************************/
|
|
int ColumnOp::extendColumn(const Column& column, bool leaveFileOpen, HWM hwm, BRM::LBID_t startLbid,
|
|
int allocSize, uint16_t dbRoot, uint32_t partition, uint16_t segment,
|
|
std::string& segFile, IDBDataFile*& pFile, bool& newFile, char* hdrs)
|
|
{
|
|
const uint8_t* emptyVal = getEmptyRowValue(column.colDataType, column.colWidth);
|
|
int rc = extendFile(column.dataFile.fid, emptyVal, column.colWidth, column.colDataType, hwm, startLbid,
|
|
allocSize, dbRoot, partition, segment, segFile, pFile, newFile, hdrs);
|
|
|
|
if (rc != NO_ERROR)
|
|
{
|
|
if ((!leaveFileOpen) && (pFile))
|
|
closeFile(pFile);
|
|
|
|
return rc;
|
|
}
|
|
|
|
// Only close file for DML/DDL; leave file open for bulkload
|
|
if (!leaveFileOpen)
|
|
closeFile(pFile);
|
|
|
|
return rc;
|
|
}
|
|
|
|
int ColumnOp::addExtent(const Column& column, uint16_t dbRoot, uint32_t partition, uint16_t segment,
|
|
std::string& segFile, BRM::LBID_t& startLbid, bool& newFile, int& allocSize,
|
|
char* hdrs)
|
|
{
|
|
const uint8_t* emptyVal = getEmptyRowValue(column.colDataType, column.colWidth);
|
|
int rc = addExtentExactFile(column.dataFile.fid, emptyVal, column.colWidth, allocSize, dbRoot, partition,
|
|
segment, column.colDataType, segFile, startLbid, newFile, hdrs);
|
|
return rc;
|
|
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Expand the current abbreviated extent in the column file to a full
|
|
* extent.
|
|
* PARAMETERS:
|
|
* column - input column attributes like OID and column width.
|
|
* RETURN:
|
|
* NO_ERROR if success
|
|
* other number if fail
|
|
***********************************************************/
|
|
int ColumnOp::expandAbbrevExtent(const Column& column)
|
|
{
|
|
const uint8_t* emptyVal = getEmptyRowValue(column.colDataType, column.colWidth);
|
|
int rc = expandAbbrevColumnExtent(column.dataFile.pFile, column.dataFile.fDbRoot, emptyVal, column.colWidth,
|
|
column.colDataType);
|
|
|
|
return rc;
|
|
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Get column data type
|
|
* PARAMETERS:
|
|
* name - type name
|
|
* RETURN:
|
|
* true if success, false otherwise
|
|
***********************************************************/
|
|
bool ColumnOp::getColDataType(const char* name, CalpontSystemCatalog::ColDataType& colDataType) const
|
|
{
|
|
bool bFound = false;
|
|
|
|
for (int i = 0; i < CalpontSystemCatalog::NUM_OF_COL_DATA_TYPE; i++)
|
|
if (strcmp(name, ColDataTypeStr[i]) == 0)
|
|
{
|
|
colDataType = static_cast<CalpontSystemCatalog::ColDataType>(i);
|
|
bFound = true;
|
|
break;
|
|
}
|
|
|
|
return bFound;
|
|
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Initialize a column
|
|
* PARAMETERS:
|
|
* column - current column
|
|
* RETURN:
|
|
* none
|
|
***********************************************************/
|
|
void ColumnOp::initColumn(Column& column) const
{
  // Apply the default column parameters, then mark the data file handle as
  // not yet opened.
  setColParam(column);
  column.dataFile.pFile = nullptr;
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Check whether the row is empty
|
|
* PARAMETERS:
|
|
* buf - data buffer
|
|
* offset - buffer offset
|
|
* column - current column
|
|
* RETURN:
|
|
* true if success, false otherwise
|
|
***********************************************************/
|
|
inline bool ColumnOp::isEmptyRow(uint64_t* curVal, const uint8_t* emptyVal, const int colWidth)
{
  // colWidth is either 1, 2, 4, 8, or 16 (Convertor::getCorrectRowWidth).
  //
  // For unsigned integers of identical width, byte-wise equality is exactly
  // value equality, so memcmp reproduces the former per-width pointer-cast
  // comparisons while avoiding type-punned and potentially misaligned loads
  // (the previous 16-byte case dereferenced curVal as uint128_t, which
  // assumed 16-byte alignment the caller may not guarantee).
  switch (colWidth)
  {
    case 1:
    case 2:
    case 4:
    case 8:
    case 16: return memcmp(curVal, emptyVal, colWidth) == 0;
  }

  // Unexpected width: treat the row as non-empty (same as the original).
  return false;
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Check whether column parameters are valid
|
|
* PARAMETERS:
|
|
* column - current column
|
|
* RETURN:
|
|
* true if success, false otherwise
|
|
***********************************************************/
|
|
bool ColumnOp::isValid(Column& column) const
{
  // A column description is considered usable once it carries a positive
  // width. (A colNo > 0 check existed historically but is intentionally not
  // enforced.)
  const bool widthOk = (column.colWidth > 0);
  return widthOk;
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Open all column related files
|
|
* PARAMETERS:
|
|
* column - column (includes the file id for column data file,
|
|
* as well as the DBRoot, partition, and segment number)
|
|
* segFile- is set to the name of the column segment file
|
|
* that is opened.
|
|
* RETURN:
|
|
* NO_ERROR if success
|
|
* ERR_FILE_READ if something wrong in reading the file
|
|
***********************************************************/
|
|
// @bug 5572 - HDFS usage: add *.tmp file backup flag
|
|
// @bug 5572 - HDFS usage: add *.tmp file backup flag
int ColumnOp::openColumnFile(Column& column, std::string& segFile, bool useTmpSuffix,
                             int ioBuffSize, bool isReadOnly) const
{
  // Reject structurally invalid column descriptions up front.
  if (!isValid(column))
    return ERR_INVALID_PARAM;

  // Read-only access uses "r"; otherwise open for read/update in binary mode.
  const std::string mode = isReadOnly ? "r" : "r+b";

  // Open the column's segment data file; openFile also fills in the segment
  // file name, which we report back through segFile.
  column.dataFile.pFile =
      openFile(column, column.dataFile.fDbRoot, column.dataFile.fPartition, column.dataFile.fSegment,
               column.dataFile.fSegFileName, useTmpSuffix, mode.c_str(), ioBuffSize, isReadOnly);
  segFile = column.dataFile.fSegFileName;

  if (column.dataFile.pFile != NULL)
    return NO_ERROR;

  // Open failed: log the OID and path, then report the failure.
  ostringstream oss;
  oss << "oid: " << column.dataFile.fid << " with path " << segFile;
  logging::Message::Args args;
  logging::Message message(1);
  args.add("Error opening file ");
  args.add(oss.str());
  args.add("");
  args.add("");
  message.format(args);
  logging::LoggingID lid(21);
  logging::MessageLog ml(lid);

  ml.logErrorMessage(message);
  return ERR_FILE_OPEN;
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Open all table related files
|
|
* PARAMETERS:
|
|
* table - table structure
|
|
* RETURN:
|
|
* NO_ERROR if success
|
|
* ERR_FILE_READ if something wrong in reading the file
|
|
***********************************************************/
|
|
/* int ColumnOp::openTableFile() const
|
|
{
|
|
// open table bitmap file
|
|
return NO_ERROR;
|
|
}
|
|
*/
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Set column parameters
|
|
* PARAMETERS:
|
|
* column - current column
|
|
* colNo - column no
|
|
* colWidth - column width
|
|
* RETURN:
|
|
* none
|
|
***********************************************************/
|
|
void ColumnOp::setColParam(Column& column, int colNo, int colWidth,
                           CalpontSystemCatalog::ColDataType colDataType, ColType colType, FID dataFid,
                           int compressionType, uint16_t dbRoot, uint32_t partition, uint16_t segment) const
{
  // Logical column attributes.
  column.colNo = colNo;
  column.colWidth = colWidth;
  column.colDataType = colDataType;
  column.colType = colType;
  column.compressionType = compressionType;

  // Physical location of the column's data file.
  column.dataFile.fid = dataFid;
  column.dataFile.fDbRoot = dbRoot;
  column.dataFile.fPartition = partition;
  column.dataFile.fSegment = segment;
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Write row(s)
|
|
* PARAMETERS:
|
|
* curCol - column information
|
|
* totalRow - the total number of rows need to be inserted
|
|
* rowIdArray - the array of row id, for performance purpose, I am assuming the rowIdArray is sorted
|
|
* valArray - the array of row values
|
|
* oldValArray - the array of old value
|
|
* bDelete - yet. The flag must be useless b/c writeRows
|
|
* is used for deletion.
|
|
* RETURN:
|
|
* NO_ERROR if success, other number otherwise
|
|
***********************************************************/
|
|
int ColumnOp::writeRow(Column& curCol, uint64_t totalRow, const RID* rowIdArray, const void* valArray,
                       void* oldValArray, bool bDelete)
{
  // Writes (or, when bDelete is true, blanks out) totalRow rows of curCol.
  // rowIdArray is assumed sorted so that consecutive rows tend to land in the
  // same block; blocks are read once, updated in memory, and flushed when the
  // target block changes or at the end.
  // If oldValArray is non-null, the previous value of each row is copied into
  // it (curCol.colWidth bytes per row) before being overwritten.
  uint64_t i = 0, curRowId;
  int dataFbo, dataBio, curDataFbo = -1;  // curDataFbo == -1: no block loaded yet
  unsigned char dataBuf[BYTE_PER_BLOCK];  // in-memory copy of the current block
  bool bExit = false, bDataDirty = false;
  const void* pVal = 0;
  char charTmpBuf[8];  // staging buffer for fixed 8-byte char-type values
  int rc = NO_ERROR;
  uint16_t rowsInBlock = BYTE_PER_BLOCK / curCol.colWidth;

  while (!bExit)
  {
    curRowId = rowIdArray[i];

    // Map the RID to a block number (dataFbo) and byte offset within the
    // block (dataBio).
    calculateRowId(curRowId, rowsInBlock, curCol.colWidth, dataFbo, dataBio);

    // load another data block if necessary
    if (curDataFbo != dataFbo)
    {
      // Flush the previous block before switching to a new one.
      if (bDataDirty)
      {
        rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);

        if (rc != NO_ERROR)
          return rc;

        bDataDirty = false;
      }

      curDataFbo = dataFbo;
      rc = readBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);

      if (rc != NO_ERROR)
        return rc;

      // NOTE(review): the block is marked dirty as soon as it is loaded,
      // even before any value is written into it.
      bDataDirty = true;
    }

    // Select the i-th input value according to the column's storage type.
    // For deletes the switch is skipped entirely and pVal is set to the
    // empty-row marker below. NOTE(review): if a case leaves pVal unset
    // (e.g. WR_BINARY with a non-16-byte width), pVal retains its previous
    // value — presumably callers never hit that combination; verify.
    switch (curCol.colType)
    {
      case WriteEngine::WR_FLOAT:
        if (!bDelete)
          pVal = &((float*)valArray)[i];

        break;

      case WriteEngine::WR_DOUBLE:
        if (!bDelete)
          pVal = &((double*)valArray)[i];

        break;

      case WriteEngine::WR_VARBINARY:  // treat same as char for now
      case WriteEngine::WR_BLOB:
      case WriteEngine::WR_TEXT:
      case WriteEngine::WR_CHAR:
        if (!bDelete)
        {
          // Char-family values are stored as fixed 8-byte cells; copy the
          // i-th cell into the staging buffer.
          memcpy(charTmpBuf, (char*)valArray + i * 8, 8);
          pVal = charTmpBuf;
        }
        break;

      case WriteEngine::WR_SHORT:
        if (!bDelete)
          pVal = &((short*)valArray)[i];

        break;

      case WriteEngine::WR_BYTE:
        if (!bDelete)
          pVal = &((char*)valArray)[i];
        break;

      case WriteEngine::WR_LONGLONG:
        if (!bDelete)
          pVal = &((long long*)valArray)[i];
        break;

      case WriteEngine::WR_TOKEN:
        if (!bDelete)
          pVal = &((Token*)valArray)[i];
        break;

      case WriteEngine::WR_INT:
      case WriteEngine::WR_MEDINT:
        if (!bDelete)
          pVal = &((int*)valArray)[i];
        break;

      case WriteEngine::WR_USHORT:
        if (!bDelete)
          pVal = &((uint16_t*)valArray)[i];
        break;

      case WriteEngine::WR_UBYTE:
        if (!bDelete)
          pVal = &((uint8_t*)valArray)[i];
        break;

      case WriteEngine::WR_UINT:
      case WriteEngine::WR_UMEDINT:
        if (!bDelete)
          pVal = &((uint32_t*)valArray)[i];
        break;

      case WriteEngine::WR_ULONGLONG:
        if (!bDelete)
          pVal = &((uint64_t*)valArray)[i];
        break;

      case WriteEngine::WR_BINARY:
        if (!bDelete)
        {
          // Wide (16-byte) decimal only; other widths fall through without
          // updating pVal.
          if (curCol.colWidth == datatypes::MAXDECIMALWIDTH)
            pVal = &((int128_t*)valArray)[i];
        }
        break;

      default:
        if (!bDelete)
          pVal = &((int*)valArray)[i];
        break;
    }

    // Deletion writes the column's empty-row marker instead of a user value.
    if (bDelete)
    {
      pVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth);
    }

    // This is the write stuff
    // Capture the row's current value for the caller before overwriting it.
    if (oldValArray)
    {
      uint8_t* p = static_cast<uint8_t*>(oldValArray);
      memcpy(p + curCol.colWidth * i, dataBuf + dataBio, curCol.colWidth);
    }

    writeBufValue(dataBuf + dataBio, pVal, curCol.colWidth);

    i++;

    if (i >= totalRow)
      bExit = true;
  }

  // take care of the cleanup
  // Flush the last touched block if it holds unsaved changes.
  if (bDataDirty && curDataFbo >= 0)
  {
    rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);

    if (rc != NO_ERROR)
      return rc;
  }

  return rc;
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Write rows
|
|
* PARAMETERS:
|
|
* curCol - column information
|
|
* totalRow - the total number of rows need to be inserted
|
|
* ridList - the vector of row id
|
|
* valArray - the array of one row value
|
|
* oldValArray - the array of old value
|
|
* RETURN:
|
|
* NO_ERROR if success, other number otherwise
|
|
***********************************************************/
|
|
int ColumnOp::writeRows(Column& curCol, uint64_t totalRow, const RIDList& ridList, const void* valArray,
                        void* oldValArray, bool bDelete)
{
  // Writes ONE value (element 0 of valArray) to every RID in ridList, or —
  // when bDelete is true — blanks each row with the column's empty-row
  // marker. Blocks are read once per run of consecutive rows and flushed
  // when the target block changes or at the end.
  // If oldValArray is non-null, each row's previous value is copied into it
  // (curCol.colWidth bytes per row) before being overwritten.
  uint64_t i = 0, curRowId;
  int dataFbo, dataBio, curDataFbo = -1;  // curDataFbo == -1: no block loaded yet
  unsigned char dataBuf[BYTE_PER_BLOCK];  // in-memory copy of the current block
  bool bExit = false, bDataDirty = false;
  const void* pVal = 0;
  // void* pOldVal;
  char charTmpBuf[8];  // staging buffer for fixed 8-byte char-type values
  int rc = NO_ERROR;

  // For deletes the value is constant: the empty-row marker.
  if (bDelete)
  {
    pVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth);
  }

  while (!bExit)
  {
    curRowId = ridList[i];

    // Map the RID to a block number (dataFbo) and byte offset within the
    // block (dataBio).
    calculateRowId(curRowId, BYTE_PER_BLOCK / curCol.colWidth, curCol.colWidth, dataFbo, dataBio);

    // load another data block if necessary
    if (curDataFbo != dataFbo)
    {
      // Flush (and explicitly flush the file) before switching blocks.
      if (bDataDirty)
      {
        rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);

        if (rc != NO_ERROR)
          return rc;

        curCol.dataFile.pFile->flush();
        bDataDirty = false;
      }

      curDataFbo = dataFbo;
      //@Bug 4849. need to check error code to prevent disk error
      rc = readBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);

      if (rc != NO_ERROR)
        return rc;

      // The block is marked dirty as soon as it is loaded, even before any
      // value is written into it.
      bDataDirty = true;
    }

    // This is a awkward way to convert void* and get ith element, I just don't have a good solution for that
    // How about pVal = valArray? You're always getting the 0'th element here anyways.
    // TODO MCOL-641 add support here
    // This branch does not seem to be called from anywhere
    if (!bDelete)
    {
      // Note: always element [0] — the same value is written to every row.
      switch (curCol.colType)
      {
        case WriteEngine::WR_FLOAT: pVal = &((float*)valArray)[0]; break;

        case WriteEngine::WR_DOUBLE: pVal = &((double*)valArray)[0]; break;

        case WriteEngine::WR_VARBINARY:  // treat same as char for now
        case WriteEngine::WR_BLOB:
        case WriteEngine::WR_TEXT:
        case WriteEngine::WR_CHAR:
          // Char-family values are stored as fixed 8-byte cells.
          memcpy(charTmpBuf, (char*)valArray, 8);
          pVal = charTmpBuf;
          break;

        // case WriteEngine::WR_BIT : pVal = &((bool *) valArray)[i]; break;
        case WriteEngine::WR_SHORT: pVal = &((short*)valArray)[0]; break;

        case WriteEngine::WR_BYTE: pVal = &((char*)valArray)[0]; break;

        case WriteEngine::WR_LONGLONG: pVal = &((long long*)valArray)[0]; break;

        case WriteEngine::WR_TOKEN: pVal = &((Token*)valArray)[0]; break;

        case WriteEngine::WR_INT:
        case WriteEngine::WR_MEDINT: pVal = &((int*)valArray)[0]; break;

        case WriteEngine::WR_USHORT: pVal = &((uint16_t*)valArray)[0]; break;

        case WriteEngine::WR_UBYTE: pVal = &((uint8_t*)valArray)[0]; break;

        case WriteEngine::WR_ULONGLONG: pVal = &((uint64_t*)valArray)[0]; break;

        case WriteEngine::WR_UINT:
        case WriteEngine::WR_UMEDINT: pVal = &((uint32_t*)valArray)[0]; break;

        default: pVal = &((int*)valArray)[0]; break;
      }
    }

    // This is the write stuff
    // Capture the row's current value for the caller before overwriting it.
    if (oldValArray)
    {
      uint8_t* p = static_cast<uint8_t*>(oldValArray);
      memcpy(p + i * curCol.colWidth, dataBuf + dataBio, curCol.colWidth);
    }

    writeBufValue(dataBuf + dataBio, pVal, curCol.colWidth);

    i++;

    if (i >= totalRow)
      bExit = true;
  }

  // take care of the cleanup
  // Flush the last touched block if it holds unsaved changes.
  if (bDataDirty && curDataFbo >= 0)
  {
    //@Bug 4849. need to check error code to prevent disk error
    rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);
  }

  // Final flush is unconditional; rc from the last saveBlock (if any) is
  // still returned to the caller.
  curCol.dataFile.pFile->flush();

  return rc;
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* MCOL-5021 Read-only version of writeRows() function.
|
|
* PARAMETERS:
|
|
* curCol - column information
|
|
* totalRow - the total number of rows that need to be read
|
|
* ridList - the vector of row id
|
|
* oldValArray - the array of old value
|
|
* RETURN:
|
|
* NO_ERROR if success, other number otherwise
|
|
***********************************************************/
|
|
int ColumnOp::writeRowsReadOnly(Column& curCol, uint64_t totalRow, const RIDList& ridList,
                                void* oldValArray)
{
  // MCOL-5021: read-only counterpart of writeRows(). Walks the RIDs in
  // order, loading each containing block at most once per run of consecutive
  // rows, and copies every row's current value into oldValArray
  // (curCol.colWidth bytes per row). Nothing is written back to disk.
  int rc = NO_ERROR;
  int loadedFbo = -1;  // block currently held in blockBuf; -1 = none yet
  int fbo, bio;
  unsigned char blockBuf[BYTE_PER_BLOCK];

  for (uint64_t idx = 0; idx < totalRow; ++idx)
  {
    // Map the RID to its block number (fbo) and byte offset (bio).
    calculateRowId(ridList[idx], BYTE_PER_BLOCK / curCol.colWidth, curCol.colWidth, fbo, bio);

    // load another data block if necessary
    if (loadedFbo != fbo)
    {
      loadedFbo = fbo;
      //@Bug 4849. need to check error code to prevent disk error
      rc = readBlock(curCol.dataFile.pFile, blockBuf, loadedFbo);

      if (rc != NO_ERROR)
        return rc;
    }

    // Hand the row's current value back to the caller.
    if (oldValArray)
    {
      uint8_t* dest = static_cast<uint8_t*>(oldValArray);
      memcpy(dest + idx * curCol.colWidth, blockBuf + bio, curCol.colWidth);
    }
  }

  return rc;
}
|
|
|
|
/***********************************************************
|
|
* DESCRIPTION:
|
|
* Write rows
|
|
* PARAMETERS:
|
|
* curCol - column information
|
|
* totalRow - the total number of rows need to be inserted
|
|
* ridList - the vector of row id
|
|
* valArray - the array of one row value
|
|
* oldValArray - the array of old value
|
|
* RETURN:
|
|
* NO_ERROR if success, other number otherwise
|
|
***********************************************************/
|
|
int ColumnOp::writeRowsValues(Column& curCol, uint64_t totalRow, const RIDList& ridList, const void* valArray,
                              void* oldValArray)
{
  // Writes the i-th element of valArray to the i-th RID of ridList (unlike
  // writeRows(), which repeats element 0). Blocks are read once per run of
  // consecutive rows and flushed when the target block changes or at the end.
  // If oldValArray is non-null, each row's previous value is copied into it
  // (curCol.colWidth bytes per row) before being overwritten.
  uint64_t i = 0, curRowId;
  int dataFbo, dataBio, curDataFbo = -1;  // curDataFbo == -1: no block loaded yet
  unsigned char dataBuf[BYTE_PER_BLOCK];  // in-memory copy of the current block
  bool bExit = false, bDataDirty = false;
  void* pVal = 0;
  // void* pOldVal;
  char charTmpBuf[8];  // staging buffer for fixed 8-byte char-type values
  int rc = NO_ERROR;

  while (!bExit)
  {
    curRowId = ridList[i];

    // Map the RID to a block number (dataFbo) and byte offset within the
    // block (dataBio).
    calculateRowId(curRowId, BYTE_PER_BLOCK / curCol.colWidth, curCol.colWidth, dataFbo, dataBio);

    // load another data block if necessary
    if (curDataFbo != dataFbo)
    {
      // Flush the previous block before switching to a new one.
      if (bDataDirty)
      {
        rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);

        if (rc != NO_ERROR)
          return rc;

        bDataDirty = false;
      }

      curDataFbo = dataFbo;
      rc = readBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);

      if (rc != NO_ERROR)
        return rc;

      // The block is marked dirty as soon as it is loaded, even before any
      // value is written into it.
      bDataDirty = true;
    }

    // This is a awkward way to convert void* and get ith element, I just don't have a good solution for that
    // Select the i-th input value according to the column's storage type.
    switch (curCol.colType)
    {
      case WriteEngine::WR_FLOAT: pVal = &((float*)valArray)[i]; break;

      case WriteEngine::WR_DOUBLE: pVal = &((double*)valArray)[i]; break;

      case WriteEngine::WR_VARBINARY:  // treat same as char for now
      case WriteEngine::WR_BLOB:
      case WriteEngine::WR_TEXT:
      case WriteEngine::WR_CHAR:
      {
        // Char-family values are stored as fixed 8-byte cells; copy the
        // i-th cell into the staging buffer.
        memcpy(charTmpBuf, (char*)valArray + i * 8, 8);
        pVal = charTmpBuf;
      }
      break;

      case WriteEngine::WR_SHORT: pVal = &((short*)valArray)[i]; break;

      case WriteEngine::WR_BYTE: pVal = &((char*)valArray)[i]; break;

      case WriteEngine::WR_LONGLONG: pVal = &((long long*)valArray)[i]; break;

      case WriteEngine::WR_TOKEN: pVal = &((Token*)valArray)[i]; break;

      case WriteEngine::WR_INT:
      case WriteEngine::WR_MEDINT: pVal = &((int*)valArray)[i]; break;

      case WriteEngine::WR_USHORT: pVal = &((uint16_t*)valArray)[i]; break;

      case WriteEngine::WR_UBYTE: pVal = &((uint8_t*)valArray)[i]; break;

      case WriteEngine::WR_ULONGLONG: pVal = &((uint64_t*)valArray)[i]; break;

      case WriteEngine::WR_UINT:
      case WriteEngine::WR_UMEDINT: pVal = &((uint32_t*)valArray)[i]; break;

      case WriteEngine::WR_BINARY: pVal = &((int128_t*)valArray)[i]; break;

      default: pVal = &((int*)valArray)[i]; break;
    }

    // This is the write stuff
    // Capture the row's current value for the caller before overwriting it.
    if (oldValArray)
    {
      uint8_t* p = static_cast<uint8_t*>(oldValArray);
      memcpy(p + curCol.colWidth * i, dataBuf + dataBio, curCol.colWidth);
    }

    writeBufValue(dataBuf + dataBio, pVal, curCol.colWidth);

    i++;

    if (i >= totalRow)
      bExit = true;
  }

  // take care of the cleanup
  // Flush the last touched block if it holds unsaved changes.
  if (bDataDirty && curDataFbo >= 0)
  {
    rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);
  }

  return rc;
}
|
|
|
|
} // namespace WriteEngine
|