You've already forked mariadb-columnstore-engine
mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-06-12 05:01:56 +03:00
Refactor better extent info bookkeeping structure and handling
Logs for research purposes Keep progress - may not build Good interface to collect LBIDs and CPInfo's Write Engine compiles with new interface New interface breaks things the least way and allows for new features to be added gradually. Still ironing design - rewriting parts of WE Keep progress commit Write Engine compiles, going to test I could introduce crashes there. Let's see. Disable logging for tests Fixing build problems - keep progress commit Changed related to new interface Add back accidentally removed m_txnLBIDMap.find Remove printf/cout; up-to-date comment for AddLBIDtoList Add "auto" type annotation Work on PR comments Descriptive vector emptines check
This commit is contained in:
@ -69,6 +69,8 @@ namespace WriteEngine
|
||||
{
|
||||
StopWatch timer;
|
||||
|
||||
static BRM::CPInfo dummyCPInfo;
|
||||
|
||||
/**@brief WriteEngineWrapper Constructor
|
||||
*/
|
||||
WriteEngineWrapper::WriteEngineWrapper() : m_opType(NOOP)
|
||||
@ -918,6 +920,25 @@ void WriteEngineWrapper::flushVMCache() const
|
||||
|
||||
}
|
||||
|
||||
#if 0
|
||||
// XXX temporary for debugging purposes
|
||||
static void log_this(const char *message,
|
||||
logging::LOG_TYPE log_type, unsigned sid)
|
||||
{
|
||||
// corresponds with dbcon in SubsystemID vector
|
||||
// in messagelog.cpp
|
||||
unsigned int subSystemId = 24;
|
||||
logging::LoggingID logid( subSystemId, sid, 0);
|
||||
logging::Message::Args args1;
|
||||
logging::Message msg(1);
|
||||
args1.add(message);
|
||||
msg.format( args1 );
|
||||
Logger logger(logid.fSubsysID);
|
||||
logger.logMessage(log_type, msg, logid);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
/*@insertColumnRecs - Insert value(s) into a column
|
||||
*/
|
||||
/***********************************************************
|
||||
@ -981,7 +1002,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
// debug information for testing
|
||||
if (isDebug(DEBUG_2))
|
||||
{
|
||||
printf("\nIn wrapper insert\n");
|
||||
printInputValue(colStructList, colValueList, ridList);
|
||||
}
|
||||
|
||||
@ -1395,7 +1415,7 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid << endl;
|
||||
//cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid << endl;
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -1644,8 +1664,6 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
//----------------------------------------------------------------------
|
||||
//Mark extents invalid
|
||||
//----------------------------------------------------------------------
|
||||
vector<BRM::LBID_t> lbids;
|
||||
vector<CalpontSystemCatalog::ColDataType> colDataTypes;
|
||||
bool successFlag = true;
|
||||
unsigned width = 0;
|
||||
int curFbo = 0, curBio, lastFbo = -1;
|
||||
@ -1665,17 +1683,15 @@ int WriteEngineWrapper::insertColumnRecs(const TxnID& txnid,
|
||||
if (curFbo != lastFbo)
|
||||
{
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
lbids,
|
||||
colDataTypes,
|
||||
colStructList[i],
|
||||
curFbo));
|
||||
curFbo,
|
||||
dummyCPInfo));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (lbids.size() > 0)
|
||||
rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
|
||||
markTxnExtentsAsInvalid(txnid);
|
||||
|
||||
//----------------------------------------------------------------------
|
||||
// Write row(s) to database file(s)
|
||||
@ -2166,7 +2182,6 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
|
||||
if (rc != NO_ERROR)
|
||||
{
|
||||
cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid << endl;
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -2411,8 +2426,6 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
//----------------------------------------------------------------------
|
||||
//Mark extents invalid
|
||||
//----------------------------------------------------------------------
|
||||
vector<BRM::LBID_t> lbids;
|
||||
vector<CalpontSystemCatalog::ColDataType> colDataTypes;
|
||||
bool successFlag = true;
|
||||
unsigned width = 0;
|
||||
int curFbo = 0, curBio, lastFbo = -1;
|
||||
@ -2432,10 +2445,9 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
if (curFbo != lastFbo)
|
||||
{
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
lbids,
|
||||
colDataTypes,
|
||||
colStructList[i],
|
||||
curFbo));
|
||||
curFbo,
|
||||
dummyCPInfo));
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -2455,18 +2467,16 @@ int WriteEngineWrapper::insertColumnRecsBinary(const TxnID& txnid,
|
||||
if (curFbo != lastFbo)
|
||||
{
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
lbids,
|
||||
colDataTypes,
|
||||
newColStructList[i],
|
||||
curFbo));
|
||||
curFbo,
|
||||
dummyCPInfo));
|
||||
}
|
||||
}
|
||||
else
|
||||
return ERR_INVALID_PARAM;
|
||||
}
|
||||
|
||||
if (lbids.size() > 0)
|
||||
rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
|
||||
markTxnExtentsAsInvalid(txnid);
|
||||
|
||||
//----------------------------------------------------------------------
|
||||
// Write row(s) to database file(s)
|
||||
@ -2514,7 +2524,6 @@ int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid,
|
||||
// debug information for testing
|
||||
if (isDebug(DEBUG_2))
|
||||
{
|
||||
printf("\nIn wrapper insert\n");
|
||||
printInputValue(colStructList, colValueList, ridList);
|
||||
}
|
||||
|
||||
@ -3001,11 +3010,8 @@ int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid,
|
||||
}
|
||||
|
||||
//Mark extents invalid
|
||||
vector<BRM::LBID_t> lbids;
|
||||
vector<CalpontSystemCatalog::ColDataType> colDataTypes;
|
||||
bool successFlag = true;
|
||||
unsigned width = 0;
|
||||
BRM::LBID_t lbid;
|
||||
int curFbo = 0, curBio, lastFbo = -1;
|
||||
|
||||
if (totalRow - rowsLeft > 0)
|
||||
@ -3020,11 +3026,10 @@ int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid,
|
||||
{
|
||||
if (curFbo != lastFbo)
|
||||
{
|
||||
RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(
|
||||
colStructList[i].dataOid, colStructList[i].fColPartition,
|
||||
colStructList[i].fColSegment, curFbo, lbid));
|
||||
lbids.push_back((BRM::LBID_t)lbid);
|
||||
colDataTypes.push_back(colStructList[i].colDataType);
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
colStructList[i],
|
||||
curFbo,
|
||||
dummyCPInfo));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3043,17 +3048,15 @@ int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid,
|
||||
if (curFbo != lastFbo)
|
||||
{
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
lbids,
|
||||
colDataTypes,
|
||||
newColStructList[i],
|
||||
curFbo));
|
||||
curFbo,
|
||||
dummyCPInfo));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//cout << "lbids size = " << lbids.size()<< endl;
|
||||
if (lbids.size() > 0)
|
||||
rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
|
||||
markTxnExtentsAsInvalid(txnid);
|
||||
|
||||
if (rc == NO_ERROR)
|
||||
{
|
||||
@ -3178,7 +3181,6 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid,
|
||||
// debug information for testing
|
||||
if (isDebug(DEBUG_2))
|
||||
{
|
||||
printf("\nIn wrapper insert\n");
|
||||
printInputValue(colStructList, colValueList, ridList);
|
||||
}
|
||||
|
||||
@ -3264,7 +3266,6 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid,
|
||||
|
||||
bool newFile;
|
||||
// WIP
|
||||
cout << "Datafile " << curCol.dataFile.fSegFileName << endl;
|
||||
#ifdef PROFILE
|
||||
timer.start("allocRowId");
|
||||
#endif
|
||||
@ -3709,16 +3710,14 @@ int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid,
|
||||
{
|
||||
colDataTypes.push_back(colStructList[i].colDataType);
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
lbids,
|
||||
colDataTypes,
|
||||
colStructList[i],
|
||||
curFbo));
|
||||
curFbo,
|
||||
dummyCPInfo));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (lbids.size() > 0)
|
||||
rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
|
||||
markTxnExtentsAsInvalid(txnid);
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
// Write row(s) to database file(s)
|
||||
@ -4224,8 +4223,6 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid,
|
||||
|
||||
RIDList::iterator rid_iter;
|
||||
//Mark extents invalid
|
||||
vector<BRM::LBID_t> lbids;
|
||||
vector<CalpontSystemCatalog::ColDataType> colDataTypes;
|
||||
bool successFlag = true;
|
||||
unsigned width = 0;
|
||||
int curFbo = 0, curBio, lastFbo = -1;
|
||||
@ -4247,10 +4244,9 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid,
|
||||
if (curFbo != lastFbo)
|
||||
{
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
lbids,
|
||||
colDataTypes,
|
||||
colStructList[j],
|
||||
curFbo));
|
||||
curFbo,
|
||||
dummyCPInfo));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -4258,8 +4254,7 @@ int WriteEngineWrapper::updateColumnRec(const TxnID& txnid,
|
||||
//#ifdef PROFILE
|
||||
//timer.start("markExtentsInvalid");
|
||||
//#endif
|
||||
if (lbids.size() > 0)
|
||||
rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
|
||||
markTxnExtentsAsInvalid(txnid);
|
||||
|
||||
if (m_opType != DELETE)
|
||||
m_opType = UPDATE;
|
||||
@ -4284,8 +4279,6 @@ int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid,
|
||||
const int32_t tableOid)
|
||||
{
|
||||
//Mark extents invalid
|
||||
vector<BRM::LBID_t> lbids;
|
||||
vector<CalpontSystemCatalog::ColDataType> colDataTypes;
|
||||
ColumnOp* colOp = NULL;
|
||||
bool successFlag = true;
|
||||
unsigned width = 0;
|
||||
@ -4308,18 +4301,14 @@ int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid,
|
||||
if (curFbo != lastFbo)
|
||||
{
|
||||
RETURN_ON_ERROR(AddLBIDtoList(txnid,
|
||||
lbids,
|
||||
colDataTypes,
|
||||
colExtentsStruct[j],
|
||||
curFbo));
|
||||
curFbo,
|
||||
dummyCPInfo));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (lbids.size() > 0)
|
||||
{
|
||||
rc = BRMWrapper::getInstance()->markExtentsInvalid(lbids, colDataTypes);
|
||||
}
|
||||
markTxnExtentsAsInvalid(txnid);
|
||||
|
||||
if (m_opType != DELETE)
|
||||
m_opType = UPDATE;
|
||||
@ -5700,7 +5689,7 @@ int WriteEngineWrapper::rollbackCommon(const TxnID& txnid, int sessionId)
|
||||
if (prefix.length() == 0)
|
||||
{
|
||||
cerr << "Need a valid DBRMRoot entry in Calpont configuation file";
|
||||
return -1;
|
||||
return ERR_INVALID_PARAM;
|
||||
}
|
||||
|
||||
uint64_t pos = prefix.find_last_of ("/") ;
|
||||
@ -5715,7 +5704,7 @@ int WriteEngineWrapper::rollbackCommon(const TxnID& txnid, int sessionId)
|
||||
logging::Message::Args args;
|
||||
args.add("RollbackTran cannot find the dbrm directory for the DML log file");
|
||||
SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_CRITICAL, logging::M0007);
|
||||
return -1;
|
||||
return ERR_OPEN_DML_LOG;
|
||||
|
||||
}
|
||||
|
||||
@ -6089,22 +6078,18 @@ void WriteEngineWrapper::AddDictToList(const TxnID txnid,
|
||||
* lbids. This is an optimization to prevent invalidating the same
|
||||
* extents over and over.
|
||||
* PARAMETERS:
|
||||
* txnid - the lbid list is per txn. We use this to keep transactions
|
||||
* seperated.
|
||||
* lbids - the current list of lbids. We add to this list
|
||||
* if the discovered lbid is in a new extent.
|
||||
* These next are needed for dbrm to get the lbid
|
||||
* oid -the table oid.
|
||||
* colPartition - the table column partition
|
||||
* segment - table segment
|
||||
* fbo - file block offset
|
||||
* txnid - the lbid list is per txn. We use this to keep transactions
|
||||
* separated.
|
||||
* These next are needed for dbrm to get the lbid:
|
||||
* colStruct - reference to ColStruct structure of column we record (provides segment and type).
|
||||
* fbo - file block offset
|
||||
* cpInfo - a CPInfo to collect if we need that.
|
||||
* RETURN: 0 => OK. -1 => error
|
||||
***********************************************************/
|
||||
int WriteEngineWrapper::AddLBIDtoList(const TxnID txnid,
|
||||
std::vector<BRM::LBID_t>& lbids,
|
||||
std::vector<CalpontSystemCatalog::ColDataType>& colDataTypes,
|
||||
int WriteEngineWrapper::AddLBIDtoList(const TxnID txnid,
|
||||
const ColStruct& colStruct,
|
||||
const int fbo)
|
||||
const int fbo,
|
||||
const BRM::CPInfo& cpInfo)
|
||||
{
|
||||
int rtn = 0;
|
||||
|
||||
@ -6135,45 +6120,7 @@ int WriteEngineWrapper::AddLBIDtoList(const TxnID txnid,
|
||||
if (rtn != 0)
|
||||
return -1;
|
||||
|
||||
if (spTxnLBIDRec->m_LBIDMap.find(startingLBID) == spTxnLBIDRec->m_LBIDMap.end())
|
||||
{
|
||||
// Not found in the map. This must be a new extent. Add it to the list.
|
||||
// cout << "Adding lbid " << startingLBID << " to txn " << txnid << endl;
|
||||
spTxnLBIDRec->AddLBID(startingLBID);
|
||||
lbids.push_back((BRM::LBID_t)startingLBID);
|
||||
colDataTypes.push_back(colStruct.colDataType);
|
||||
}
|
||||
else
|
||||
{
|
||||
++spTxnLBIDRec->m_squashedLbids;
|
||||
}
|
||||
|
||||
// If the starting LBID list has grown to more than 2000, truncate.
|
||||
// This is the purpose of the seqnum. If spTxnLBIDRec->m_lastSeqnum
|
||||
// is divisible by 1000 and size() > 1000, get rid of everything older
|
||||
// than the last 1000 entries. This is to save memory in large
|
||||
// transactions. We assume older extents are unlikely to be hit again.
|
||||
if (spTxnLBIDRec->m_lastSeqnum % 1000 == 0
|
||||
&& spTxnLBIDRec->m_LBIDMap.size() > 1000)
|
||||
{
|
||||
// cout << "Trimming the LBID list for " << txnid << ". LBID count is " << spTxnLBIDRec->m_LBIDMap.size() << endl;
|
||||
uint32_t firstDrop = spTxnLBIDRec->m_lastSeqnum - 1000;
|
||||
std::tr1::unordered_map<BRM::LBID_t, uint32_t>::iterator iter;
|
||||
|
||||
for (iter = spTxnLBIDRec->m_LBIDMap.begin(); iter != spTxnLBIDRec->m_LBIDMap.end();)
|
||||
{
|
||||
if ((*iter).second < firstDrop)
|
||||
{
|
||||
iter = spTxnLBIDRec->m_LBIDMap.erase(iter);
|
||||
}
|
||||
else
|
||||
{
|
||||
++iter;
|
||||
}
|
||||
}
|
||||
|
||||
// cout << "LBID count is now" << spTxnLBIDRec->m_LBIDMap.size() << endl;
|
||||
}
|
||||
spTxnLBIDRec->AddLBID(startingLBID, colStruct.colDataType);
|
||||
|
||||
return rtn;
|
||||
}
|
||||
@ -6190,6 +6137,26 @@ void WriteEngineWrapper::RemoveTxnFromDictMap(const TxnID txnid)
|
||||
}
|
||||
}
|
||||
|
||||
int WriteEngineWrapper::markTxnExtentsAsInvalid(const TxnID txnid, bool erase)
|
||||
{
|
||||
int rc = 0;
|
||||
std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter = m_txnLBIDMap.find(txnid);
|
||||
if (mapIter != m_txnLBIDMap.end())
|
||||
{
|
||||
SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
|
||||
if (!spTxnLBIDRec->m_LBIDs.empty())
|
||||
{
|
||||
rc = BRMWrapper::getInstance()->markExtentsInvalid(spTxnLBIDRec->m_LBIDs, spTxnLBIDRec->m_ColDataTypes);
|
||||
}
|
||||
|
||||
if (erase)
|
||||
{
|
||||
m_txnLBIDMap.erase(txnid); // spTxnLBIDRec is auto-destroyed
|
||||
}
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/***********************************************************
|
||||
* DESCRIPTION:
|
||||
* Remove a transaction LBID list from the LBID map
|
||||
@ -6197,35 +6164,11 @@ void WriteEngineWrapper::RemoveTxnFromDictMap(const TxnID txnid)
|
||||
* PARAMETERS:
|
||||
* txnid - the transaction to remove.
|
||||
* RETURN:
|
||||
* 0 => success or not found, -1 => error
|
||||
* error code
|
||||
***********************************************************/
|
||||
int WriteEngineWrapper::RemoveTxnFromLBIDMap(const TxnID txnid)
|
||||
{
|
||||
int rtn = 0;
|
||||
std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;
|
||||
|
||||
// Find the set of extent starting LBIDs for this transaction. If not found, then create it.
|
||||
try
|
||||
{
|
||||
mapIter = m_txnLBIDMap.find(txnid);
|
||||
|
||||
if (mapIter != m_txnLBIDMap.end())
|
||||
{
|
||||
SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
|
||||
// Debug
|
||||
// cout << "Remove transaction entry " << txnid << " transaction count " << m_txnLBIDMap.size() << endl;
|
||||
// cout << " count = " << spTxnLBIDRec->m_LBIDMap.size() <<
|
||||
// ", lastSeqnum = " << spTxnLBIDRec->m_lastSeqnum <<
|
||||
// ", squashed lbids = " << spTxnLBIDRec->m_squashedLbids << endl;
|
||||
m_txnLBIDMap.erase(txnid); // spTxnLBIDRec is auto-destroyed
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
rtn = -1;
|
||||
}
|
||||
|
||||
return rtn;
|
||||
return markTxnExtentsAsInvalid(txnid, true);
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user