You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-07-30 19:23:07 +03:00
MCOL-5021 For the DELETE operation, empty magic values are only
written to database files for AUX column. Perform read-only operation for other columns in the table to update the Casual Partitioning information.
This commit is contained in:
@ -601,9 +601,9 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent, Column& col
|
||||
* RETURN:
|
||||
* none
|
||||
***********************************************************/
|
||||
void ColumnOp::clearColumn(Column& column) const
|
||||
void ColumnOp::clearColumn(Column& column, bool isFlush) const
|
||||
{
|
||||
if (column.dataFile.pFile)
|
||||
if (column.dataFile.pFile && isFlush)
|
||||
{
|
||||
column.dataFile.pFile->flush();
|
||||
}
|
||||
@ -1406,7 +1406,8 @@ bool ColumnOp::isValid(Column& column) const
|
||||
* ERR_FILE_READ if something wrong in reading the file
|
||||
***********************************************************/
|
||||
// @bug 5572 - HDFS usage: add *.tmp file backup flag
|
||||
int ColumnOp::openColumnFile(Column& column, std::string& segFile, bool useTmpSuffix, int ioBuffSize) const
|
||||
int ColumnOp::openColumnFile(Column& column, std::string& segFile, bool useTmpSuffix,
|
||||
int ioBuffSize) const
|
||||
{
|
||||
if (!isValid(column))
|
||||
return ERR_INVALID_PARAM;
|
||||
@ -1685,6 +1686,11 @@ int ColumnOp::writeRows(Column& curCol, uint64_t totalRow, const RIDList& ridLis
|
||||
char charTmpBuf[8];
|
||||
int rc = NO_ERROR;
|
||||
|
||||
if (bDelete)
|
||||
{
|
||||
pVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth);
|
||||
}
|
||||
|
||||
while (!bExit)
|
||||
{
|
||||
curRowId = ridList[i];
|
||||
@ -1759,10 +1765,6 @@ int ColumnOp::writeRows(Column& curCol, uint64_t totalRow, const RIDList& ridLis
|
||||
default: pVal = &((int*)valArray)[0]; break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth);
|
||||
}
|
||||
|
||||
// This is the write stuff
|
||||
if (oldValArray)
|
||||
@ -1791,6 +1793,59 @@ int ColumnOp::writeRows(Column& curCol, uint64_t totalRow, const RIDList& ridLis
|
||||
return rc;
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* MCOL-5021 Read-only version of writeRows() function.
|
||||
* PARAMETERS:
|
||||
* curCol - column information
|
||||
* totalRow - the total number of rows that need to be read
|
||||
* ridList - the vector of row id
|
||||
* oldValArray - the array of old value
|
||||
* RETURN:
|
||||
* NO_ERROR if success, other number otherwise
|
||||
***********************************************************/
|
||||
int ColumnOp::writeRowsReadOnly(Column& curCol, uint64_t totalRow, const RIDList& ridList,
|
||||
void* oldValArray)
|
||||
{
|
||||
uint64_t i = 0, curRowId;
|
||||
int dataFbo, dataBio, curDataFbo = -1;
|
||||
unsigned char dataBuf[BYTE_PER_BLOCK];
|
||||
bool bExit = false;
|
||||
int rc = NO_ERROR;
|
||||
|
||||
while (!bExit)
|
||||
{
|
||||
curRowId = ridList[i];
|
||||
|
||||
calculateRowId(curRowId, BYTE_PER_BLOCK / curCol.colWidth, curCol.colWidth, dataFbo, dataBio);
|
||||
|
||||
// load another data block if necessary
|
||||
if (curDataFbo != dataFbo)
|
||||
{
|
||||
curDataFbo = dataFbo;
|
||||
//@Bug 4849. need to check error code to prevent disk error
|
||||
rc = readBlock(curCol.dataFile.pFile, dataBuf, curDataFbo, true);
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
return rc;
|
||||
}
|
||||
|
||||
// Read the old value of the record
|
||||
if (oldValArray)
|
||||
{
|
||||
uint8_t* p = static_cast<uint8_t*>(oldValArray);
|
||||
memcpy(p + i * curCol.colWidth, dataBuf + dataBio, curCol.colWidth);
|
||||
}
|
||||
|
||||
i++;
|
||||
|
||||
if (i >= totalRow)
|
||||
bExit = true;
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Write rows
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include "we_dbrootextenttracker.h"
|
||||
#include "we_tablemetadata.h"
|
||||
#include "../dictionary/we_dctnry.h"
|
||||
#include "stopwatch.h"
|
||||
#if defined(_MSC_VER) && defined(WRITEENGINE_DLLEXPORT)
|
||||
#define EXPORT __declspec(dllexport)
|
||||
#else
|
||||
@ -227,6 +228,16 @@ class ColumnOp : public DbFileOp
|
||||
EXPORT virtual int writeRows(Column& curCol, uint64_t totalRow, const RIDList& ridList,
|
||||
const void* valArray, void* oldValArray = 0, bool bDelete = false);
|
||||
|
||||
/**
|
||||
* @brief MCOL-5021 Read-only version of the writeRows() function above.
|
||||
This function only reads the values from the database file into
|
||||
oldValArray for updating the CP information. As of MCOL-5021, we only
|
||||
delete (i.e. write empty magic values) AUX column rows from the actual
|
||||
database files.
|
||||
*/
|
||||
EXPORT virtual int writeRowsReadOnly(Column& curCol, uint64_t totalRow, const RIDList& ridList,
|
||||
void* oldValArray = 0);
|
||||
|
||||
/**
|
||||
* @brief Write row(s) for update @Bug 1886,2870
|
||||
*/
|
||||
@ -246,7 +257,7 @@ class ColumnOp : public DbFileOp
|
||||
/**
|
||||
* @brief Clear a column
|
||||
*/
|
||||
EXPORT void clearColumn(Column& column) const;
|
||||
EXPORT void clearColumn(Column& column, bool isFlush = true) const;
|
||||
|
||||
/**
|
||||
* @brief open a data file of column
|
||||
@ -276,7 +287,8 @@ class ColumnOp : public DbFileOp
|
||||
/**
|
||||
* @brief populate readBuf with data in block #lbid
|
||||
*/
|
||||
virtual int readBlock(IDBDataFile* pFile, unsigned char* readBuf, const uint64_t fbo) = 0;
|
||||
virtual int readBlock(IDBDataFile* pFile, unsigned char* readBuf, const uint64_t fbo,
|
||||
bool isReadOnly = false) = 0;
|
||||
|
||||
/**
|
||||
* @brief output writeBuf to pFile starting at position fbo
|
||||
|
@ -95,7 +95,8 @@ int ColumnOpCompress0::blocksInFile(IDBDataFile* pFile) const
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ColumnOpCompress0::readBlock(IDBDataFile* pFile, unsigned char* readBuf, const uint64_t fbo)
|
||||
int ColumnOpCompress0::readBlock(IDBDataFile* pFile, unsigned char* readBuf,
|
||||
const uint64_t fbo, bool isReadOnly)
|
||||
{
|
||||
return readDBFile(pFile, readBuf, fbo, true);
|
||||
}
|
||||
@ -159,9 +160,10 @@ int ColumnOpCompress1::blocksInFile(IDBDataFile* pFile) const
|
||||
return m_chunkManager->getBlockCount(pFile);
|
||||
}
|
||||
|
||||
int ColumnOpCompress1::readBlock(IDBDataFile* pFile, unsigned char* readBuf, const uint64_t fbo)
|
||||
int ColumnOpCompress1::readBlock(IDBDataFile* pFile, unsigned char* readBuf,
|
||||
const uint64_t fbo, bool isReadOnly)
|
||||
{
|
||||
return m_chunkManager->readBlock(pFile, readBuf, fbo);
|
||||
return m_chunkManager->readBlock(pFile, readBuf, fbo, isReadOnly);
|
||||
}
|
||||
|
||||
int ColumnOpCompress1::saveBlock(IDBDataFile* pFile, const unsigned char* writeBuf, const uint64_t fbo)
|
||||
|
@ -72,7 +72,7 @@ class ColumnOpCompress0 : public ColumnOp
|
||||
/**
|
||||
* @brief virtual method in ColumnOp
|
||||
*/
|
||||
int readBlock(IDBDataFile* pFile, unsigned char* readBuf, const uint64_t fbo);
|
||||
int readBlock(IDBDataFile* pFile, unsigned char* readBuf, const uint64_t fbo, bool isReadOnly = false);
|
||||
|
||||
/**
|
||||
* @brief virtual method in ColumnOp
|
||||
@ -161,7 +161,7 @@ class ColumnOpCompress1 : public ColumnOp
|
||||
/**
|
||||
* @brief virtual method in ColumnOp
|
||||
*/
|
||||
int readBlock(IDBDataFile* pFile, unsigned char* readBuf, const uint64_t fbo);
|
||||
int readBlock(IDBDataFile* pFile, unsigned char* readBuf, const uint64_t fbo, bool isReadOnly = false);
|
||||
|
||||
/**
|
||||
* @brief virtual method in ColumnOp
|
||||
|
@ -4601,31 +4601,9 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, const vector<CSCType
|
||||
// timer.start("markExtentsInvalid");
|
||||
//#endif
|
||||
|
||||
if (m_opType == DELETE && hasAUXCol)
|
||||
{
|
||||
ColStructList colStructListAUX(1, colStructList.back());
|
||||
WriteEngine::CSCTypesList cscColTypeListAUX(1, cscColTypeList.back());
|
||||
ColValueList colValueListAUX(1, colValueList.back());
|
||||
std::vector<ExtCPInfo*> currentExtentRangesPtrsAUX(1, currentExtentRangesPtrs.back());
|
||||
|
||||
rc = writeColumnRecUpdate(txnid, cscColTypeListAUX, colStructListAUX, colValueListAUX, colOldValueList,
|
||||
ridLists[extent], tableOid, true, ridLists[extent].size(),
|
||||
¤tExtentRangesPtrsAUX);
|
||||
|
||||
for (auto& cpInfoPtr : currentExtentRangesPtrs)
|
||||
{
|
||||
if (cpInfoPtr)
|
||||
{
|
||||
cpInfoPtr->toInvalid();
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
rc = writeColumnRecUpdate(txnid, cscColTypeList, colStructList, colValueList, colOldValueList,
|
||||
ridLists[extent], tableOid, true, ridLists[extent].size(),
|
||||
¤tExtentRangesPtrs);
|
||||
}
|
||||
rc = writeColumnRecUpdate(txnid, cscColTypeList, colStructList, colValueList, colOldValueList,
|
||||
ridLists[extent], tableOid, true, ridLists[extent].size(),
|
||||
¤tExtentRangesPtrs, hasAUXCol);
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
break;
|
||||
@ -5727,7 +5705,8 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
const ColValueList& colValueList, vector<void*>& colOldValueList,
|
||||
const RIDList& ridList, const int32_t tableOid,
|
||||
bool convertStructFlag, ColTupleList::size_type nRows,
|
||||
std::vector<ExtCPInfo*>* cpInfos)
|
||||
std::vector<ExtCPInfo*>* cpInfos,
|
||||
bool hasAUXCol)
|
||||
{
|
||||
bool bExcp;
|
||||
int rc = 0;
|
||||
@ -5753,7 +5732,8 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
std::vector<VBRange> freeList;
|
||||
vector<vector<uint32_t> > fboLists;
|
||||
vector<vector<LBIDRange> > rangeLists;
|
||||
rc = processBeginVBCopy(txnid, colStructList, ridList, freeList, fboLists, rangeLists, rangeListTot);
|
||||
rc = processBeginVBCopy(txnid, ((m_opType == DELETE && hasAUXCol) ? ColStructList(1, colStructList.back()) : colStructList),
|
||||
ridList, freeList, fboLists, rangeLists, rangeListTot);
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
@ -5823,6 +5803,8 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
}
|
||||
|
||||
string segFile;
|
||||
bool isFlush = (m_opType != DELETE || !hasAUXCol || (i == colStructList.size() - 1));
|
||||
|
||||
rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE); // @bug 5572 HDFS tmp file
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
@ -5834,7 +5816,6 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
aFile.oid = curColStruct.dataOid;
|
||||
aFile.partitionNum = curColStruct.fColPartition;
|
||||
aFile.dbRoot = curColStruct.fColDbRoot;
|
||||
;
|
||||
aFile.segmentNum = curColStruct.fColSegment;
|
||||
aFile.compType = curColStruct.fCompressionType;
|
||||
files.push_back(aFile);
|
||||
@ -5846,13 +5827,19 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
|
||||
if (!idbdatafile::IDBPolicy::useHdfs())
|
||||
{
|
||||
if (rangeListTot.size() > 0)
|
||||
if (rangeListTot.size() > 0 &&
|
||||
(m_opType != DELETE || !hasAUXCol || (i == colStructList.size() - 1)))
|
||||
{
|
||||
if (freeList[0].size >= (blocksProcessed + rangeLists[i].size()))
|
||||
ColStructList::size_type j = i;
|
||||
|
||||
if (m_opType == DELETE && hasAUXCol && (i == colStructList.size() - 1))
|
||||
j = 0;
|
||||
|
||||
if (freeList[0].size >= (blocksProcessed + rangeLists[j].size()))
|
||||
{
|
||||
aRange.vbOID = freeList[0].vbOID;
|
||||
aRange.vbFBO = freeList[0].vbFBO + blocksProcessed;
|
||||
aRange.size = rangeLists[i].size();
|
||||
aRange.size = rangeLists[j].size();
|
||||
curFreeList.push_back(aRange);
|
||||
}
|
||||
else
|
||||
@ -5867,7 +5854,7 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
{
|
||||
aRange.vbOID = freeList[1].vbOID;
|
||||
aRange.vbFBO = freeList[1].vbFBO + blocksProcessedThisOid;
|
||||
aRange.size = rangeLists[i].size() - blockUsed;
|
||||
aRange.size = rangeLists[j].size() - blockUsed;
|
||||
curFreeList.push_back(aRange);
|
||||
blocksProcessedThisOid += aRange.size;
|
||||
}
|
||||
@ -5878,10 +5865,10 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
}
|
||||
}
|
||||
|
||||
blocksProcessed += rangeLists[i].size();
|
||||
blocksProcessed += rangeLists[j].size();
|
||||
|
||||
rc = BRMWrapper::getInstance()->writeVB(curCol.dataFile.pFile, (BRM::VER_t)txnid,
|
||||
curColStruct.dataOid, fboLists[i], rangeLists[i], colOp,
|
||||
curColStruct.dataOid, fboLists[j], rangeLists[j], colOp,
|
||||
curFreeList, curColStruct.fColDbRoot, true);
|
||||
}
|
||||
}
|
||||
@ -5946,7 +5933,10 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
#ifdef PROFILE
|
||||
timer.start("writeRows ");
|
||||
#endif
|
||||
rc = colOp->writeRows(curCol, totalRow, ridList, valArray, oldValArray, true);
|
||||
if (!hasAUXCol || (i == colStructList.size() - 1))
|
||||
rc = colOp->writeRows(curCol, totalRow, ridList, valArray, oldValArray, true);
|
||||
else
|
||||
rc = colOp->writeRowsReadOnly(curCol, totalRow, ridList, oldValArray);
|
||||
#ifdef PROFILE
|
||||
timer.stop("writeRows ");
|
||||
#endif
|
||||
@ -5954,10 +5944,9 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
|
||||
updateMaxMinRange(1, totalRow, cscColTypeList[i], curColStruct.colType,
|
||||
m_opType == DELETE ? NULL : valArray, oldValArray, cpInfo, false);
|
||||
// timer.start("Delete:closefile");
|
||||
colOp->clearColumn(curCol);
|
||||
|
||||
// timer.stop("Delete:closefile");
|
||||
colOp->clearColumn(curCol, isFlush);
|
||||
|
||||
if (valArray != NULL)
|
||||
{
|
||||
free(valArray);
|
||||
@ -5976,18 +5965,16 @@ int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesL
|
||||
|
||||
} // end of for (i = 0)
|
||||
|
||||
// timer.start("Delete:purgePrimProcFdCache");
|
||||
if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0))
|
||||
cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
|
||||
|
||||
// timer.stop("Delete:purgePrimProcFdCache");
|
||||
if (rangeListTot.size() > 0)
|
||||
BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);
|
||||
|
||||
// timer.stop("Delete:writecolrec");
|
||||
//#ifdef PROFILE
|
||||
// timer.finish();
|
||||
//#endif
|
||||
#ifdef PROFILE
|
||||
timer.finish();
|
||||
#endif
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -722,7 +722,8 @@ class WriteEngineWrapper : public WEObj
|
||||
const ColStructList& colStructList, const ColValueList& colValueList,
|
||||
std::vector<void*>& colOldValueList, const RIDList& ridList,
|
||||
const int32_t tableOid, bool convertStructFlag = true,
|
||||
ColTupleList::size_type nRows = 0, std::vector<ExtCPInfo*>* cpInfos = NULL);
|
||||
ColTupleList::size_type nRows = 0, std::vector<ExtCPInfo*>* cpInfos = NULL,
|
||||
bool hasAUXCol = false);
|
||||
|
||||
// For update column from column to use
|
||||
int writeColumnRecords(const TxnID& txnid, const CSCTypesList& cscColTypeList,
|
||||
|
Reference in New Issue
Block a user