1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-5021 Add prototype support for the AUX column in CREATE/DROP

DDL commands, single and multi-value INSERTs, cpimport, and
DELETE.
This commit is contained in:
Gagan Goel
2022-04-08 14:45:13 -04:00
parent af9caf8d6e
commit 86df9a972c
16 changed files with 687 additions and 109 deletions

View File

@ -682,7 +682,7 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC
throw std::runtime_error(err);
}
if (columnOid > 0) // Column exists already
if ((columnOid > 0) && (columnOid != OID_SYSTABLE_AUXCOLUMNOID)) // Column exists already
{
err = err + "Internal add column error for " + tableColName.schema + "." + tableColName.table + "." +
tableColName.column + ". Column exists already. Your table is probably out-of-sync";
@ -747,6 +747,22 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC
{
fStartingColOID = OID_SYSTABLE_AUTOINCREMENT;
}
else if ((inTableName.fName == SYSTABLE_TABLE) && (columnDefPtr->fName == AUXCOLUMNOID_COL))
{
fStartingColOID = OID_SYSTABLE_AUXCOLUMNOID;
if (!columnDefPtr->fDefaultValue || columnDefPtr->fDefaultValue->fValue != "0" ||
columnDefPtr->fConstraints.size() != 1 || !columnDefPtr->fConstraints[0] ||
columnDefPtr->fConstraints[0]->fConstraintType != ddlpackage::DDL_NOT_NULL ||
columnDefPtr->fType->fType != CalpontSystemCatalog::INT ||
columnDefPtr->fType->fLength != 4)
{
std::ostringstream oss;
oss << "Error adding " << AUXCOLUMNOID_COL << " column to calpontsys table.";
oss << " Column should be of type integer, added with NOT NULL constraint and default value of 0";
throw std::runtime_error(oss.str());
}
}
else if ((inTableName.fName == SYSCOLUMN_TABLE) && (columnDefPtr->fName == COMPRESSIONTYPE_COL))
{
fStartingColOID = OID_SYSCOLUMN_COMPRESSIONTYPE;
@ -1003,12 +1019,6 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC
else
bs << (uint32_t)0;
// new column info
bs << (ByteStream::byte)dataType;
bs << (ByteStream::byte)autoincrement;
bs << (uint32_t)columnDefPtr->fType->fLength;
bs << (uint32_t)columnDefPtr->fType->fScale;
bs << (uint32_t)columnDefPtr->fType->fPrecision;
std::string tmpStr("");
if (columnDefPtr->fDefaultValue)
@ -1016,6 +1026,13 @@ void AlterTableProcessor::addColumn(uint32_t sessionID, execplan::CalpontSystemC
tmpStr = columnDefPtr->fDefaultValue->fValue;
}
// new column info
bs << (ByteStream::byte)dataType;
bs << (ByteStream::byte)autoincrement;
bs << (uint32_t)columnDefPtr->fType->fLength;
bs << (uint32_t)columnDefPtr->fType->fScale;
bs << (uint32_t)columnDefPtr->fType->fPrecision;
bs << tmpStr;
bs << (ByteStream::byte)columnDefPtr->fType->fCompressiontype;
// ref column info

View File

@ -267,12 +267,17 @@ keepGoing:
numDictCols++;
}
fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols +
1); // include column, oids,dictionary oids and tableoid
// Include column oids, dictionary oids, tableoid, and
// also include AUX oid as of MCOL-5021
fStartingColOID = fObjectIDManager.allocOIDs(numColumns + numDictCols + 2);
#ifdef IDB_DDL_DEBUG
cout << fTxnid.id << " Create table allocOIDs got the starting oid " << fStartingColOID << endl;
#endif
uint32_t size = numColumns + numDictCols;
idbassert(size > 0);
size += 1; // MCOL-5021
if (fStartingColOID < 0)
{
result.result = CREATE_ERROR;
@ -295,6 +300,7 @@ keepGoing:
bytestream << (uint32_t)createTableStmt.fSessionID;
bytestream << (uint32_t)txnID.id;
bytestream << (uint32_t)fStartingColOID;
bytestream << (uint32_t)(fStartingColOID + size);
bytestream << (uint32_t)createTableStmt.fTableWithAutoi;
uint16_t dbRoot;
BRM::OID_t sysOid = 1001;
@ -537,7 +543,7 @@ keepGoing:
bytestream << (ByteStream::byte)WE_SVR_WRITE_CREATETABLEFILES;
bytestream << uniqueId;
bytestream << (uint32_t)txnID.id;
bytestream << (numColumns + numDictCols);
bytestream << size;
unsigned colNum = 0;
unsigned dictNum = 0;
@ -601,6 +607,15 @@ keepGoing:
++iter;
}
// MCOL-5021
// TODO compressionType is hardcoded to 2 (SNAPPY)
bytestream << (fStartingColOID + size);
bytestream << (uint8_t)datatypes::SystemCatalog::UTINYINT;
bytestream << (uint8_t) false;
bytestream << (uint32_t)1;
bytestream << (uint16_t)useDBRoot;
bytestream << (uint32_t)2;
//@Bug 4176. save oids to a log file for cleanup after fail over.
std::vector<CalpontSystemCatalog::OID> oidList;
@ -616,6 +631,9 @@ keepGoing:
oidList.push_back(fStartingColOID + numColumns + i + 1);
}
// MCOL-5021
oidList.push_back(fStartingColOID + size);
try
{
createWriteDropLogFile(fStartingColOID, uniqueId, oidList);
@ -683,9 +701,9 @@ keepGoing:
bytestream.restart();
bytestream << (ByteStream::byte)WE_SVR_WRITE_DROPFILES;
bytestream << uniqueId;
bytestream << (uint32_t)(numColumns + numDictCols);
bytestream << (uint32_t)size;
for (unsigned i = 0; i < (numColumns + numDictCols); i++)
for (unsigned i = 0; i < size; i++)
{
bytestream << (uint32_t)(fStartingColOID + i + 1);
}

View File

@ -89,6 +89,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(
CalpontSystemCatalog::RIDList tableColRidList;
CalpontSystemCatalog::DictOIDList dictOIDList;
execplan::CalpontSystemCatalog::ROPair roPair;
CalpontSystemCatalog::OID tableAUXColOid;
std::string errorMsg;
ByteStream bytestream;
uint64_t uniqueId = 0;
@ -145,6 +146,7 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(
try
{
roPair = systemCatalogPtr->tableRID(tableName);
tableAUXColOid = systemCatalogPtr->tableAUXColumnOID(tableName);
}
catch (IDBExcept& ie)
{
@ -580,6 +582,18 @@ DropTableProcessor::DDLResult DropTableProcessor::processPackage(
return result;
}
// MCOL-5021 Valid AUX column OID for a table is > 3000
// Tables that were created before this feature was added will have
// tableAUXColOid = 0
if (tableAUXColOid > 3000)
{
oidList.push_back(tableAUXColOid);
CalpontSystemCatalog::ROPair auxRoPair;
auxRoPair.rid = 0;
auxRoPair.objnum = tableAUXColOid;
tableColRidList.push_back(auxRoPair);
}
// Save the oids to a file
try
{

View File

@ -3607,6 +3607,140 @@ const CalpontSystemCatalog::ROPair CalpontSystemCatalog::tableRID(const TableNam
throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args);
}
// This function is similar to CalpontSystemCatalog::tableRID, except that
// instead of returning a ROPair for the table, it returns the OID for the
// AUX column for the table
CalpontSystemCatalog::OID CalpontSystemCatalog::tableAUXColumnOID(const TableName& tableName,
int lower_case_table_names)
{
TableName aTableName;
aTableName.schema = tableName.schema;
aTableName.table = tableName.table;
if (lower_case_table_names)
{
boost::algorithm::to_lower(aTableName.schema);
boost::algorithm::to_lower(aTableName.table);
}
if (aTableName.schema.compare(CALPONT_SCHEMA) == 0)
{
std::ostringstream oss;
oss << "tableAUXColumnOID() cannot be called on a ";
oss << CALPONT_SCHEMA << " schema table";
throw runtime_error(oss.str());
}
DEBUG << "Enter tableAUXColumnOID: " << tableName.schema << "|" << tableName.table << endl;
// look up the OID in the cache first
OID auxColOid;
checkSysCatVer();
boost::mutex::scoped_lock lk1(fTableAUXColumnOIDMapLock);
TableOIDmap::const_iterator iter = fTableAUXColumnOIDMap.find(aTableName);
if (iter != fTableAUXColumnOIDMap.end())
{
auxColOid = iter->second;
return auxColOid;
}
lk1.unlock();
// select auxcolumnoid from systable where schema = tableName.schema and tablename = tableName.table;
CalpontSelectExecutionPlan csep;
CalpontSelectExecutionPlan::ReturnedColumnList returnedColumnList;
CalpontSelectExecutionPlan::FilterTokenList filterTokenList;
CalpontSelectExecutionPlan::ColumnMap colMap;
SimpleColumn* c1 =
new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + AUXCOLUMNOID_COL, fSessionID);
SimpleColumn* c2 =
new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + SCHEMA_COL, fSessionID);
SimpleColumn* c3 =
new SimpleColumn(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + TABLENAME_COL, fSessionID);
SRCP srcp;
srcp.reset(c1);
colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + AUXCOLUMNOID_COL, srcp));
srcp.reset(c2);
colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + SCHEMA_COL, srcp));
srcp.reset(c3);
colMap.insert(CMVT_(CALPONT_SCHEMA + "." + SYSTABLE_TABLE + "." + TABLENAME_COL, srcp));
csep.columnMapNonStatic(colMap);
srcp.reset(c1->clone());
returnedColumnList.push_back(srcp);
csep.returnedCols(returnedColumnList);
OID oid = c1->oid();
// Filters
SimpleFilter* f1 =
new SimpleFilter(opeq, c2->clone(), new ConstantColumn(aTableName.schema, ConstantColumn::LITERAL));
filterTokenList.push_back(f1);
filterTokenList.push_back(new Operator("and"));
SimpleFilter* f2 =
new SimpleFilter(opeq, c3->clone(), new ConstantColumn(aTableName.table, ConstantColumn::LITERAL));
filterTokenList.push_back(f2);
csep.filterTokenList(filterTokenList);
ostringstream oss;
oss << "select auxcolumnoid from systable where schema='" << aTableName.schema << "' and tablename='"
<< aTableName.table << "' --tableAUXColumnOID/";
csep.data(oss.str()); //@bug 6078. Log the statement
if (fIdentity == EC)
oss << "EC";
else
oss << "FE";
NJLSysDataList sysDataList;
try
{
getSysData(csep, sysDataList, SYSTABLE_TABLE);
}
catch (IDBExcept&)
{
throw;
}
catch (runtime_error& e)
{
throw runtime_error(e.what());
}
vector<ColumnResult*>::const_iterator it;
for (it = sysDataList.begin(); it != sysDataList.end(); it++)
{
if ((*it)->dataCount() == 0)
{
Message::Args args;
args.add("'" + tableName.schema + "." + tableName.table + "'");
// throw logging::NoTableExcept(msg);
throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args);
}
if ((*it)->ColumnOID() == oid)
{
auxColOid = (OID)((*it)->GetData(0));
// populate cache
lk1.lock();
fTableAUXColumnOIDMap[aTableName] = auxColOid;
return auxColOid;
}
}
Message::Args args;
args.add("'" + tableName.schema + "." + tableName.table + "'");
// throw logging::NoTableExcept(msg);
throw IDBExcept(ERR_TABLE_NOT_IN_CATALOG, args);
}
#if 0
const CalpontSystemCatalog::IndexNameList CalpontSystemCatalog::indexNames(const TableName& tableName)
{
@ -5640,6 +5774,10 @@ void CalpontSystemCatalog::flushCache()
buildSysTablemap();
lk3.unlock();
boost::mutex::scoped_lock auxlk(fTableAUXColumnOIDMapLock);
fTableAUXColumnOIDMap.clear();
auxlk.unlock();
boost::recursive_mutex::scoped_lock lk4(fDctTokenMapLock);
fDctTokenMap.clear();
buildSysDctmap();
@ -5774,6 +5912,14 @@ void CalpontSystemCatalog::buildSysColinfomap()
aCol.columnOID = OID_SYSTABLE_AUTOINCREMENT;
fColinfomap[aCol.columnOID] = aCol;
aCol.colWidth = 4;
aCol.constraintType = NOTNULL_CONSTRAINT;
aCol.colDataType = INT;
aCol.ddn = notDict;
aCol.colPosition++;
aCol.columnOID = OID_SYSTABLE_AUXCOLUMNOID;
fColinfomap[aCol.columnOID] = aCol;
fTablemap[make_table(CALPONT_SCHEMA, SYSCOLUMN_TABLE)] = SYSCOLUMN_BASE;
aCol.colWidth = 129; // @bug 4433
@ -5984,6 +6130,7 @@ void CalpontSystemCatalog::buildSysOIDmap()
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, AVGROWLEN_COL)] = OID_SYSTABLE_AVGROWLEN;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, NUMOFBLOCKS_COL)] = OID_SYSTABLE_NUMOFBLOCKS;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, AUTOINC_COL)] = OID_SYSTABLE_AUTOINCREMENT;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSTABLE_TABLE, AUXCOLUMNOID_COL)] = OID_SYSTABLE_AUXCOLUMNOID;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSCOLUMN_TABLE, SCHEMA_COL)] = OID_SYSCOLUMN_SCHEMA;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSCOLUMN_TABLE, TABLENAME_COL)] = OID_SYSCOLUMN_TABLENAME;
fOIDmap[make_tcn(CALPONT_SCHEMA, SYSCOLUMN_TABLE, COLNAME_COL)] = OID_SYSCOLUMN_COLNAME;

View File

@ -659,6 +659,12 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
*/
const ROPair tableRID(const TableName& tableName, int lower_case_table_names = 0);
/** return the OID of the table's AUX column
*
* returns the OID of the table's AUX column
*/
OID tableAUXColumnOID(const TableName& tableName, int lower_case_table_names = 0);
/** return the RID of the index for a table
*
* returns the RID of the indexes for a table
@ -863,6 +869,10 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
typedef std::map<TableName, RID> Tablemap;
Tablemap fTablemap;
typedef std::map<TableName, OID> TableOIDmap;
TableOIDmap fTableAUXColumnOIDMap;
boost::mutex fTableAUXColumnOIDMapLock;
typedef std::map<OID, ColType> Colinfomap;
Colinfomap fColinfomap;
boost::mutex fColinfomapLock;
@ -1159,6 +1169,7 @@ const std::string MINVALUE_COL = "minvalue";
const std::string MAXVALUE_COL = "maxvalue";
const std::string COMPRESSIONTYPE_COL = "compressiontype";
const std::string NEXTVALUE_COL = "nextvalue";
const std::string AUXCOLUMNOID_COL = "auxcolumnoid";
/*****************************************************
* System tables OID definition
@ -1182,7 +1193,8 @@ const int OID_SYSTABLE_NUMOFROWS = SYSTABLE_BASE + 8; /** @brief total num
const int OID_SYSTABLE_AVGROWLEN = SYSTABLE_BASE + 9; /** @brief avg. row length column */
const int OID_SYSTABLE_NUMOFBLOCKS = SYSTABLE_BASE + 10; /** @brief num. of blocks column */
const int OID_SYSTABLE_AUTOINCREMENT = SYSTABLE_BASE + 11; /** @brief AUTOINCREMENT column */
const int SYSTABLE_MAX = SYSTABLE_BASE + 12; // be sure this is one more than the highest #
const int OID_SYSTABLE_AUXCOLUMNOID = SYSTABLE_BASE + 12; /** @brief AUXCOLUMNOID column */
const int SYSTABLE_MAX = SYSTABLE_BASE + 13; // be sure this is one more than the highest #
/*****************************************************
* SYSCOLUMN columns OID definition

View File

@ -1198,9 +1198,9 @@ string JobList::toString() const
for (i = 0; i < fQuery.size(); i++)
ret += fQuery[i]->toString();
// ret += "\nProjection Steps:\n";
// for (i = 0; i < fProject.size(); i++)
// ret += fProject[i]->toString();
ret += "\nProjection Steps:\n";
for (i = 0; i < fProject.size(); i++)
ret += fProject[i]->toString();
ret += "\n";
return ret;
}

View File

@ -2313,6 +2313,7 @@ SJLP JobListFactory::makeJobList(CalpontExecutionPlan* cplan, ResourceManager* r
ret->errorInfo(errorInfo);
}
std::cout<<ret->toString()<<std::endl;
return ret;
}

View File

@ -16,7 +16,8 @@ create table if not exists systable (tablename varchar(128),
numofrows int,
avgrowlen int,
numofblocks int,
autoincrement int) engine=columnstore comment='SCHEMA SYNC ONLY';
autoincrement int,
auxcolumnoid int not null default 0) engine=columnstore comment='SCHEMA SYNC ONLY';
-- SYSCOLUMN
create table if not exists syscolumn (`schema` varchar(128),