1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-08-01 06:46:55 +03:00

Merge pull request #827 from drrtuy/MCOL-3317

MCOL-3317 Moved fill-next-block from writeRow() into allocRowId.
This commit is contained in:
Roman Nozdrin
2019-08-16 22:42:26 +03:00
committed by GitHub
5 changed files with 75 additions and 126 deletions

View File

@ -289,9 +289,9 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
//get a unique number //get a unique number
VERBOSE_INFO("Removing the SYSTABLE meta data"); VERBOSE_INFO("Removing the SYSTABLE meta data");
//#ifdef IDB_DDL_DEBUG #ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Removing the SYSTABLEs meta data" << endl; cout << fTxnid.id << " Removing the SYSTABLEs meta data" << endl;
//#endif #endif
bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSTABLE; bytestream << (ByteStream::byte)WE_SVR_DELETE_SYSTABLE;
bytestream << uniqueId; bytestream << uniqueId;
bytestream << (uint32_t) dropTableStmt.fSessionID; bytestream << (uint32_t) dropTableStmt.fSessionID;
@ -324,9 +324,9 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
try try
{ {
// #ifdef IDB_DDL_DEBUG #ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl; cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSTABLES to pm " << pmNum << endl;
//#endif #endif
//cout << "deleting systable entries with txnid " << txnID.id << endl; //cout << "deleting systable entries with txnid " << txnID.id << endl;
fWEClient->write(bytestream, (uint32_t)pmNum); fWEClient->write(bytestream, (uint32_t)pmNum);
@ -356,18 +356,18 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
} }
catch (runtime_error& ex) //write error catch (runtime_error& ex) //write error
{ {
// #ifdef IDB_DDL_DEBUG #ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table got exception" << endl; cout << fTxnid.id << " Drop table got exception" << endl;
// #endif #endif
rc = NETWORK_ERROR; rc = NETWORK_ERROR;
errorMsg = ex.what(); errorMsg = ex.what();
} }
catch (...) catch (...)
{ {
rc = NETWORK_ERROR; rc = NETWORK_ERROR;
//#ifdef IDB_DDL_DEBUG #ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table got unknown exception" << endl; cout << fTxnid.id << " Drop table got unknown exception" << endl;
//#endif #endif
} }
if (rc != 0) if (rc != 0)
@ -417,9 +417,9 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
try try
{ {
//#ifdef IDB_DDL_DEBUG #ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSCOLUMN to pm " << pmNum << endl; cout << fTxnid.id << " Drop table sending WE_SVR_DELETE_SYSCOLUMN to pm " << pmNum << endl;
//#endif #endif
fWEClient->write(bytestream, (unsigned)pmNum); fWEClient->write(bytestream, (unsigned)pmNum);
while (1) while (1)
@ -448,18 +448,18 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
} }
catch (runtime_error& ex) //write error catch (runtime_error& ex) //write error
{ {
//#ifdef IDB_DDL_DEBUG #ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table got exception" << endl; cout << fTxnid.id << " Drop table got exception" << endl;
//#endif #endif
rc = NETWORK_ERROR; rc = NETWORK_ERROR;
errorMsg = ex.what(); errorMsg = ex.what();
} }
catch (...) catch (...)
{ {
rc = NETWORK_ERROR; rc = NETWORK_ERROR;
// #ifdef IDB_DDL_DEBUG #ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table got unknown exception" << endl; cout << fTxnid.id << " Drop table got unknown exception" << endl;
//#endif #endif
} }
if (rc != 0) if (rc != 0)
@ -612,9 +612,9 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
bytestream << (uint32_t) oidList[i]; bytestream << (uint32_t) oidList[i];
} }
//#ifdef IDB_DDL_DEBUG #ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table removing column files" << endl; cout << fTxnid.id << " Drop table removing column files" << endl;
//#endif #endif
uint32_t msgRecived = 0; uint32_t msgRecived = 0;
try try
@ -686,9 +686,9 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(ddlpackage::Dro
//Flush primProc cache //Flush primProc cache
rc = cacheutils::flushOIDsFromCache( oidList ); rc = cacheutils::flushOIDsFromCache( oidList );
//Delete extents from extent map //Delete extents from extent map
//#ifdef IDB_DDL_DEBUG #ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Drop table deleteOIDs" << endl; cout << fTxnid.id << " Drop table deleteOIDs" << endl;
//#endif #endif
rc = fDbrm->deleteOIDs(oidList); rc = fDbrm->deleteOIDs(oidList);
if (rc != 0) if (rc != 0)

View File

@ -2150,7 +2150,7 @@ int ProcessDDLStatement(string& ddlStatement, string& schema, const string& tabl
if (b == ddlpackageprocessor::DDLPackageProcessor::WARNING) if (b == ddlpackageprocessor::DDLPackageProcessor::WARNING)
{ {
rc = 0; rc = 0;
string errmsg ("Error occured during file deletion. Restart DMLProc or use command tool ddlcleanup to clean up. " ); string errmsg ("Error occured during file deletion. Restart DDLProc or use command tool ddlcleanup to clean up. " );
push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, errmsg.c_str()); push_warning(thd, Sql_condition::WARN_LEVEL_WARN, 9999, errmsg.c_str());
} }

View File

@ -152,6 +152,7 @@ const int ERR_FILE_STAT = ERR_FILEBASE + 16;// Error getting stats o
const int ERR_VB_FILE_NOT_EXIST = ERR_FILEBASE + 17;// Version buffer file not exists const int ERR_VB_FILE_NOT_EXIST = ERR_FILEBASE + 17;// Version buffer file not exists
const int ERR_FILE_FLUSH = ERR_FILEBASE + 18;// Error flushing file const int ERR_FILE_FLUSH = ERR_FILEBASE + 18;// Error flushing file
const int ERR_FILE_GLOBBING = ERR_FILEBASE + 19;// Error globbing a file name const int ERR_FILE_GLOBBING = ERR_FILEBASE + 19;// Error globbing a file name
const int ERR_FILE_EOF = ERR_FILEBASE + 20;// EOF
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// XML level error // XML level error

View File

@ -2603,8 +2603,16 @@ int FileOp::readFile( IDBDataFile* pFile, unsigned char* readBuf,
{ {
if ( pFile != NULL ) if ( pFile != NULL )
{ {
if ( pFile->read( readBuf, readSize ) != readSize ) int bc = pFile->read( readBuf, readSize );
if (bc != readSize)
{
// MCOL-498 EOF if a next block is empty
if (bc == 0)
{
return ERR_FILE_EOF;
}
return ERR_FILE_READ; return ERR_FILE_READ;
}
} }
else else
return ERR_FILE_NULL; return ERR_FILE_NULL;

View File

@ -174,7 +174,23 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
} }
} }
RETURN_ON_ERROR(readBlock(column.dataFile.pFile, buf, hwm)); rc = readBlock(column.dataFile.pFile, buf, hwm);
// MCOL-498 add a block.
// DRRTUY TODO Given there is no more hwm pre-allocated we
// could extend the file once to accomodate all records.
if (rc != NO_ERROR)
{
if (rc == ERR_FILE_EOF)
{
uint64_t emptyVal = getEmptyRowValue(column.colDataType, column.colWidth);
setEmptyBuf(buf, BYTE_PER_BLOCK, emptyVal, column.colWidth);
RETURN_ON_ERROR(saveBlock(column.dataFile.pFile, buf, hwm));
} else
{
return rc;
}
}
for (j = 0; j < totalRowPerBlock; j++) for (j = 0; j < totalRowPerBlock; j++)
{ {
@ -197,8 +213,6 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
if ((rowsallocated == 0) && isFirstBatchPm) if ((rowsallocated == 0) && isFirstBatchPm)
{ {
TableMetaData::removeTableMetaData(tableOid); TableMetaData::removeTableMetaData(tableOid);
//TableMetaData* tableMetaData= TableMetaData::makeTableMetaData(tableOid);
} }
//Check if a new extent is needed //Check if a new extent is needed
@ -335,11 +349,6 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
vector<BRM::LBID_t> lbids; vector<BRM::LBID_t> lbids;
vector<CalpontSystemCatalog::ColDataType> colDataTypes; vector<CalpontSystemCatalog::ColDataType> colDataTypes;
//BRM::CPInfoList_t cpinfoList;
//BRM::CPInfo cpInfo;
//cpInfo.max = numeric_limits<int64_t>::min();
//cpInfo.min = numeric_limits<int64_t>::max();
//cpInfo.seqNum = -1;
for ( i = 0; i < extents.size(); i++) for ( i = 0; i < extents.size(); i++)
{ {
setColParam(newCol, 0, newColStructList[i].colWidth, newColStructList[i].colDataType, newColStructList[i].colType, setColParam(newCol, 0, newColStructList[i].colWidth, newColStructList[i].colDataType, newColStructList[i].colType,
@ -352,8 +361,6 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
if (rc != NO_ERROR) if (rc != NO_ERROR)
return rc; return rc;
//cpInfo.firstLbid = extents[i].startLbid;
//cpinfoList.push_back(cpInfo);
newColStructList[i].fColPartition = partition; newColStructList[i].fColPartition = partition;
newColStructList[i].fColSegment = segment; newColStructList[i].fColSegment = segment;
newColStructList[i].fColDbRoot = dbRoot; newColStructList[i].fColDbRoot = dbRoot;
@ -365,7 +372,6 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
} }
//mark the extents to updating //mark the extents to updating
//rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes); rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
if (rc != NO_ERROR) if (rc != NO_ERROR)
@ -470,18 +476,22 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
//..Search first block of new extent for empty rows //..Search first block of new extent for empty rows
rc = readBlock(newCol.dataFile.pFile, buf, newHwm); rc = readBlock(newCol.dataFile.pFile, buf, newHwm);
if ( rc != NO_ERROR) // MCOL-498 add a block.
return rc; // DRRTUY TODO Given there is no more hwm pre-allocated we
// could extend the file once to accomodate all records.
// MCOL-498 This must be a first block in a new extent so if (rc != NO_ERROR)
// fill the block up to its boundary with empties. Otherwise
// there could be fantom values.
{ {
uint64_t emptyVal = getEmptyRowValue(column.colDataType, column.colWidth); if (rc == ERR_FILE_EOF)
setEmptyBuf(buf, BYTE_PER_BLOCK, emptyVal, column.colWidth); {
uint64_t emptyVal = getEmptyRowValue(newCol.colDataType, newCol.colWidth);
setEmptyBuf(buf, BYTE_PER_BLOCK, emptyVal, newCol.colWidth);
RETURN_ON_ERROR(saveBlock(newCol.dataFile.pFile, buf, newHwm));
} else
{
return rc;
}
} }
for (j = 0; j < totalRowPerBlock; j++) for (j = 0; j < totalRowPerBlock; j++)
{ {
if (isEmptyRow(buf, j, column)) if (isEmptyRow(buf, j, column))
@ -507,14 +517,27 @@ int ColumnOp::allocRowId(const TxnID& txnid, bool useStartingExtent,
{ {
rc = readBlock(newCol.dataFile.pFile, buf, newHwm); rc = readBlock(newCol.dataFile.pFile, buf, newHwm);
if ( rc != NO_ERROR) // MCOL-498 add a block.
return rc; // DRRTUY TODO Given there is no more hwm pre-allocated we
// could extend the file once to accomodate all records.
if (rc != NO_ERROR)
{
if (rc == ERR_FILE_EOF)
{
uint64_t emptyVal = getEmptyRowValue(newCol.colDataType, newCol.colWidth);
setEmptyBuf(buf, BYTE_PER_BLOCK, emptyVal, newCol.colWidth);
RETURN_ON_ERROR(saveBlock(newCol.dataFile.pFile, buf, newHwm));
} else
{
return rc;
}
}
for (j = 0; j < totalRowPerBlock; j++) for (j = 0; j < totalRowPerBlock; j++)
{ {
if (isEmptyRow(buf, j, column)) if (isEmptyRow(buf, j, column))
{ {
rowIdArray[counter] = getRowId(newHwm, column.colWidth, j); rowIdArray[counter] = getRowId(newHwm, newCol.colWidth, j);
rowsallocated++; rowsallocated++;
rowsLeft++; rowsLeft++;
counter++; counter++;
@ -1542,13 +1565,9 @@ int ColumnOp::writeRow(Column& curCol, uint64_t totalRow, const RID* rowIdArray,
unsigned char dataBuf[BYTE_PER_BLOCK]; unsigned char dataBuf[BYTE_PER_BLOCK];
bool bExit = false, bDataDirty = false; bool bExit = false, bDataDirty = false;
void* pVal = 0; void* pVal = 0;
// void* pOldVal;
char charTmpBuf[8]; char charTmpBuf[8];
uint64_t emptyVal; uint64_t emptyVal;
int rc = NO_ERROR; int rc = NO_ERROR;
bool fillUpWEmptyVals = false;
bool firstRowInBlock = false;
bool lastRowInBlock = false;
uint16_t rowsInBlock = BYTE_PER_BLOCK / curCol.colWidth; uint16_t rowsInBlock = BYTE_PER_BLOCK / curCol.colWidth;
while (!bExit) while (!bExit)
@ -1568,18 +1587,8 @@ int ColumnOp::writeRow(Column& curCol, uint64_t totalRow, const RID* rowIdArray,
return rc; return rc;
bDataDirty = false; bDataDirty = false;
// MCOL-498 We got into the next block, so the row is first in that block
// - fill the block up with empty magics.
if ( curDataFbo != -1 && !bDelete )
fillUpWEmptyVals = true;
} }
// MCOL-498 CS hasn't touched any block yet,
// but the row filled will be the first in the block.
firstRowInBlock = ( !(curRowId % (rowsInBlock)) ) ? true : false;
if( firstRowInBlock && !bDelete )
fillUpWEmptyVals = true;
curDataFbo = dataFbo; curDataFbo = dataFbo;
rc = readBlock(curCol.dataFile.pFile, dataBuf, curDataFbo); rc = readBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);
@ -1593,17 +1602,14 @@ int ColumnOp::writeRow(Column& curCol, uint64_t totalRow, const RID* rowIdArray,
// How about pVal = valArray + i*curCol.colWidth? // How about pVal = valArray + i*curCol.colWidth?
switch (curCol.colType) switch (curCol.colType)
{ {
// case WriteEngine::WR_LONG : pVal = &((long *) valArray)[i]; break;
case WriteEngine::WR_FLOAT : case WriteEngine::WR_FLOAT :
if (!bDelete) pVal = &((float*) valArray)[i]; if (!bDelete) pVal = &((float*) valArray)[i];
//pOldVal = &((float *) oldValArray)[i];
break; break;
case WriteEngine::WR_DOUBLE : case WriteEngine::WR_DOUBLE :
if (!bDelete) pVal = &((double*) valArray)[i]; if (!bDelete) pVal = &((double*) valArray)[i];
//pOldVal = &((double *) oldValArray)[i];
break; break;
case WriteEngine::WR_VARBINARY : // treat same as char for now case WriteEngine::WR_VARBINARY : // treat same as char for now
@ -1615,77 +1621,52 @@ int ColumnOp::writeRow(Column& curCol, uint64_t totalRow, const RID* rowIdArray,
memcpy(charTmpBuf, (char*)valArray + i * 8, 8); memcpy(charTmpBuf, (char*)valArray + i * 8, 8);
pVal = charTmpBuf; pVal = charTmpBuf;
} }
//pOldVal = (char*)oldValArray + i*8;
break; break;
// case WriteEngine::WR_BIT : pVal = &((bool *) valArray)[i]; break;
case WriteEngine::WR_SHORT : case WriteEngine::WR_SHORT :
if (!bDelete) pVal = &((short*) valArray)[i]; if (!bDelete) pVal = &((short*) valArray)[i];
//pOldVal = &((short *) oldValArray)[i];
break; break;
case WriteEngine::WR_BYTE : case WriteEngine::WR_BYTE :
if (!bDelete) pVal = &((char*) valArray)[i]; if (!bDelete) pVal = &((char*) valArray)[i];
//pOldVal = &((char *) oldValArray)[i];
break; break;
case WriteEngine::WR_LONGLONG: case WriteEngine::WR_LONGLONG:
if (!bDelete) pVal = &((long long*) valArray)[i]; if (!bDelete) pVal = &((long long*) valArray)[i];
//pOldVal = &((long long *) oldValArray)[i];
break; break;
case WriteEngine::WR_TOKEN: case WriteEngine::WR_TOKEN:
if (!bDelete) pVal = &((Token*) valArray)[i]; if (!bDelete) pVal = &((Token*) valArray)[i];
//pOldVal = &((Token *) oldValArray)[i];
break; break;
case WriteEngine::WR_INT : case WriteEngine::WR_INT :
case WriteEngine::WR_MEDINT : case WriteEngine::WR_MEDINT :
if (!bDelete) pVal = &((int*) valArray)[i]; if (!bDelete) pVal = &((int*) valArray)[i];
//pOldVal = &((int *) oldValArray)[i];
break; break;
case WriteEngine::WR_USHORT: case WriteEngine::WR_USHORT:
if (!bDelete) pVal = &((uint16_t*) valArray)[i]; if (!bDelete) pVal = &((uint16_t*) valArray)[i];
//pOldVal = &((uint16_t *) oldValArray)[i];
break; break;
case WriteEngine::WR_UBYTE : case WriteEngine::WR_UBYTE :
if (!bDelete) pVal = &((uint8_t*) valArray)[i]; if (!bDelete) pVal = &((uint8_t*) valArray)[i];
//pOldVal = &((uint8_t *) oldValArray)[i];
break; break;
case WriteEngine::WR_UINT : case WriteEngine::WR_UINT :
case WriteEngine::WR_UMEDINT : case WriteEngine::WR_UMEDINT :
if (!bDelete) pVal = &((uint32_t*) valArray)[i]; if (!bDelete) pVal = &((uint32_t*) valArray)[i];
//pOldVal = &((uint8_t *) oldValArray)[i];
break; break;
case WriteEngine::WR_ULONGLONG: case WriteEngine::WR_ULONGLONG:
if (!bDelete) pVal = &((uint64_t*) valArray)[i]; if (!bDelete) pVal = &((uint64_t*) valArray)[i];
//pOldVal = &((uint64_t *) oldValArray)[i];
break; break;
default : default :
if (!bDelete) pVal = &((int*) valArray)[i]; if (!bDelete) pVal = &((int*) valArray)[i];
//pOldVal = &((int *) oldValArray)[i];
break; break;
} }
// This is the stuff to retrieve old value
//memcpy(pOldVal, dataBuf + dataBio, curCol.colWidth);
if (bDelete) if (bDelete)
{ {
emptyVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth); emptyVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth);
@ -1704,52 +1685,11 @@ int ColumnOp::writeRow(Column& curCol, uint64_t totalRow, const RID* rowIdArray,
// take care of the cleanup // take care of the cleanup
if (bDataDirty && curDataFbo >= 0) if (bDataDirty && curDataFbo >= 0)
{ {
if ( fillUpWEmptyVals )
{
emptyVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth);
int writeSize = BYTE_PER_BLOCK - ( dataBio + curCol.colWidth );
// MCOL-498 Add the check though this is unlikely at the moment of writing.
if ( writeSize )
setEmptyBuf( dataBuf + dataBio + curCol.colWidth, writeSize,
emptyVal, curCol.colWidth );
}
rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo); rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);
if ( rc != NO_ERROR) if ( rc != NO_ERROR)
return rc; return rc;
// MCOL-498 If it was the last row in a block fill the next block with
// empty vals, otherwise next ColumnOp::allocRowId()
// will fail on the next block.
lastRowInBlock = ( rowsInBlock - ( curRowId % rowsInBlock ) == 1 ) ? true : false;
if ( lastRowInBlock )
{
if( !fillUpWEmptyVals )
emptyVal = getEmptyRowValue(curCol.colDataType, curCol.colWidth);
// MCOL-498 Skip if this is the last block in an extent.
if ( curDataFbo % MAX_NBLOCKS != MAX_NBLOCKS - 1 )
{
rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);
if ( rc != NO_ERROR)
return rc;
curDataFbo += 1;
rc = readBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);
if ( rc != NO_ERROR)
return rc;
unsigned char zeroSubBlock[BYTE_PER_SUBBLOCK];
std::memset(zeroSubBlock, 0, BYTE_PER_SUBBLOCK);
// The first subblock is made of 0 - fill the block with empty vals.
if ( !std::memcmp(dataBuf, zeroSubBlock, BYTE_PER_SUBBLOCK) )
{
setEmptyBuf(dataBuf, BYTE_PER_BLOCK, emptyVal, curCol.colWidth);
rc = saveBlock(curCol.dataFile.pFile, dataBuf, curDataFbo);
}
}
}
} }
return rc; return rc;
} }