1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

Merge pull request #2406 from tntnatbry/MCOL-5021-dev

MCOL-5021 AUX column implementation to improve DELETE performance.
This commit is contained in:
Roman Nozdrin
2022-08-15 19:03:42 +03:00
committed by GitHub
54 changed files with 40509 additions and 39064 deletions

View File

@ -267,12 +267,16 @@ keepGoing:
numDictCols++;
}
fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols +
1); // include column, oids,dictionary oids and tableoid
// Include column oids, dictionary oids, tableoid, and
// also include AUX oid as of MCOL-5021
fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols + 2);
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Create table allocOIDs got the starting oid " << fStartingColOID << endl;
#endif
uint32_t numColumnOids = numColumns + numDictCols;
numColumnOids += 1; // MCOL-5021
if (fStartingColOID < 0)
{
result.result = CREATE_ERROR;
@ -295,6 +299,7 @@ keepGoing:
bytestream << (uint32_t)createTableStmt.fSessionID;
bytestream << (uint32_t)txnID.id;
bytestream << (uint32_t)fStartingColOID;
bytestream << (uint32_t)(fStartingColOID + numColumnOids);
bytestream << (uint32_t)createTableStmt.fTableWithAutoi;
uint16_t dbRoot;
BRM::OID_t sysOid = 1001;
@ -537,7 +542,7 @@ keepGoing:
bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES;
bytestream << uniqueId;
bytestream << (uint32_t)txnID.id;
bytestream << (numColumns + numDictCols);
bytestream << numColumnOids;
unsigned colNum = 0;
unsigned dictNum = 0;
@ -601,6 +606,13 @@ keepGoing:
++iter;
}
bytestream << (fStartingColOID + numColumnOids);
bytestream << (uint8_t)execplan::AUX_COL_DATATYPE;
bytestream << (uint8_t) false;
bytestream << (uint32_t)execplan::AUX_COL_WIDTH;
bytestream << (uint16_t)useDBRoot;
bytestream << (uint32_t)execplan::AUX_COL_COMPRESSION_TYPE;
//@Bug 4176. save oids to a log file for cleanup after fail over.
std::vector<CalpontSystemCatalog::OID> oidList;
@ -616,6 +628,9 @@ keepGoing:
oidList.push_back(fStartingColOID + numColumns + i + 1);
}
// MCOL-5021
oidList.push_back(fStartingColOID + numColumnOids);
try
{
createWriteDropLogFile(fStartingColOID, uniqueId, oidList);
@ -683,9 +698,9 @@ keepGoing:
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
bytestream << uniqueId;
bytestream << (uint32_t)(numColumns + numDictCols);
bytestream << (uint32_t)numColumnOids;
for (unsigned i = 0; i < (numColumns + numDictCols); i++)
for (unsigned i = 0; i < numColumnOids; i++)
{
bytestream << (uint32_t)(fStartingColOID + i + 1);
}

View File

@ -562,8 +562,19 @@ void DDLPackageProcessor::createFiles(CalpontSystemCatalog::TableName aTableName
{
SUMMARY_INFO("DDLPackageProcessor::createFiles");
boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr =
CalpontSystemCatalog::makeCalpontSystemCatalog(1);
CalpontSystemCatalog::makeCalpontSystemCatalog(1);
CalpontSystemCatalog::RIDList ridList = systemCatalogPtr->columnRIDs(aTableName);
CalpontSystemCatalog::OID tableAUXColOid =
systemCatalogPtr->tableAUXColumnOID(aTableName);
if (tableAUXColOid > 3000)
{
CalpontSystemCatalog::ROPair auxRoPair;
auxRoPair.rid = 0;
auxRoPair.objnum = tableAUXColOid;
ridList.push_back(auxRoPair);
}
fWEClient->addQueue(uniqueId);
CalpontSystemCatalog::ColType colType;
ByteStream bytestream;

View File

@ -70,6 +70,7 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage(
}
std::vector<CalpontSystemCatalog::OID> oidList;
CalpontSystemCatalog::OID tableAuxColOid;
CalpontSystemCatalog::RIDList tableColRidList;
CalpontSystemCatalog::DictOIDList dictOIDList;
execplan::CalpontSystemCatalog::ROPair roPair;
@ -231,6 +232,7 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage(
userTableName.table = dropPartitionStmt.fTableName->fName;
tableColRidList = systemCatalogPtr->columnRIDs(userTableName);
tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName);
dictOIDList = systemCatalogPtr->dictOIDs(userTableName);
@ -241,6 +243,11 @@ DropPartitionProcessor::DDLResult DropPartitionProcessor::processPackage(
oidList.push_back(tableColRidList[i].objnum);
}
if (tableAuxColOid > 3000)
{
oidList.push_back(tableAuxColOid);
}
for (unsigned i = 0; i < dictOIDList.size(); i++)
{
if (dictOIDList[i].dictOID > 3000)

View File

@ -89,6 +89,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(
CalpontSystemCatalog::RIDList tableColRidList;
CalpontSystemCatalog::DictOIDList dictOIDList;
execplan::CalpontSystemCatalog::ROPair roPair;
CalpontSystemCatalog::OID tableAUXColOid;
std::string errorMsg;
ByteStream bytestream;
uint64_t uniqueId = 0;
@ -145,6 +146,15 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(
try
{
roPair = systemCatalogPtr->tableRID(tableName);
if (tableName.schema.compare(execplan::CALPONT_SCHEMA) == 0)
{
tableAUXColOid = 0;
}
else
{
tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName);
}
}
catch (IDBExcept& ie)
{
@ -580,6 +590,18 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(
return result;
}
// MCOL-5021 Valid AUX column OID for a table is > 3000
// Tables that were created before this feature was added will have
// tableAUXColOid = 0
if (tableAUXColOid > 3000)
{
oidList.push_back(tableAUXColOid);
CalpontSystemCatalog::ROPair auxRoPair;
auxRoPair.rid = 0;
auxRoPair.objnum = tableAUXColOid;
tableColRidList.push_back(auxRoPair);
}
// Save the oids to a file
try
{
@ -765,6 +787,7 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage(
std::vector<CalpontSystemCatalog::OID> columnOidList;
std::vector<CalpontSystemCatalog::OID> allOidList;
CalpontSystemCatalog::OID tableAuxColOid;
CalpontSystemCatalog::RIDList tableColRidList;
CalpontSystemCatalog::DictOIDList dictOIDList;
execplan::CalpontSystemCatalog::ROPair roPair;
@ -905,6 +928,7 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage(
userTableName.table = truncTableStmt.fTableName->fName;
tableColRidList = systemCatalogPtr->columnRIDs(userTableName);
tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName);
dictOIDList = systemCatalogPtr->dictOIDs(userTableName);
@ -917,6 +941,12 @@ TruncTableProcessor::DDLResult TruncTableProcessor::processPackage(
}
}
if (tableAuxColOid > 3000)
{
columnOidList.push_back(tableAuxColOid);
allOidList.push_back(tableAuxColOid);
}
for (unsigned i = 0; i < dictOIDList.size(); i++)
{
if (dictOIDList[i].dictOID > 3000)

View File

@ -64,6 +64,7 @@ MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackage(
}
std::vector<CalpontSystemCatalog::OID> oidList;
CalpontSystemCatalog::OID tableAuxColOid;
CalpontSystemCatalog::RIDList tableColRidList;
CalpontSystemCatalog::DictOIDList dictOIDList;
std::string processName("DDLProc");
@ -193,6 +194,7 @@ MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackage(
userTableName.table = markPartitionStmt.fTableName->fName;
tableColRidList = systemCatalogPtr->columnRIDs(userTableName);
tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName);
dictOIDList = systemCatalogPtr->dictOIDs(userTableName);
@ -203,6 +205,11 @@ MarkPartitionProcessor::DDLResult MarkPartitionProcessor::processPackage(
oidList.push_back(tableColRidList[i].objnum);
}
if (tableAuxColOid > 3000)
{
oidList.push_back(tableAuxColOid);
}
for (unsigned i = 0; i < dictOIDList.size(); i++)
{
if (dictOIDList[i].dictOID > 3000)

View File

@ -64,6 +64,7 @@ RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackage(
}
std::vector<CalpontSystemCatalog::OID> oidList;
CalpontSystemCatalog::OID tableAuxColOid;
CalpontSystemCatalog::RIDList tableColRidList;
CalpontSystemCatalog::DictOIDList dictOIDList;
std::string processName("DDLProc");
@ -195,6 +196,7 @@ RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackage(
userTableName.table = restorePartitionStmt.fTableName->fName;
tableColRidList = systemCatalogPtr->columnRIDs(userTableName);
tableAuxColOid = systemCatalogPtr->tableAUXColumnOID(userTableName);
dictOIDList = systemCatalogPtr->dictOIDs(userTableName);
@ -205,6 +207,11 @@ RestorePartitionProcessor::DDLResult RestorePartitionProcessor::processPackage(
oidList.push_back(tableColRidList[i].objnum);
}
if (tableAuxColOid > 3000)
{
oidList.push_back(tableAuxColOid);
}
for (unsigned i = 0; i < dictOIDList.size(); i++)
{
if (dictOIDList[i].dictOID > 3000)

View File

@ -374,6 +374,9 @@ DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackage(
if (weRc == 0)
{
// MCOL-5021
fDbrm->addToLBIDList(fSessionID, lbidList);
//@Bug 4560 invalidate cp first as bulkrollback will truncate the newly added lbids.
fDbrm->invalidateUncommittedExtentLBIDs(0, true, &lbidList);
cpInvalidated = true;
@ -433,6 +436,12 @@ DMLPackageProcessor::DMLResult CommandPackageProcessor::processPackage(
if (!cpInvalidated)
{
// MCOL-5021
if (stmt == "ROLLBACK")
{
fDbrm->addToLBIDList(fSessionID, lbidList);
}
fDbrm->invalidateUncommittedExtentLBIDs(0, stmt == "ROLLBACK", &lbidList);
}
}

View File

@ -1241,6 +1241,14 @@ const CalpontSystemCatalog::ColType CalpontSystemCatalog::colType(const OID& Oid
ct.columnOID = Oid;
}
if ((sysDataList.size() == 0) && isAUXColumnOID(Oid) >= 3000)
{
ct.colDataType = execplan::AUX_COL_DATATYPE;
ct.colWidth = execplan::AUX_COL_WIDTH;
ct.compressionType = execplan::AUX_COL_COMPRESSION_TYPE;
ct.columnOID = Oid;
}
// populate colinfomap cache and oidbitmap
lk3.lock();
boost::mutex::scoped_lock lk2(fOIDmapLock);
@ -3546,13 +3554,13 @@ const CalpontSystemCatalog::ROPair CalpontSystemCatalog::tableRID(const TableNam
oss << "select objectid from systable where schema='" << aTableName.schema << "' and tablename='"
<< aTableName.table << "' --tableRID/";
csep.data(oss.str()); //@bug 6078. Log the statement
if (fIdentity == EC)
oss << "EC";
else
oss << "FE";
csep.data(oss.str()); //@bug 6078. Log the statement
NJLSysDataList sysDataList;
try
@ -3607,6 +3615,243 @@ const CalpontSystemCatalog::ROPair CalpontSystemCatalog::tableRID(const TableNam
throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args);
}
// This function is similar to CalpontSystemCatalog::tableRID, except that
// instead of returning a ROPair for the table, it returns the OID for the
// AUX column for the table
CalpontSystemCatalog::OID CalpontSystemCatalog::tableAUXColumnOID(const TableName& tableName,
int lower_case_table_names)
{
TableName aTableName;
aTableName.schema = tableName.schema;
aTableName.table = tableName.table;
if (lower_case_table_names)
{
boost::algorithm::to_lower(aTableName.schema);
boost::algorithm::to_lower(aTableName.table);
}
if (aTableName.schema.compare(CALPONT_SCHEMA) == 0)
{
std::ostringstream oss;
oss << "tableAUXColumnOID() cannot be called on a ";
oss << CALPONT_SCHEMA << " schema table";
throw runtime_error(oss.str());
}
DEBUG << "Enter tableAUXColumnOID: " << tableName.schema << "|" << tableName.table << endl;
// look up the OID in the cache first
OID auxColOid;
checkSysCatVer();
boost::mutex::scoped_lock lk1(fTableAUXColumnOIDMapLock);
TableOIDmap::const_iterator iter = fTableAUXColumnOIDMap.find(aTableName);
if (iter != fTableAUXColumnOIDMap.end())
{
auxColOid = iter->second;
return auxColOid;
}
lk1.unlock();
// select auxcolumnoid from systable where schema = tableName.schema and tablename = tableName.table;
CalpontSelectExecutionPlan csep;
CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList;
CalpontSelectExecutionPlan::FilterTokenList filterTokenList;
CalpontSelectExecutionPlan::ColumnMap colMap;
static const std::string sysCatSchemaTablePrefix =
CALPONT_SCHEMA + "." + SYSTABLE_TABLE + ".";
SimpleColumn* c1 =
new SimpleColumn(sysCatSchemaTablePrefix + AUXCOLUMNOID_COL, fSessionID);
SimpleColumn* c2 =
new SimpleColumn(sysCatSchemaTablePrefix + SCHEMA_COL, fSessionID);
SimpleColumn* c3 =
new SimpleColumn(sysCatSchemaTablePrefix + TABLENAME_COL, fSessionID);
SRCP srcp;
srcp.reset(c1);
colMap.insert(CMVT_(sysCatSchemaTablePrefix + AUXCOLUMNOID_COL, srcp));
srcp.reset(c2);
colMap.insert(CMVT_(sysCatSchemaTablePrefix + SCHEMA_COL, srcp));
srcp.reset(c3);
colMap.insert(CMVT_(sysCatSchemaTablePrefix + TABLENAME_COL, srcp));
csep.columnMapNonStatic(colMap);
srcp.reset(c1->clone());
returnedColumnList.push_back(srcp);
csep.returnedCols(returnedColumnList);
OID oid = c1->oid();
// Filters
SimpleFilter* f1 =
new SimpleFilter(opeq, c2->clone(), new ConstantColumn(aTableName.schema, ConstantColumn::LITERAL));
filterTokenList.push_back(f1);
filterTokenList.push_back(new Operator("and"));
SimpleFilter* f2 =
new SimpleFilter(opeq, c3->clone(), new ConstantColumn(aTableName.table, ConstantColumn::LITERAL));
filterTokenList.push_back(f2);
csep.filterTokenList(filterTokenList);
ostringstream oss;
oss << "select auxcolumnoid from systable where schema='" << aTableName.schema << "' and tablename='"
<< aTableName.table << "' --tableAUXColumnOID/";
if (fIdentity == EC)
oss << "EC";
else
oss << "FE";
csep.data(oss.str()); //@bug 6078. Log the statement
NJLSysDataList sysDataList;
try
{
getSysData(csep, sysDataList, SYSTABLE_TABLE);
}
catch (IDBExcept&)
{
throw;
}
catch (runtime_error& e)
{
throw runtime_error(e.what());
}
vector<ColumnResult*>::const_iterator it;
for (it = sysDataList.begin(); it != sysDataList.end(); it++)
{
if ((*it)->dataCount() == 0)
{
Message::Args args;
args.add("'" + tableName.schema + "." + tableName.table + "'");
// throw logging::NoTableExcept(msg);
throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args);
}
if ((*it)->ColumnOID() == oid)
{
auxColOid = (OID)((*it)->GetData(0));
// populate cache
lk1.lock();
fTableAUXColumnOIDMap[aTableName] = auxColOid;
return auxColOid;
}
}
Message::Args args;
args.add("'" + tableName.schema + "." + tableName.table + "'");
// throw logging::NoTableExcept(msg);
throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args);
}
CalpontSystemCatalog::OID CalpontSystemCatalog::isAUXColumnOID(const OID& oid)
{
DEBUG << "Enter isAUXColumnOID" << endl;
checkSysCatVer();
boost::mutex::scoped_lock lk1(fAUXColumnOIDToTableOIDMapLock);
AUXColumnOIDTableOIDmap::const_iterator iter = fAUXColumnOIDToTableOIDMap.find(oid);
if (iter != fAUXColumnOIDToTableOIDMap.end())
{
return iter->second;
}
lk1.unlock();
// select objectid from systable where auxcolumnoid = oid;
CalpontSelectExecutionPlan csep;
CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList;
CalpontSelectExecutionPlan::FilterTokenList filterTokenList;
CalpontSelectExecutionPlan::ColumnMap colMap;
static const std::string sysCatSchemaTablePrefix =
CALPONT_SCHEMA + "." + SYSTABLE_TABLE + ".";
SimpleColumn* c1 =
new SimpleColumn(sysCatSchemaTablePrefix + OBJECTID_COL, fSessionID);
SimpleColumn* c2 =
new SimpleColumn(sysCatSchemaTablePrefix + AUXCOLUMNOID_COL, fSessionID);
SRCP srcp;
srcp.reset(c1);
colMap.insert(CMVT_(sysCatSchemaTablePrefix + OBJECTID_COL, srcp));
srcp.reset(c2);
colMap.insert(CMVT_(sysCatSchemaTablePrefix + AUXCOLUMNOID_COL, srcp));
csep.columnMapNonStatic(colMap);
srcp.reset(c1->clone());
returnedColumnList.push_back(srcp);
csep.returnedCols(returnedColumnList);
CalpontSystemCatalog::OID systableOid = c1->oid();
// Filters
SimpleFilter* f1 =
new SimpleFilter(opeq, c2->clone(), new ConstantColumn((int64_t)oid, ConstantColumn::NUM));
filterTokenList.push_back(f1);
csep.filterTokenList(filterTokenList);
ostringstream oss;
oss << "select objectid from systable where auxcolumnoid='" << oid
<< "' --isAUXColumnOID/";
if (fIdentity == EC)
oss << "EC";
else
oss << "FE";
csep.data(oss.str()); //@bug 6078. Log the statement
NJLSysDataList sysDataList;
try
{
getSysData(csep, sysDataList, SYSTABLE_TABLE);
}
catch (IDBExcept&)
{
throw;
}
catch (runtime_error& e)
{
throw runtime_error(e.what());
}
vector<ColumnResult*>::const_iterator it;
for (it = sysDataList.begin(); it != sysDataList.end(); it++)
{
if ((*it)->ColumnOID() == systableOid)
{
if ((*it)->dataCount() == 1)
{
// populate cache
lk1.lock();
fAUXColumnOIDToTableOIDMap[oid] = (OID)((*it)->GetData(0));
return fAUXColumnOIDToTableOIDMap[oid];
}
else
{
break;
}
}
}
// populate cache
lk1.lock();
fAUXColumnOIDToTableOIDMap[oid] = 0;
return fAUXColumnOIDToTableOIDMap[oid];
}
#if 0
const CalpontSystemCatalog::IndexNameList CalpontSystemCatalog::indexNames(const TableName& tableName)
{
@ -5640,6 +5885,18 @@ void CalpontSystemCatalog::flushCache()
buildSysTablemap();
lk3.unlock();
boost::mutex::scoped_lock namemaplk(fTableNameMapLock);
fTableNameMap.clear();
namemaplk.unlock();
boost::mutex::scoped_lock auxlk(fTableAUXColumnOIDMapLock);
fTableAUXColumnOIDMap.clear();
auxlk.unlock();
boost::mutex::scoped_lock auxtotableoidlk(fAUXColumnOIDToTableOIDMapLock);
fAUXColumnOIDToTableOIDMap.clear();
auxtotableoidlk.unlock();
boost::recursive_mutex::scoped_lock lk4(fDctTokenMapLock);
fDctTokenMap.clear();
buildSysDctmap();
@ -5774,6 +6031,14 @@ void CalpontSystemCatalog::buildSysColinfomap()
aCol.columnOID = OID_SYSTABLE_AUTOINCREMENT;
fColinfomap[aCol.columnOID] = aCol;
aCol.colWidth = 4;
aCol.constraintType = NOTNULL_CONSTRAINT;
aCol.colDataType = INT;
aCol.ddn = notDict;
aCol.colPosition++;
aCol.columnOID = OID_SYSTABLE_AUXCOLUMNOID;
fColinfomap[aCol.columnOID] = aCol;
fTablemap[make_table(CALPONT_SCHEMA, SYSCOLUMN_TABLE)] = SYSCOLUMN_BASE;
aCol.colWidth = 129; // @bug 4433
@ -5984,6 +6249,7 @@ void CalpontSystemCatalog::buildSysOIDmap()
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, AVGROWLEN_COL)] = OID_SYSTABLE_AVGROWLEN;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, NUMOFBLOCKS_COL)] = OID_SYSTABLE_NUMOFBLOCKS;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, AUTOINC_COL)] = OID_SYSTABLE_AUTOINCREMENT;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, AUXCOLUMNOID_COL)] = OID_SYSTABLE_AUXCOLUMNOID;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSCOLUMN_TABLE, SCHEMA_COL)] = OID_SYSCOLUMN_SCHEMA;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSCOLUMN_TABLE, TABLENAME_COL)] = OID_SYSCOLUMN_TABLENAME;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSCOLUMN_TABLE, COLNAME_COL)] = OID_SYSCOLUMN_COLNAME;

View File

@ -659,6 +659,17 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
*/
const ROPair tableRID(const TableName& tableName, int lower_case_table_names = 0);
/** return the OID of the table's AUX column
*
* returns the OID of the table's AUX column
*/
OID tableAUXColumnOID(const TableName& tableName, int lower_case_table_names = 0);
/** returns the table OID if the input OID is the AUX
* column OID of the table.
*/
CalpontSystemCatalog::OID isAUXColumnOID(const OID& oid);
/** return the RID of the index for a table
*
* returns the RID of the indexes for a table
@ -863,6 +874,14 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
typedef std::map<TableName, RID> Tablemap;
Tablemap fTablemap;
typedef std::map<TableName, OID> TableOIDmap;
TableOIDmap fTableAUXColumnOIDMap;
boost::mutex fTableAUXColumnOIDMapLock;
typedef std::map<OID, OID> AUXColumnOIDTableOIDmap;
AUXColumnOIDTableOIDmap fAUXColumnOIDToTableOIDMap;
boost::mutex fAUXColumnOIDToTableOIDMapLock;
typedef std::map<OID, ColType> Colinfomap;
Colinfomap fColinfomap;
boost::mutex fColinfomapLock;
@ -908,6 +927,15 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
static uint32_t fModuleID;
};
// MCOL-5021
const datatypes::SystemCatalog::ColDataType AUX_COL_DATATYPE = datatypes::SystemCatalog::UTINYINT;
const int32_t AUX_COL_WIDTH = 1;
// TODO MCOL-5021 compressionType is hardcoded to 2 (SNAPPY)
const CalpontSystemCatalog::CompressionType AUX_COL_COMPRESSION_TYPE = CalpontSystemCatalog::COMPRESSION2;
const std::string AUX_COL_DATATYPE_STRING = "unsigned-tinyint";
const uint64_t AUX_COL_MINVALUE = MIN_UTINYINT;
const uint64_t AUX_COL_MAXVALUE = MAX_UTINYINT;
/** convenience function to make a TableColName from 3 strings
*/
const CalpontSystemCatalog::TableColName make_tcn(const std::string& s, const std::string& t,
@ -1159,6 +1187,7 @@ const std::string MINVALUE_COL = "minvalue";
const std::string MAXVALUE_COL = "maxvalue";
const std::string COMPRESSIONTYPE_COL = "compressiontype";
const std::string NEXTVALUE_COL = "nextvalue";
const std::string AUXCOLUMNOID_COL = "auxcolumnoid";
/*****************************************************
* System tables OID definition
@ -1182,7 +1211,8 @@ const int OID_SYSTABLE_NUMOFROWS = SYSTABLE_BASE + 8; /** @brief total num
const int OID_SYSTABLE_AVGROWLEN = SYSTABLE_BASE + 9; /** @brief avg. row length column */
const int OID_SYSTABLE_NUMOFBLOCKS = SYSTABLE_BASE + 10; /** @brief num. of blocks column */
const int OID_SYSTABLE_AUTOINCREMENT = SYSTABLE_BASE + 11; /** @brief AUTOINCREMENT column */
const int SYSTABLE_MAX = SYSTABLE_BASE + 12; // be sure this is one more than the highest #
const int OID_SYSTABLE_AUXCOLUMNOID = SYSTABLE_BASE + 12; /** @brief AUXCOLUMNOID column */
const int SYSTABLE_MAX = SYSTABLE_BASE + 13; // be sure this is one more than the highest #
/*****************************************************
* SYSCOLUMN columns OID definition

View File

@ -90,12 +90,16 @@ BatchPrimitiveProcessorJL::~BatchPrimitiveProcessorJL()
{
}
void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan, vector<BRM::LBID_t> lastScannedLBID)
void BatchPrimitiveProcessorJL::addFilterStep(const pColScanStep& scan,
vector<BRM::LBID_t> lastScannedLBID,
bool hasAuxCol,
const std::vector<BRM::EMEntry>& extentsAux,
execplan::CalpontSystemCatalog::OID oidAux)
{
SCommand cc;
tableOID = scan.tableOid();
cc.reset(new ColumnCommandJL(scan, lastScannedLBID));
cc.reset(new ColumnCommandJL(scan, lastScannedLBID, hasAuxCol, extentsAux, oidAux));
cc->setBatchPrimitiveProcessor(this);
cc->setQueryUuid(scan.queryUuid());
cc->setStepUuid(uuid);

View File

@ -115,7 +115,9 @@ class BatchPrimitiveProcessorJL
threadCount = tc;
}
void addFilterStep(const pColScanStep&, std::vector<BRM::LBID_t> lastScannedLBID);
void addFilterStep(const pColScanStep&, std::vector<BRM::LBID_t> lastScannedLBID,
bool hasAuxCol, const std::vector<BRM::EMEntry>& extentsAux,
execplan::CalpontSystemCatalog::OID oidAux);
void addFilterStep(const PseudoColStep&);
void addFilterStep(const pColStep&);
void addFilterStep(const pDictionaryStep&);

View File

@ -43,7 +43,11 @@ using namespace messageqcpp;
namespace joblist
{
ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector<BRM::LBID_t> lastLBID)
ColumnCommandJL::ColumnCommandJL(const pColScanStep& scan, vector<BRM::LBID_t> lastLBID,
bool hasAuxCol_, const std::vector<BRM::EMEntry>& extentsAux_,
execplan::CalpontSystemCatalog::OID oidAux) :
extentsAux(extentsAux_), hasAuxCol(hasAuxCol_),
fOidAux(oidAux)
{
BRM::DBRM dbrm;
isScan = true;
@ -88,6 +92,7 @@ ColumnCommandJL::ColumnCommandJL(const pColStep& step)
BRM::DBRM dbrm;
isScan = false;
hasAuxCol = false;
/* grab necessary vars from step */
traceFlags = step.fTraceFlags;
@ -160,6 +165,8 @@ ColumnCommandJL::ColumnCommandJL(const ColumnCommandJL& prevCmd, const DictStepJ
BOP = prevCmd.BOP;
}
isScan = prevCmd.isScan;
hasAuxCol = prevCmd.hasAuxCol;
extentsAux = prevCmd.extentsAux;
colType = prevCmd.colType;
extents = prevCmd.extents;
OID = prevCmd.OID;
@ -210,6 +217,10 @@ void ColumnCommandJL::createCommand(ByteStream& bs) const
bs << BOP;
bs << filterCount;
}
if (hasAuxCol)
bs << (uint8_t)1;
else
bs << (uint8_t)0;
serializeInlineVector(bs, fLastLbid);
CommandJL::createCommand(bs);
@ -218,6 +229,9 @@ void ColumnCommandJL::createCommand(ByteStream& bs) const
void ColumnCommandJL::runCommand(ByteStream& bs) const
{
bs << lbid;
if (hasAuxCol)
bs << lbidAux;
}
void ColumnCommandJL::setLBID(uint64_t rid, uint32_t dbRoot)
@ -247,11 +261,31 @@ void ColumnCommandJL::setLBID(uint64_t rid, uint32_t dbRoot)
"; blockNum = " << blockNum << "; OID=" << OID << " LBID=" << lbid;
cout << os.str() << endl;
*/
return;
break;
}
}
throw logic_error("ColumnCommandJL: setLBID didn't find the extent for the rid.");
if (i == extents.size())
{
throw logic_error("ColumnCommandJL: setLBID didn't find the extent for the rid.");
}
uint32_t j;
for (j = 0; j < extentsAux.size(); j++)
{
if (extentsAux[j].dbRoot == dbRoot && extentsAux[j].partitionNum == partNum &&
extentsAux[j].segmentNum == segNum && extentsAux[j].blockOffset == (extentNum * 1 * 1024))
{
lbidAux = extentsAux[j].range.start + (blockNum * 1);
break;
}
}
if (hasAuxCol && j == extentsAux.size())
{
throw logic_error("ColumnCommandJL: setLBID didn't find the extent for the rid.");
}
// ostringstream os;
// os << "CCJL: rid=" << rid << "; dbroot=" << dbRoot << "; partitionNum=" << partitionNum << ";
@ -350,6 +384,20 @@ void ColumnCommandJL::reloadExtents()
}
sort(extents.begin(), extents.end(), BRM::ExtentSorter());
if (hasAuxCol)
{
err = dbrm.getExtents(fOidAux, extentsAux);
if (err)
{
ostringstream os;
os << "BRM lookup error. Could not get extents for Aux OID " << fOidAux;
throw runtime_error(os.str());
}
sort(extentsAux.begin(), extentsAux.end(), BRM::ExtentSorter());
}
}
bool ColumnCommandJL::getIsDict()

View File

@ -40,7 +40,9 @@ namespace joblist
class ColumnCommandJL : public CommandJL
{
public:
ColumnCommandJL(const pColScanStep&, std::vector<BRM::LBID_t> lastLBID);
ColumnCommandJL(const pColScanStep&, std::vector<BRM::LBID_t> lastLBID,
bool hasAuxCol_, const std::vector<BRM::EMEntry>& extentsAux_,
execplan::CalpontSystemCatalog::OID oidAux);
ColumnCommandJL(const pColStep&);
ColumnCommandJL(const ColumnCommandJL&, const DictStepJL&);
virtual ~ColumnCommandJL();
@ -72,6 +74,10 @@ class ColumnCommandJL : public CommandJL
{
return extents;
}
const std::vector<struct BRM::EMEntry>& getExtentsAux()
{
return extentsAux;
}
const execplan::CalpontSystemCatalog::ColType& getColType() const
{
return colType;
@ -89,6 +95,15 @@ class ColumnCommandJL : public CommandJL
{
return isScan;
}
bool auxCol() const
{
return hasAuxCol;
}
execplan::CalpontSystemCatalog::OID getOIDAux() const
{
return fOidAux;
}
void reloadExtents();
@ -123,6 +138,11 @@ class ColumnCommandJL : public CommandJL
uint32_t numDBRoots;
uint32_t dbroot;
std::vector<struct BRM::EMEntry> extentsAux;
bool hasAuxCol;
uint64_t lbidAux;
execplan::CalpontSystemCatalog::OID fOidAux;
static const unsigned DEFAULT_FILES_PER_COLUMN_PARTITION = 32;
public:

View File

@ -1198,9 +1198,9 @@ string JobList::toString() const
for (i = 0; i < fQuery.size(); i++)
ret += fQuery[i]->toString();
// ret += "\nProjection Steps:\n";
// for (i = 0; i < fProject.size(); i++)
// ret += fProject[i]->toString();
ret += "\nProjection Steps:\n";
for (i = 0; i < fProject.size(); i++)
ret += fProject[i]->toString();
ret += "\n";
return ret;
}

View File

@ -717,6 +717,7 @@ struct NewColRequestHeader
uint16_t NOPS;
uint16_t NVALS;
uint8_t sort; // 1 to sort
bool hasAuxCol;
// this follows the header
// ColArgs ArgList[NOPS] (where the val field is DataSize bytes long)
// uint16_t Rids[NVALS] (each rid is relative to the given block)

View File

@ -1244,6 +1244,9 @@ class TupleBPS : public BatchPrimitive, public TupleDeliveryStep
execplan::CalpontSystemCatalog::ColType fColType;
execplan::CalpontSystemCatalog::OID fOid;
execplan::CalpontSystemCatalog::OID fTableOid;
execplan::CalpontSystemCatalog::OID fOidAux;
bool hasAuxCol;
std::vector<BRM::EMEntry> extentsAux;
uint64_t fLastTupleId;
BRM::LBIDRange_v lbidRanges;
std::vector<int32_t> lastExtent;

View File

@ -501,6 +501,7 @@ TupleBPS::TupleBPS(const pColStep& rhs, const JobInfo& jobInfo)
fRunExecuted = false;
fSwallowRows = false;
smallOuterJoiner = -1;
hasAuxCol = false;
// @1098 initialize scanFlags to be true
scanFlags.assign(numExtents, true);
@ -528,10 +529,65 @@ TupleBPS::TupleBPS(const pColScanStep& rhs, const JobInfo& jobInfo) : BatchPrimi
fTableOid = rhs.tableOid();
extentSize = rhs.extentSize;
lbidRanges = rhs.lbidRanges;
hasAuxCol = false;
execplan::CalpontSystemCatalog::TableName tableName;
if (fTableOid >= 3000)
{
try
{
tableName = jobInfo.csc->tableName(fTableOid);
fOidAux = jobInfo.csc->tableAUXColumnOID(tableName);
}
catch (logging::IDBExcept& ie)
{
std::ostringstream oss;
if (ie.errorCode() == logging::ERR_TABLE_NOT_IN_CATALOG)
{
oss << "Table " << tableName.toString();
oss << " does not exist in the system catalog.";
}
else
{
oss << "Error getting AUX column OID for table " << tableName.toString();
oss << " due to: " << ie.what();
}
throw runtime_error(oss.str());
}
catch(std::exception& ex)
{
std::ostringstream oss;
oss << "Error getting AUX column OID for table " << tableName.toString();
oss << " due to: " << ex.what();
throw runtime_error(oss.str());
}
catch(...)
{
std::ostringstream oss;
oss << "Error getting AUX column OID for table " << tableName.toString();
throw runtime_error(oss.str());
}
if (fOidAux > 3000)
{
hasAuxCol = true;
if (dbrm.getExtents(fOidAux, extentsAux))
throw runtime_error("TupleBPS::TupleBPS BRM extent lookup failure (1)");
sort(extentsAux.begin(), extentsAux.end(), BRM::ExtentSorter());
tr1::unordered_map<int64_t, EMEntry>& refAux = extentsMap[fOidAux];
for (uint32_t z = 0; z < extentsAux.size(); z++)
refAux[extentsAux[z].range.start] = extentsAux[z];
}
}
/* These lines are obsoleted by initExtentMarkers. Need to remove & retest. */
scannedExtents = rhs.extents;
extentsMap[fOid] = tr1::unordered_map<int64_t, EMEntry>();
tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[fOid];
for (uint32_t z = 0; z < rhs.extents.size(); z++)
@ -650,6 +706,7 @@ TupleBPS::TupleBPS(const PassThruStep& rhs, const JobInfo& jobInfo) : BatchPrimi
fRunExecuted = false;
isFilterFeeder = false;
smallOuterJoiner = -1;
hasAuxCol = false;
// @1098 initialize scanFlags to be true
scanFlags.assign(numExtents, true);
@ -719,6 +776,7 @@ TupleBPS::TupleBPS(const pDictionaryStep& rhs, const JobInfo& jobInfo)
scanFlags.assign(numExtents, true);
runtimeCPFlags.assign(numExtents, true);
bop = BOP_AND;
hasAuxCol = false;
runRan = joinRan = false;
fDelivery = false;
@ -827,7 +885,7 @@ void TupleBPS::setBPP(JobStep* jobStep)
if (pcss != 0)
{
fBPP->addFilterStep(*pcss, lastScannedLBID);
fBPP->addFilterStep(*pcss, lastScannedLBID, hasAuxCol, extentsAux, fOidAux);
extentsMap[pcss->fOid] = tr1::unordered_map<int64_t, EMEntry>();
tr1::unordered_map<int64_t, EMEntry>& ref = extentsMap[pcss->fOid];
@ -1310,6 +1368,18 @@ void TupleBPS::reloadExtentLists()
for (j = 0; j < extents.size(); j++)
mref[extents[j].range.start] = extents[j];
if (cc->auxCol())
{
const vector<EMEntry>& extentsAux = cc->getExtentsAux();
oid = cc->getOIDAux();
extentsMap[oid] = tr1::unordered_map<int64_t, struct BRM::EMEntry>();
tr1::unordered_map<int64_t, struct BRM::EMEntry>& mrefAux = extentsMap[oid];
for (j = 0; j < extentsAux.size(); j++)
mrefAux[extentsAux[j].range.start] = extentsAux[j];
}
}
for (i = 0; i < projections.size(); i++)

View File

@ -16,7 +16,8 @@ create table if not exists systable (tablename varchar(128),
numofrows int,
avgrowlen int,
numofblocks int,
autoincrement int) engine=columnstore comment='SCHEMA SYNC ONLY';
autoincrement int,
auxcolumnoid int not null default 0) engine=columnstore comment='SCHEMA SYNC ONLY';
-- SYSCOLUMN
create table if not exists syscolumn (`schema` varchar(128),