mirror of
https://github.com/mariadb-corporation/mariadb-columnstore-engine.git
synced 2025-04-18 21:44:02 +03:00
feat(MCOL-5886): support InnoDB's table partitions in cross-engine joins
The purpose of this changeset is to obtain list of partitions from SELECT_LEX structure and pass it down to joblist and then to CrossEngineStep to pass to InnoDB.
This commit is contained in:
parent
d5c8b98162
commit
58df63f967
@ -340,8 +340,15 @@ bool CalpontSystemCatalog::TableAliasName::operator<(const TableAliasName& rhs)
|
||||
}
|
||||
else if (view == rhs.view)
|
||||
{
|
||||
if (fisColumnStore < rhs.fisColumnStore)
|
||||
if (partitions < rhs.partitions)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if (partitions == rhs.partitions)
|
||||
{
|
||||
if (fisColumnStore < rhs.fisColumnStore)
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -357,6 +364,7 @@ void CalpontSystemCatalog::TableAliasName::serialize(messageqcpp::ByteStream& b)
|
||||
b << alias;
|
||||
b << view;
|
||||
b << static_cast<ByteStream::doublebyte>(fisColumnStore);
|
||||
partitions.serialize(b);
|
||||
}
|
||||
|
||||
void CalpontSystemCatalog::TableAliasName::unserialize(messageqcpp::ByteStream& b)
|
||||
@ -366,6 +374,7 @@ void CalpontSystemCatalog::TableAliasName::unserialize(messageqcpp::ByteStream&
|
||||
b >> alias;
|
||||
b >> view;
|
||||
b >> reinterpret_cast<ByteStream::doublebyte&>(fisColumnStore);
|
||||
partitions.unserialize(b);
|
||||
}
|
||||
|
||||
/*static*/
|
||||
@ -6319,5 +6328,47 @@ bool ctListSort(const CalpontSystemCatalog::ColType& a, const CalpontSystemCatal
|
||||
return a.colPosition < b.colPosition;
|
||||
}
|
||||
|
||||
bool operator <(const Partitions& a, const Partitions& b)
|
||||
{
|
||||
// lexicographic order.
|
||||
uint32_t l = std::min(a.fPartNames.size(), b.fPartNames.size());
|
||||
for (uint32_t i = 0; i < l; i++)
|
||||
{
|
||||
if (a.fPartNames[i] < b.fPartNames[i])
|
||||
{
|
||||
return true;
|
||||
}
|
||||
if (a.fPartNames[i] > b.fPartNames[i])
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (l < a.fPartNames.size())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (l < b.fPartNames.size())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
bool operator ==(const Partitions& a, const Partitions& b)
|
||||
{
|
||||
if (a.fPartNames.size() != b.fPartNames.size())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
uint32_t l = a.fPartNames.size();
|
||||
for (uint32_t i = 0; i < l; i++)
|
||||
{
|
||||
if (a.fPartNames[i] != b.fPartNames[i])
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace execplan
|
||||
// vim:sw=4 ts=4:
|
||||
|
@ -72,6 +72,36 @@ class SessionManager;
|
||||
const int32_t CNX_VTABLE_ID = 100;
|
||||
const int32_t IDB_VTABLE_ID = CNX_VTABLE_ID;
|
||||
|
||||
/**
|
||||
* A struct to hold a list of table partitions.
|
||||
*/
|
||||
struct Partitions {
|
||||
std::vector<std::string> fPartNames;
|
||||
void serialize(messageqcpp::ByteStream& b) const
|
||||
{
|
||||
uint32_t n = fPartNames.size();
|
||||
b << n;
|
||||
for (uint32_t i = 0; i < n; i++)
|
||||
{
|
||||
b << fPartNames[i];
|
||||
}
|
||||
}
|
||||
void unserialize(messageqcpp::ByteStream& b)
|
||||
{
|
||||
uint32_t n;
|
||||
b >> n;
|
||||
for (uint32_t i = 0; i < n; i++)
|
||||
{
|
||||
std::string t;
|
||||
b >> t;
|
||||
fPartNames.push_back(t);
|
||||
}
|
||||
|
||||
}
|
||||
};
|
||||
bool operator <(const Partitions& a, const Partitions& b);
|
||||
bool operator ==(const Partitions& a, const Partitions& b);
|
||||
|
||||
/** The CalpontSystemCatalog class
|
||||
*
|
||||
* This object encapsulates the system catalog stored in the engine
|
||||
@ -410,6 +440,7 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
|
||||
std::string table;
|
||||
std::string alias;
|
||||
std::string view;
|
||||
execplan::Partitions partitions;
|
||||
bool fisColumnStore;
|
||||
void clear();
|
||||
bool operator<(const TableAliasName& rhs) const;
|
||||
@ -420,7 +451,7 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
|
||||
bool operator==(const TableAliasName& rhs) const
|
||||
{
|
||||
return (schema == rhs.schema && table == rhs.table && alias == rhs.alias && view == rhs.view &&
|
||||
fisColumnStore == rhs.fisColumnStore);
|
||||
partitions == rhs.partitions && fisColumnStore == rhs.fisColumnStore);
|
||||
}
|
||||
bool operator!=(const TableAliasName& rhs) const
|
||||
{
|
||||
|
@ -203,6 +203,7 @@ SimpleColumn::SimpleColumn(const SimpleColumn& rhs, const uint32_t sessionID)
|
||||
, fData(rhs.data())
|
||||
, fIndexName(rhs.indexName())
|
||||
, fViewName(rhs.viewName())
|
||||
, fPartitions(rhs.fPartitions)
|
||||
, fTimeZone(rhs.timeZone())
|
||||
, fisColumnStore(rhs.isColumnStore())
|
||||
{
|
||||
@ -250,6 +251,7 @@ SimpleColumn& SimpleColumn::operator=(const SimpleColumn& rhs)
|
||||
fSequence = rhs.sequence();
|
||||
fDistinct = rhs.distinct();
|
||||
fisColumnStore = rhs.isColumnStore();
|
||||
fPartitions = rhs.fPartitions;
|
||||
}
|
||||
|
||||
return *this;
|
||||
@ -267,12 +269,22 @@ const string SimpleColumn::toString() const
|
||||
static const char delim = '/';
|
||||
ostringstream output;
|
||||
|
||||
ostringstream ossps;
|
||||
for (uint32_t i=0;i<fPartitions.fPartNames.size();i++)
|
||||
{
|
||||
if (i > 0)
|
||||
{
|
||||
ossps << ",";
|
||||
}
|
||||
ossps << fPartitions.fPartNames[i];
|
||||
}
|
||||
output << "SimpleColumn " << data() << endl;
|
||||
// collations in both result and operations type are the same and
|
||||
// set in the plugin code.
|
||||
datatypes::Charset cs(fResultType.charsetNumber);
|
||||
output << " s/t/c/v/o/ct/TA/CA/RA/#/card/join/source/engine/colPos/cs/coll: " << schemaName() << delim
|
||||
<< tableName() << delim << columnName() << delim << viewName() << delim << oid() << delim
|
||||
output << " s/t/ps/c/v/o/ct/TA/CA/RA/#/card/join/source/engine/colPos/cs/coll: " << schemaName() << delim
|
||||
<< tableName() << delim << ossps.str() << delim << columnName() << delim
|
||||
<< viewName() << delim << oid() << delim
|
||||
<< colDataTypeToString(fResultType.colDataType) << delim << tableAlias() << delim << alias() << delim
|
||||
<< returnAll() << delim << sequence() << delim << cardinality() << delim << joinInfo() << delim
|
||||
<< colSource() << delim << (isColumnStore() ? "ColumnStore" : "ForeignEngine") << delim
|
||||
@ -377,6 +389,12 @@ void SimpleColumn::serialize(messageqcpp::ByteStream& b) const
|
||||
b << fTableAlias;
|
||||
b << (uint32_t)fSequence;
|
||||
b << static_cast<ByteStream::doublebyte>(fisColumnStore);
|
||||
uint32_t nparts = fPartitions.fPartNames.size();
|
||||
b << nparts;
|
||||
for (uint32_t i = 0; i < nparts; i++)
|
||||
{
|
||||
b << fPartitions.fPartNames[i];
|
||||
}
|
||||
}
|
||||
|
||||
void SimpleColumn::unserialize(messageqcpp::ByteStream& b)
|
||||
@ -396,6 +414,14 @@ void SimpleColumn::unserialize(messageqcpp::ByteStream& b)
|
||||
b >> fTableAlias;
|
||||
b >> (uint32_t&)fSequence;
|
||||
b >> reinterpret_cast<ByteStream::doublebyte&>(fisColumnStore);
|
||||
uint32_t nparts;
|
||||
b >> nparts;
|
||||
for (uint32_t i = 0; i < nparts; i++)
|
||||
{
|
||||
std::string temps;
|
||||
b >> temps;
|
||||
fPartitions.fPartNames.push_back(temps);
|
||||
}
|
||||
}
|
||||
|
||||
bool SimpleColumn::operator==(const SimpleColumn& t) const
|
||||
@ -441,6 +467,9 @@ bool SimpleColumn::operator==(const SimpleColumn& t) const
|
||||
if (fisColumnStore != t.fisColumnStore)
|
||||
return false;
|
||||
|
||||
if (fPartitions != t.fPartitions)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -160,6 +160,14 @@ class SimpleColumn : public ReturnedColumn
|
||||
if (lower_case_table_names)
|
||||
boost::algorithm::to_lower(fViewName);
|
||||
}
|
||||
inline const execplan::Partitions& partitions() const
|
||||
{
|
||||
return fPartitions;
|
||||
}
|
||||
inline void partitions(const execplan::Partitions& partitions)
|
||||
{
|
||||
fPartitions = partitions;
|
||||
}
|
||||
inline long timeZone() const
|
||||
{
|
||||
return fTimeZone;
|
||||
@ -270,6 +278,7 @@ class SimpleColumn : public ReturnedColumn
|
||||
std::string fIndexName;
|
||||
// if belong to view, view name is non-empty
|
||||
std::string fViewName;
|
||||
execplan::Partitions fPartitions;
|
||||
long fTimeZone;
|
||||
bool fisColumnStore;
|
||||
|
||||
|
@ -60,6 +60,7 @@ using namespace querytele;
|
||||
namespace joblist
|
||||
{
|
||||
CrossEngineStep::CrossEngineStep(const std::string& schema, const std::string& table,
|
||||
const execplan::Partitions& partitions,
|
||||
const std::string& alias, const JobInfo& jobInfo)
|
||||
: BatchPrimitive(jobInfo)
|
||||
, fRowsRetrieved(0)
|
||||
@ -72,6 +73,7 @@ CrossEngineStep::CrossEngineStep(const std::string& schema, const std::string& t
|
||||
, fSchema(schema)
|
||||
, fTable(table)
|
||||
, fAlias(alias)
|
||||
, fPartitions(partitions)
|
||||
, fColumnCount(0)
|
||||
, fFeInstance(funcexp::FuncExp::instance())
|
||||
{
|
||||
@ -669,6 +671,20 @@ std::string CrossEngineStep::makeQuery()
|
||||
ostringstream oss;
|
||||
oss << fSelectClause << " FROM `" << fTable << "`";
|
||||
|
||||
if (fPartitions.fPartNames.size())
|
||||
{
|
||||
oss << "PARTITION (";
|
||||
for (uint32_t i=0;i<fPartitions.fPartNames.size();i++)
|
||||
{
|
||||
if (i > 0)
|
||||
{
|
||||
oss << ", ";
|
||||
}
|
||||
oss << fPartitions.fPartNames[i];
|
||||
}
|
||||
oss << ") ";
|
||||
}
|
||||
|
||||
if (fTable.compare(fAlias) != 0)
|
||||
oss << " `" << fAlias << "`";
|
||||
|
||||
|
@ -55,8 +55,9 @@ class CrossEngineStep : public BatchPrimitive, public TupleDeliveryStep
|
||||
public:
|
||||
/** @brief CrossEngineStep constructor
|
||||
*/
|
||||
CrossEngineStep(const std::string& schema, const std::string& table, const std::string& alias,
|
||||
const JobInfo& jobInfo);
|
||||
CrossEngineStep(const std::string& schema, const std::string& table,
|
||||
const execplan::Partitions& partitions,
|
||||
const std::string& alias, const JobInfo& jobInfo);
|
||||
|
||||
/** @brief CrossEngineStep destructor
|
||||
*/
|
||||
@ -225,6 +226,7 @@ class CrossEngineStep : public BatchPrimitive, public TupleDeliveryStep
|
||||
std::string fSchema;
|
||||
std::string fTable;
|
||||
std::string fAlias;
|
||||
execplan::Partitions fPartitions;
|
||||
unsigned int fPort;
|
||||
|
||||
// returned columns and primitive filters
|
||||
@ -245,6 +247,7 @@ class CrossEngineStep : public BatchPrimitive, public TupleDeliveryStep
|
||||
|
||||
funcexp::FuncExp* fFeInstance;
|
||||
utils::LibMySQL* mysql;
|
||||
std::string fPartition;
|
||||
};
|
||||
|
||||
} // namespace joblist
|
||||
|
@ -87,6 +87,7 @@ ExpressionStep::ExpressionStep(const ExpressionStep& rhs)
|
||||
, fAliases(rhs.aliases())
|
||||
, fViews(rhs.views())
|
||||
, fSchemas(rhs.schemas())
|
||||
, fPartitionss(rhs.fPartitionss)
|
||||
, fTableKeys(rhs.tableKeys())
|
||||
, fColumnKeys(rhs.columnKeys())
|
||||
, fVarBinOK(rhs.fVarBinOK)
|
||||
@ -370,6 +371,7 @@ void ExpressionStep::populateColumnInfo(SimpleColumn* sc, JobInfo& jobInfo)
|
||||
string alias = extractTableAlias(sc);
|
||||
string view = sc->viewName();
|
||||
string schema = sc->schemaName();
|
||||
execplan::Partitions part = sc->partitions();
|
||||
fTableOids.push_back(tblOid);
|
||||
CalpontSystemCatalog::ColType ct;
|
||||
|
||||
@ -400,6 +402,7 @@ void ExpressionStep::populateColumnInfo(SimpleColumn* sc, JobInfo& jobInfo)
|
||||
|
||||
fAliases.push_back(alias);
|
||||
fViews.push_back(view);
|
||||
fPartitionss.push_back(part);
|
||||
fSchemas.push_back(schema);
|
||||
fTableKeys.push_back(makeTableKey(jobInfo, sc));
|
||||
fColumns.push_back(sc);
|
||||
@ -450,9 +453,11 @@ void ExpressionStep::populateColumnInfo(WindowFunctionColumn* wc, JobInfo& jobIn
|
||||
string alias("");
|
||||
string view("");
|
||||
string schema("");
|
||||
execplan::Partitions part;
|
||||
fTableOids.push_back(jobInfo.keyInfo->tupleKeyToTableOid[wcKey]);
|
||||
fAliases.push_back(alias);
|
||||
fViews.push_back(view);
|
||||
fPartitionss.push_back(part);
|
||||
fSchemas.push_back(schema);
|
||||
fTableKeys.push_back(jobInfo.keyInfo->colKeyToTblKey[wcKey]);
|
||||
fColumnKeys.push_back(wcKey);
|
||||
@ -476,6 +481,7 @@ void ExpressionStep::populateColumnInfo(AggregateColumn* ac, JobInfo& jobInfo)
|
||||
fTableOids.push_back(jobInfo.keyInfo->tupleKeyToTableOid[acKey]);
|
||||
fAliases.push_back(alias);
|
||||
fViews.push_back(view);
|
||||
fPartitionss.push_back(execplan::Partitions());
|
||||
fSchemas.push_back(schema);
|
||||
fTableKeys.push_back(jobInfo.keyInfo->colKeyToTblKey[acKey]);
|
||||
fColumnKeys.push_back(acKey);
|
||||
@ -739,6 +745,7 @@ bool ExpressionStep::parseFuncJoinColumn(ReturnedColumn* rc, JobInfo& jobInfo)
|
||||
string alias;
|
||||
string view;
|
||||
string schema;
|
||||
execplan::Partitions part;
|
||||
|
||||
if (sc && tids.size() == 1)
|
||||
{
|
||||
@ -747,6 +754,7 @@ bool ExpressionStep::parseFuncJoinColumn(ReturnedColumn* rc, JobInfo& jobInfo)
|
||||
alias = extractTableAlias(sc);
|
||||
view = sc->viewName();
|
||||
schema = sc->schemaName();
|
||||
part = sc->partitions();
|
||||
}
|
||||
else if (dynamic_cast<AggregateColumn*>(rc) || dynamic_cast<WindowFunctionColumn*>(rc) ||
|
||||
dynamic_cast<ArithmeticColumn*>(rc) || dynamic_cast<FunctionColumn*>(rc))
|
||||
@ -772,6 +780,7 @@ bool ExpressionStep::parseFuncJoinColumn(ReturnedColumn* rc, JobInfo& jobInfo)
|
||||
fFunctionJoinInfo->fSequence.push_back(rc->sequence());
|
||||
fFunctionJoinInfo->fAlias.push_back(alias);
|
||||
fFunctionJoinInfo->fView.push_back(view);
|
||||
fFunctionJoinInfo->fPartitionss.push_back(part);
|
||||
fFunctionJoinInfo->fSchema.push_back(schema);
|
||||
|
||||
return true;
|
||||
|
@ -75,6 +75,11 @@ class ExpressionStep : public JobStep
|
||||
{
|
||||
return fViews.empty() ? "" : fViews.front();
|
||||
}
|
||||
using JobStep::partitions;
|
||||
execplan::Partitions partitions() const
|
||||
{
|
||||
return fPartitions;
|
||||
}
|
||||
using JobStep::schema;
|
||||
std::string schema() const override
|
||||
{
|
||||
@ -123,6 +128,10 @@ class ExpressionStep : public JobStep
|
||||
{
|
||||
return fViews;
|
||||
}
|
||||
const std::vector<execplan::Partitions>& partitionss() const
|
||||
{
|
||||
return fPartitionss;
|
||||
}
|
||||
const std::vector<std::string>& schemas() const
|
||||
{
|
||||
return fSchemas;
|
||||
@ -148,6 +157,10 @@ class ExpressionStep : public JobStep
|
||||
{
|
||||
return fViews;
|
||||
}
|
||||
std::vector<execplan::Partitions>& partitionss()
|
||||
{
|
||||
return fPartitionss;
|
||||
}
|
||||
std::vector<std::string>& schemas()
|
||||
{
|
||||
return fSchemas;
|
||||
@ -237,6 +250,7 @@ class ExpressionStep : public JobStep
|
||||
std::vector<std::string> fAliases;
|
||||
std::vector<std::string> fViews;
|
||||
std::vector<std::string> fSchemas;
|
||||
std::vector<execplan::Partitions> fPartitionss;
|
||||
std::vector<uint32_t> fTableKeys;
|
||||
std::vector<uint32_t> fColumnKeys;
|
||||
std::vector<execplan::ReturnedColumn*> fColumns;
|
||||
|
@ -44,7 +44,9 @@ namespace
|
||||
// @brief Returns unique key for a column, table, or expresssion.
|
||||
uint32_t uniqTupleKey(JobInfo& jobInfo, CalpontSystemCatalog::OID& o, CalpontSystemCatalog::OID& t,
|
||||
const string& cn, const string& ca, const string& tn, const string& ta,
|
||||
const string& sn, const string& vw, uint32_t pi, uint64_t en, bool correlated = false)
|
||||
const string& sn, const string& vw, const execplan::Partitions& pa,
|
||||
uint32_t pi, uint64_t en,
|
||||
bool correlated = false)
|
||||
{
|
||||
uint64_t subId = jobInfo.subId;
|
||||
|
||||
@ -59,7 +61,7 @@ uint32_t uniqTupleKey(JobInfo& jobInfo, CalpontSystemCatalog::OID& o, CalpontSys
|
||||
if (!cn.empty())
|
||||
nm += "." + cn;
|
||||
|
||||
UniqId id(o, ta, sn, vw, pi, subId);
|
||||
UniqId id(o, ta, sn, vw, pa, pi, subId);
|
||||
TupleKeyMap::iterator iter = jobInfo.keyInfo->tupleKeyMap.find(id);
|
||||
|
||||
if (iter != jobInfo.keyInfo->tupleKeyMap.end())
|
||||
@ -156,14 +158,16 @@ uint32_t fudgeWidth(const CalpontSystemCatalog::ColType& ict, CalpontSystemCatal
|
||||
TupleInfo setTupleInfo_(const CalpontSystemCatalog::ColType& ct, CalpontSystemCatalog::OID col_oid,
|
||||
JobInfo& jobInfo, CalpontSystemCatalog::OID tbl_oid, const string& col_name,
|
||||
const string& col_alias, const string& sch_name, const string& tbl_name,
|
||||
const string& tbl_alias, const string& vw_name, bool correlated = false,
|
||||
const string& tbl_alias, const string& vw_name,
|
||||
const execplan::Partitions& partitions,
|
||||
bool correlated = false,
|
||||
uint32_t pc_id = 0, uint64_t engine = 0)
|
||||
{
|
||||
// get the unique tupleOids for this column
|
||||
uint32_t tbl_key = uniqTupleKey(jobInfo, tbl_oid, tbl_oid, "", "", tbl_name, tbl_alias, sch_name, vw_name,
|
||||
uint32_t tbl_key = uniqTupleKey(jobInfo, tbl_oid, tbl_oid, "", "", tbl_name, tbl_alias, sch_name, vw_name, partitions,
|
||||
0, engine, correlated);
|
||||
uint32_t col_key = uniqTupleKey(jobInfo, col_oid, tbl_oid, col_name, col_alias, tbl_name, tbl_alias,
|
||||
sch_name, vw_name, pc_id, engine, correlated);
|
||||
sch_name, vw_name, partitions, pc_id, engine, correlated);
|
||||
// If this is the first time we've seen this col, add it to the tim
|
||||
TupleInfoMap::iterator it = jobInfo.keyInfo->tupleInfoMap.find(col_key);
|
||||
TupleInfo ti;
|
||||
@ -193,6 +197,7 @@ TupleInfo setTupleInfo_(const CalpontSystemCatalog::ColType& ct, CalpontSystemCa
|
||||
|
||||
uint32_t getTupleKey_(const JobInfo& jobInfo, CalpontSystemCatalog::OID oid, const string& colName,
|
||||
const string& tblAlias, const string& schema, const string& view,
|
||||
const execplan::Partitions& part,
|
||||
bool correlated = false, uint32_t pseudo = 0, uint64_t engine = 0)
|
||||
{
|
||||
uint64_t subId = jobInfo.subId;
|
||||
@ -208,7 +213,7 @@ uint32_t getTupleKey_(const JobInfo& jobInfo, CalpontSystemCatalog::OID oid, con
|
||||
|
||||
// if (!colAlias.empty())
|
||||
// alias += "." + colAlias;
|
||||
UniqId id(oid, tblAlias, schema, view, pseudo, subId);
|
||||
UniqId id(oid, tblAlias, schema, view, part, pseudo, subId);
|
||||
TupleKeyMap::const_iterator iter = jobInfo.keyInfo->tupleKeyMap.find(id);
|
||||
|
||||
if (iter != jobInfo.keyInfo->tupleKeyMap.end())
|
||||
@ -252,6 +257,7 @@ UniqId::UniqId(const execplan::SimpleColumn* sc)
|
||||
fTable(extractTableAlias(sc))
|
||||
, fSchema(sc->schemaName())
|
||||
, fView(sc->viewName())
|
||||
, fPart()
|
||||
, fPseudo(0)
|
||||
, fSubId(-1)
|
||||
{
|
||||
@ -267,6 +273,7 @@ UniqId::UniqId(int o, const execplan::SimpleColumn* sc)
|
||||
fTable(extractTableAlias(sc))
|
||||
, fSchema(sc->schemaName())
|
||||
, fView(sc->viewName())
|
||||
, fPart()
|
||||
, fPseudo(0)
|
||||
, fSubId(-1)
|
||||
{
|
||||
@ -275,7 +282,15 @@ UniqId::UniqId(int o, const execplan::SimpleColumn* sc)
|
||||
string UniqId::toString() const
|
||||
{
|
||||
ostringstream strstm;
|
||||
strstm << fId << ":" << fTable << ":" << fSchema << ":" << fView << ":" << fPseudo << ":"
|
||||
strstm << fId << ":" << fTable << ":" << fSchema << ":" << fView << ":(";
|
||||
string dlm = "";
|
||||
for (const auto& p : fPart.fPartNames)
|
||||
{
|
||||
strstm << dlm << p;
|
||||
dlm = ",";
|
||||
}
|
||||
strstm << "):"
|
||||
<< fPseudo << ":"
|
||||
<< (int64_t)fSubId;
|
||||
return strstm.str();
|
||||
}
|
||||
@ -394,7 +409,8 @@ uint32_t getTupleKey(JobInfo& jobInfo, const execplan::SimpleColumn* sc, bool ad
|
||||
{
|
||||
// TupleInfo is expected to be set already
|
||||
return getTupleKey_(jobInfo, sc->oid(), sc->columnName(), extractTableAlias(sc), sc->schemaName(),
|
||||
sc->viewName(), ((sc->joinInfo() & execplan::JOIN_CORRELATED) != 0), pseudoType,
|
||||
sc->viewName(), execplan::Partitions(),
|
||||
((sc->joinInfo() & execplan::JOIN_CORRELATED) != 0), pseudoType,
|
||||
(sc->isColumnStore() ? 0 : 1));
|
||||
}
|
||||
|
||||
@ -475,9 +491,10 @@ uint32_t getTupleKey(JobInfo& jobInfo, const SRCP& srcp, bool add)
|
||||
}
|
||||
|
||||
uint32_t getTableKey(const JobInfo& jobInfo, execplan::CalpontSystemCatalog::OID tableOid,
|
||||
const string& alias, const string& schema, const string& view)
|
||||
const string& alias, const string& schema, const string& view,
|
||||
const execplan::Partitions& part)
|
||||
{
|
||||
return getTupleKey_(jobInfo, tableOid, "", alias, schema, view);
|
||||
return getTupleKey_(jobInfo, tableOid, "", alias, schema, view, part);
|
||||
}
|
||||
|
||||
uint32_t getTableKey(const JobInfo& jobInfo, uint32_t cid)
|
||||
@ -493,21 +510,21 @@ void updateTableKey(uint32_t cid, uint32_t tid, JobInfo& jobInfo)
|
||||
uint32_t getTableKey(JobInfo& jobInfo, JobStep* js)
|
||||
{
|
||||
CalpontSystemCatalog::OID tableOid = js->tableOid();
|
||||
return getTupleKey_(jobInfo, tableOid, "", js->alias(), js->schema(), js->view());
|
||||
return getTupleKey_(jobInfo, tableOid, "", js->alias(), js->schema(), js->view(), execplan::Partitions());
|
||||
}
|
||||
|
||||
uint32_t makeTableKey(JobInfo& jobInfo, const execplan::SimpleColumn* sc)
|
||||
{
|
||||
CalpontSystemCatalog::OID o = tableOid(sc, jobInfo.csc);
|
||||
return uniqTupleKey(jobInfo, o, o, "", "", sc->tableName(), extractTableAlias(sc), sc->schemaName(),
|
||||
sc->viewName(), 0, (sc->isColumnStore() ? 0 : 1),
|
||||
sc->viewName(), sc->partitions(), 0, (sc->isColumnStore() ? 0 : 1),
|
||||
((sc->joinInfo() & execplan::JOIN_CORRELATED) != 0));
|
||||
}
|
||||
|
||||
uint32_t makeTableKey(JobInfo& jobInfo, CalpontSystemCatalog::OID o, const string& tn, const string& ta,
|
||||
const string& sn, const string& vn, uint64_t en)
|
||||
const string& sn, const string& vn, const execplan::Partitions& pn, uint64_t en)
|
||||
{
|
||||
return uniqTupleKey(jobInfo, o, o, "", "", tn, ta, sn, vn, 0, en);
|
||||
return uniqTupleKey(jobInfo, o, o, "", "", tn, ta, sn, vn, pn, 0, en);
|
||||
}
|
||||
|
||||
TupleInfo getTupleInfo(uint32_t columnKey, const JobInfo& jobInfo)
|
||||
@ -545,7 +562,7 @@ TupleInfo setTupleInfo(const execplan::CalpontSystemCatalog::ColType& ct,
|
||||
const PseudoColumn* pc = dynamic_cast<const execplan::PseudoColumn*>(sc);
|
||||
uint32_t pseudoType = (pc) ? pc->pseudoType() : execplan::PSEUDO_UNKNOWN;
|
||||
return setTupleInfo_(ct, col_oid, jobInfo, tbl_oid, sc->columnName(), sc->alias(), sc->schemaName(),
|
||||
sc->tableName(), alias, sc->viewName(),
|
||||
sc->tableName(), alias, sc->viewName(), sc->partitions(),
|
||||
((sc->joinInfo() & execplan::JOIN_CORRELATED) != 0), pseudoType,
|
||||
(sc->isColumnStore() ? 0 : 1));
|
||||
}
|
||||
@ -565,7 +582,7 @@ TupleInfo setExpTupleInfo(const execplan::CalpontSystemCatalog::ColType& ct, uin
|
||||
if (!(ji->subAlias.empty()))
|
||||
expAlias += ji->subAlias;
|
||||
|
||||
return setTupleInfo_(ct, expressionId, jobInfo, CNX_EXP_TABLE_ID, "", alias, "", "$exp", expAlias, "", cr);
|
||||
return setTupleInfo_(ct, expressionId, jobInfo, CNX_EXP_TABLE_ID, "", alias, "", "$exp", expAlias, "", execplan::Partitions(), cr);
|
||||
}
|
||||
|
||||
TupleInfo setExpTupleInfo(const execplan::ReturnedColumn* rc, JobInfo& jobInfo)
|
||||
@ -586,7 +603,7 @@ uint32_t getExpTupleKey(const JobInfo& jobInfo, uint64_t eid, bool cr)
|
||||
if (!(ji->subAlias.empty()))
|
||||
expAlias += ji->subAlias;
|
||||
|
||||
return getTupleKey_(jobInfo, eid, "", expAlias, "", "", cr);
|
||||
return getTupleKey_(jobInfo, eid, "", expAlias, "", "", execplan::Partitions(), cr);
|
||||
}
|
||||
|
||||
void addAggregateColumn(ReturnedColumn* agc, int idx, RetColsVector& vec, JobInfo& jobInfo)
|
||||
@ -634,15 +651,17 @@ bool operator<(const struct UniqId& x, const struct UniqId& y)
|
||||
(x.fId == y.fId && x.fTable == y.fTable && x.fSchema < y.fSchema) ||
|
||||
(x.fId == y.fId && x.fTable == y.fTable && x.fSchema == y.fSchema && x.fView < y.fView) ||
|
||||
(x.fId == y.fId && x.fTable == y.fTable && x.fSchema == y.fSchema && x.fView == y.fView &&
|
||||
x.fPseudo < y.fPseudo) ||
|
||||
x.fPart < y.fPart) ||
|
||||
(x.fId == y.fId && x.fTable == y.fTable && x.fSchema == y.fSchema && x.fView == y.fView &&
|
||||
x.fPseudo == y.fPseudo && x.fSubId < y.fSubId));
|
||||
x.fPart == y.fPart && x.fPseudo < y.fPseudo) ||
|
||||
(x.fId == y.fId && x.fTable == y.fTable && x.fSchema == y.fSchema && x.fView == y.fView &&
|
||||
x.fPart == y.fPart && x.fPseudo == y.fPseudo && x.fSubId < y.fSubId));
|
||||
}
|
||||
|
||||
bool operator==(const struct UniqId& x, const struct UniqId& y)
|
||||
{
|
||||
return (x.fId == y.fId && x.fTable == y.fTable && x.fSchema == y.fSchema && x.fView == y.fView &&
|
||||
x.fPseudo == y.fPseudo && x.fSubId == y.fSubId);
|
||||
x.fPart == y.fPart && x.fPseudo == y.fPseudo && x.fSubId == y.fSubId);
|
||||
}
|
||||
|
||||
void updateDerivedColumn(JobInfo& jobInfo, SimpleColumn* sc, CalpontSystemCatalog::ColType& ct)
|
||||
|
@ -132,6 +132,7 @@ struct UniqId
|
||||
std::string fTable; // table name (table alias)
|
||||
std::string fSchema; // schema name
|
||||
std::string fView; // view name
|
||||
execplan::Partitions fPart; // partition(s) name(s)
|
||||
uint32_t fPseudo; // pseudo type
|
||||
// uint64_t fEngine; // InfiniDB == 0
|
||||
uint64_t fSubId; // subquery ID
|
||||
@ -139,9 +140,10 @@ struct UniqId
|
||||
UniqId() : fId(-1), fSubId(-1)
|
||||
{
|
||||
}
|
||||
UniqId(int i, const std::string& t, const std::string& s, const std::string& v, uint32_t pi = 0,
|
||||
UniqId(int i, const std::string& t, const std::string& s, const std::string& v,
|
||||
const execplan::Partitions& p, uint32_t pi = 0,
|
||||
uint64_t l = -1)
|
||||
: fId(i), fTable(t), fSchema(s), fView(v), fPseudo(pi), fSubId(l)
|
||||
: fId(i), fTable(t), fSchema(s), fView(v), fPart(p), fPseudo(pi), fSubId(l)
|
||||
{
|
||||
}
|
||||
explicit UniqId(const execplan::SimpleColumn* sc);
|
||||
@ -440,7 +442,7 @@ execplan::CalpontSystemCatalog::OID tableOid(const execplan::SimpleColumn* sc,
|
||||
*/
|
||||
uint32_t getTupleKey(JobInfo& jobInfo, const execplan::SimpleColumn* sc, bool add = false);
|
||||
uint32_t getTableKey(const JobInfo& jobInfo, execplan::CalpontSystemCatalog::OID tableOid,
|
||||
const std::string& alias, const std::string& schema, const std::string& view);
|
||||
const std::string& alias, const std::string& schema, const std::string& view, const execplan::Partitions& partitions);
|
||||
uint32_t getTupleKey(JobInfo& jobInfo, const execplan::SRCP& srcp, bool add = false);
|
||||
uint32_t getTableKey(const JobInfo& jobInfo, uint32_t cid);
|
||||
uint32_t getTableKey(JobInfo& jobInfo, JobStep* js);
|
||||
@ -452,7 +454,7 @@ uint32_t getExpTupleKey(const JobInfo& jobInfo, uint64_t eid, bool cr = false);
|
||||
uint32_t makeTableKey(JobInfo& jobInfo, const execplan::SimpleColumn* sc);
|
||||
uint32_t makeTableKey(JobInfo& jobInfo, execplan::CalpontSystemCatalog::OID tableOid,
|
||||
const std::string& tbl_name, const std::string& tbl_alias, const std::string& sch_name,
|
||||
const std::string& vw_name, uint64_t engine = 0);
|
||||
const std::string& vw_name, const execplan::Partitions& partitions, uint64_t engine = 0);
|
||||
|
||||
/** @brief Returns the tupleInfo associate with the (table, column) key pair
|
||||
*
|
||||
|
@ -1508,6 +1508,7 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
|
||||
const ConstantColumn* cc = static_cast<const ConstantColumn*>(rhs);
|
||||
string alias(extractTableAlias(sc));
|
||||
string view(sc->viewName());
|
||||
execplan::Partitions partitions(sc->partitions());
|
||||
string schema(sc->schemaName());
|
||||
tbl_oid = tableOid(sc, jobInfo.csc);
|
||||
|
||||
@ -1520,7 +1521,7 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
|
||||
{
|
||||
// bug 3749, mark outer join table with isNull filter
|
||||
if (cc->isNull() && (opis == *sop || opisnull == *sop))
|
||||
jobInfo.tableHasIsNull.insert(getTableKey(jobInfo, tbl_oid, alias, "", view));
|
||||
jobInfo.tableHasIsNull.insert(getTableKey(jobInfo, tbl_oid, alias, "", view, partitions));
|
||||
|
||||
return doExpressionFilter(sf, jobInfo);
|
||||
}
|
||||
@ -1814,7 +1815,7 @@ const JobStepVector doSimpleFilter(SimpleFilter* sf, JobInfo& jobInfo)
|
||||
}
|
||||
|
||||
if (cc->isNull() && (opis == *sop || opisnull == *sop))
|
||||
jobInfo.tableHasIsNull.insert(getTableKey(jobInfo, tbl_oid, alias, sc->schemaName(), view));
|
||||
jobInfo.tableHasIsNull.insert(getTableKey(jobInfo, tbl_oid, alias, sc->schemaName(), view, sc->partitions()));
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -2045,10 +2046,10 @@ const JobStepVector doOuterJoinOnFilter(OuterJoinOnFilter* oj, JobInfo& jobInfo)
|
||||
// cascade outer table attribute.
|
||||
CalpontSystemCatalog::OID tableOid1 = tableOid(sc1, jobInfo.csc);
|
||||
uint64_t tid1 =
|
||||
getTableKey(jobInfo, tableOid1, sc1->tableAlias(), sc1->schemaName(), sc1->viewName());
|
||||
getTableKey(jobInfo, tableOid1, sc1->tableAlias(), sc1->schemaName(), sc1->viewName(), sc1->partitions());
|
||||
CalpontSystemCatalog::OID tableOid2 = tableOid(sc2, jobInfo.csc);
|
||||
uint64_t tid2 =
|
||||
getTableKey(jobInfo, tableOid2, sc2->tableAlias(), sc2->schemaName(), sc2->viewName());
|
||||
getTableKey(jobInfo, tableOid2, sc2->tableAlias(), sc2->schemaName(), sc2->viewName(), sc2->partitions());
|
||||
|
||||
if (tablesInOuter.find(tid1) != tablesInOuter.end())
|
||||
sc1->returnAll(true);
|
||||
@ -2195,7 +2196,7 @@ const JobStepVector doOuterJoinOnFilter(OuterJoinOnFilter* oj, JobInfo& jobInfo)
|
||||
if (sc != NULL)
|
||||
{
|
||||
CalpontSystemCatalog::OID tblOid = tableOid(sc, jobInfo.csc);
|
||||
uint64_t tid = getTableKey(jobInfo, tblOid, sc->tableAlias(), sc->schemaName(), sc->viewName());
|
||||
uint64_t tid = getTableKey(jobInfo, tblOid, sc->tableAlias(), sc->schemaName(), sc->viewName(), sc->partitions());
|
||||
|
||||
// skip outer table filters or table not directly involved in the outer join
|
||||
if (tablesInOuter.find(tid) != tablesInOuter.end() || tablesInJoin.find(tid) == tablesInJoin.end())
|
||||
|
@ -824,7 +824,7 @@ void addOrderByAndLimit(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
|
||||
}
|
||||
|
||||
oid = sc->oid();
|
||||
ct = jobInfo.vtableColTypes[UniqId(oid, alias, "", "")];
|
||||
ct = jobInfo.vtableColTypes[UniqId(oid, alias, "", "", execplan::Partitions())];
|
||||
}
|
||||
|
||||
tupleKey = getTupleKey(jobInfo, sc);
|
||||
|
@ -817,7 +817,7 @@ void addExpresssionStepsToBps(TableInfoMap::iterator& mit, SJSTEP& sjsp, JobInfo
|
||||
}
|
||||
else
|
||||
{
|
||||
sjsp.reset(new CrossEngineStep(mit->second.fSchema, mit->second.fName, mit->second.fAlias, jobInfo));
|
||||
sjsp.reset(new CrossEngineStep(mit->second.fSchema, mit->second.fName, mit->second.fPartitions, mit->second.fAlias, jobInfo));
|
||||
|
||||
bps = dynamic_cast<CrossEngineStep*>(sjsp.get());
|
||||
}
|
||||
@ -1004,7 +1004,7 @@ bool combineJobStepsByTable(TableInfoMap::iterator& mit, JobInfo& jobInfo)
|
||||
else
|
||||
{
|
||||
sjsp.reset(
|
||||
new CrossEngineStep(mit->second.fSchema, mit->second.fName, mit->second.fAlias, jobInfo));
|
||||
new CrossEngineStep(mit->second.fSchema, mit->second.fName, mit->second.fPartitions, mit->second.fAlias, jobInfo));
|
||||
bps = dynamic_cast<CrossEngineStep*>(sjsp.get());
|
||||
}
|
||||
}
|
||||
@ -3139,6 +3139,7 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo
|
||||
largeJoinInfo->fAlias = tableInfoMap[large].fAlias;
|
||||
largeJoinInfo->fView = tableInfoMap[large].fView;
|
||||
largeJoinInfo->fSchema = tableInfoMap[large].fSchema;
|
||||
largeJoinInfo->fPartitions = tableInfoMap[large].fPartitions;
|
||||
|
||||
largeJoinInfo->fDl = tableInfoMap[large].fDl;
|
||||
largeJoinInfo->fRowGroup = tableInfoMap[large].fRowGroup;
|
||||
@ -3407,7 +3408,7 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo
|
||||
if ((jointypes[i] & SEMI) || (jointypes[i] & ANTI) || (jointypes[i] & SCALAR))
|
||||
{
|
||||
uint32_t tid = getTableKey(jobInfo, smallSides[i]->fTableOid, smallSides[i]->fAlias,
|
||||
smallSides[i]->fSchema, smallSides[i]->fView);
|
||||
smallSides[i]->fSchema, smallSides[i]->fView, smallSides[i]->fPartitions);
|
||||
correlateTables[tid] = i;
|
||||
correlateCompare[tid] = NULL;
|
||||
}
|
||||
@ -4173,7 +4174,7 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap&
|
||||
if ((jointypes[i] & SEMI) || (jointypes[i] & ANTI) || (jointypes[i] & SCALAR))
|
||||
{
|
||||
uint32_t tid = getTableKey(jobInfo, smallSides[i]->fTableOid, smallSides[i]->fAlias,
|
||||
smallSides[i]->fSchema, smallSides[i]->fView);
|
||||
smallSides[i]->fSchema, smallSides[i]->fView, smallSides[i]->fPartitions);
|
||||
correlateTables[tid] = i;
|
||||
correlateCompare[tid] = NULL;
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ struct JoinInfo
|
||||
std::string fAlias;
|
||||
std::string fSchema;
|
||||
std::string fView;
|
||||
execplan::Partitions fPartitions;
|
||||
AnyDataListSPtr fDl; // output data list
|
||||
rowgroup::RowGroup fRowGroup; // rowgroup meta data for the data list
|
||||
// colOid and alias can be retrieved from JobInfo.tupleKeyVec using join key.
|
||||
@ -66,6 +67,7 @@ struct TableInfo
|
||||
std::string fName;
|
||||
std::string fAlias;
|
||||
std::string fSchema;
|
||||
execplan::Partitions fPartitions;
|
||||
std::string fView;
|
||||
uint64_t fSubId;
|
||||
JobStepVector fQuerySteps;
|
||||
@ -103,6 +105,7 @@ struct FunctionJoinInfo
|
||||
std::vector<std::string> fAlias;
|
||||
std::vector<std::string> fView;
|
||||
std::vector<std::string> fSchema;
|
||||
std::vector<execplan::Partitions> fPartitionss;
|
||||
JobStepVector fStep;
|
||||
JoinType fJoinType;
|
||||
int64_t fJoinId;
|
||||
|
@ -209,7 +209,7 @@ void projectSimpleColumn(const SimpleColumn* sc, JobStepVector& jsv, JobInfo& jo
|
||||
else // must be vtable mode
|
||||
{
|
||||
oid = (tbl_oid + 1) + sc->colPosition();
|
||||
ct = jobInfo.vtableColTypes[UniqId(oid, alias, "", "")];
|
||||
ct = jobInfo.vtableColTypes[UniqId(oid, alias, "", "", execplan::Partitions())];
|
||||
ti = setTupleInfo(ct, oid, jobInfo, tbl_oid, sc, alias);
|
||||
}
|
||||
|
||||
@ -574,7 +574,9 @@ void checkGroupByCols(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo)
|
||||
if (sc)
|
||||
col = UniqId(sc);
|
||||
else
|
||||
col = UniqId(rc->expressionId(), rc->alias(), "", "");
|
||||
{
|
||||
col = UniqId(rc->expressionId(), rc->alias(), "", "", execplan::Partitions());
|
||||
}
|
||||
|
||||
if (colInGroupBy.find(col) == colInGroupBy.end() || selectSubquery)
|
||||
{
|
||||
@ -722,7 +724,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
else
|
||||
{
|
||||
gbOid = (tblOid + 1) + sc->colPosition();
|
||||
ct = jobInfo.vtableColTypes[UniqId(gbOid, alias, "", "")];
|
||||
ct = jobInfo.vtableColTypes[UniqId(gbOid, alias, "", "", execplan::Partitions())];
|
||||
}
|
||||
|
||||
// As of bug3695, make sure varbinary is not used in group by.
|
||||
@ -1034,7 +1036,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
else
|
||||
{
|
||||
retOid = (tblOid + 1) + sc->colPosition();
|
||||
ct = jobInfo.vtableColTypes[UniqId(retOid, alias, "", "")];
|
||||
ct = jobInfo.vtableColTypes[UniqId(retOid, alias, "", "", execplan::Partitions())];
|
||||
}
|
||||
|
||||
TupleInfo ti(setTupleInfo(ct, retOid, jobInfo, tblOid, sc, alias));
|
||||
@ -1190,7 +1192,7 @@ const JobStepVector doAggProject(const CalpontSelectExecutionPlan* csep, JobInfo
|
||||
else
|
||||
{
|
||||
retOid = (tblOid + 1) + sc->colPosition();
|
||||
ct = jobInfo.vtableColTypes[UniqId(retOid, alias, "", "")];
|
||||
ct = jobInfo.vtableColTypes[UniqId(retOid, alias, "", "", execplan::Partitions())];
|
||||
}
|
||||
|
||||
TupleInfo ti(setTupleInfo(ct, retOid, jobInfo, tblOid, sc, alias));
|
||||
@ -1964,7 +1966,7 @@ void makeJobSteps(CalpontSelectExecutionPlan* csep, JobInfo& jobInfo, JobStepVec
|
||||
else
|
||||
oid = 0;
|
||||
|
||||
uint32_t tableUid = makeTableKey(jobInfo, oid, it->table, it->alias, it->schema, it->view);
|
||||
uint32_t tableUid = makeTableKey(jobInfo, oid, it->table, it->alias, it->schema, it->view, it->partitions);
|
||||
jobInfo.tableList.push_back(tableUid);
|
||||
}
|
||||
|
||||
|
@ -220,6 +220,15 @@ class JobStep
|
||||
{
|
||||
fView = vw;
|
||||
}
|
||||
// MCOL-5886 support partitions
|
||||
virtual execplan::Partitions partitions() const
|
||||
{
|
||||
return fPartitions;
|
||||
}
|
||||
virtual void partitions(const execplan::Partitions& ps)
|
||||
{
|
||||
fPartitions = ps;
|
||||
}
|
||||
// @bug 3438, stats with column name
|
||||
virtual std::string name() const
|
||||
{
|
||||
@ -465,6 +474,7 @@ class JobStep
|
||||
|
||||
std::string fAlias;
|
||||
std::string fView;
|
||||
execplan::Partitions fPartitions;
|
||||
std::string fName;
|
||||
std::string fSchema;
|
||||
uint32_t fTraceFlags;
|
||||
|
@ -204,6 +204,11 @@ SJSTEP& SubQueryTransformer::makeSubQueryStep(execplan::CalpontSelectExecutionPl
|
||||
for (uint64_t i = 0; i < outputCols; i++)
|
||||
{
|
||||
fVtable.addColumn(fSubReturnedCols[i]);
|
||||
SimpleColumn* sc = dynamic_cast<SimpleColumn*>(fSubReturnedCols[i].get());
|
||||
if (sc)
|
||||
{
|
||||
fVtable.partitions(sc->partitions());
|
||||
}
|
||||
|
||||
// make sure the column type is the same as rowgroup
|
||||
CalpontSystemCatalog::ColType ct = fVtable.columnType(i);
|
||||
@ -264,7 +269,7 @@ SJSTEP& SubQueryTransformer::makeSubQueryStep(execplan::CalpontSelectExecutionPl
|
||||
precision.push_back(ti.precision);
|
||||
}
|
||||
|
||||
fOutJobInfo->vtableColTypes[UniqId(fVtable.columnOid(i), fVtable.alias(), "", "")] =
|
||||
fOutJobInfo->vtableColTypes[UniqId(fVtable.columnOid(i), fVtable.alias(), "", "", execplan::Partitions())] =
|
||||
fVtable.columnType(i);
|
||||
}
|
||||
|
||||
@ -304,7 +309,7 @@ void SubQueryTransformer::updateCorrelateInfo()
|
||||
// Insert at [1], not to mess with OUTER join and hint(INFINIDB_ORDERED -- bug2317).
|
||||
fOutJobInfo->tableList.insert(
|
||||
fOutJobInfo->tableList.begin() + 1,
|
||||
makeTableKey(*fOutJobInfo, fVtable.tableOid(), fVtable.name(), fVtable.alias(), "", fVtable.view()));
|
||||
makeTableKey(*fOutJobInfo, fVtable.tableOid(), fVtable.name(), fVtable.alias(), "", fVtable.view(), fVtable.partitions()));
|
||||
|
||||
// tables in outer level
|
||||
set<uint32_t> outTables;
|
||||
@ -394,6 +399,7 @@ void SubQueryTransformer::updateCorrelateInfo()
|
||||
vector<CalpontSystemCatalog::OID>& tableOids = es->tableOids();
|
||||
vector<string>& aliases = es->aliases();
|
||||
vector<string>& views = es->views();
|
||||
vector<execplan::Partitions>& partitions = es->partitionss();
|
||||
vector<string>& schemas = es->schemas();
|
||||
vector<uint32_t>& tableKeys = es->tableKeys();
|
||||
vector<uint32_t>& columnKeys = es->columnKeys();
|
||||
@ -408,7 +414,7 @@ void SubQueryTransformer::updateCorrelateInfo()
|
||||
if (subTables.find(tableKeys[j]) != subTables.end())
|
||||
{
|
||||
const map<UniqId, uint32_t>::const_iterator k =
|
||||
subMap.find(UniqId(sc->oid(), aliases[j], schemas[j], views[j]));
|
||||
subMap.find(UniqId(sc->oid(), aliases[j], schemas[j], views[j], partitions[j]));
|
||||
|
||||
if (k == subMap.end())
|
||||
throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
|
||||
@ -419,6 +425,7 @@ void SubQueryTransformer::updateCorrelateInfo()
|
||||
sc->viewName(fVtable.view());
|
||||
sc->oid(fVtable.columnOid(k->second));
|
||||
sc->columnName(fVtable.columns()[k->second]->columnName());
|
||||
sc->partitions(fVtable.partitions());
|
||||
const CalpontSystemCatalog::ColType& ct = fVtable.columnType(k->second);
|
||||
TupleInfo ti = setTupleInfo(ct, sc->oid(), *fOutJobInfo, fVtable.tableOid(), sc, fVtable.alias());
|
||||
|
||||
@ -428,6 +435,7 @@ void SubQueryTransformer::updateCorrelateInfo()
|
||||
schemas[j] = sc->schemaName();
|
||||
columnKeys[j] = ti.key;
|
||||
tableKeys[j] = getTableKey(*fOutJobInfo, ti.key);
|
||||
partitions[j] = sc->partitions();
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -443,7 +451,7 @@ void SubQueryTransformer::updateCorrelateInfo()
|
||||
{
|
||||
// workaround for window function IN/EXISTS subquery
|
||||
const map<UniqId, uint32_t>::const_iterator k =
|
||||
subMap.find(UniqId(scList[j]->expressionId(), "", "", ""));
|
||||
subMap.find(UniqId(scList[j]->expressionId(), "", "", "", execplan::Partitions()));
|
||||
|
||||
if (k == subMap.end())
|
||||
throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
|
||||
@ -475,7 +483,7 @@ void SubQueryTransformer::updateCorrelateInfo()
|
||||
|
||||
if (sc == NULL)
|
||||
{
|
||||
UniqId colId = UniqId(rc->expressionId(), "", "", "");
|
||||
UniqId colId = UniqId(rc->expressionId(), "", "", "", execplan::Partitions());
|
||||
const map<UniqId, uint32_t>::const_iterator k = subMap.find(colId);
|
||||
|
||||
if (k == subMap.end())
|
||||
@ -518,7 +526,7 @@ void SubQueryTransformer::updateCorrelateInfo()
|
||||
|
||||
if (outTables.find(tid) == outTables.end())
|
||||
{
|
||||
if (subMap.find(UniqId(j->oid(), j->alias(), j->schema(), j->view(), 0)) != subMap.end())
|
||||
if (subMap.find(UniqId(j->oid(), j->alias(), j->schema(), j->view(), j->partitions(), 0)) != subMap.end())
|
||||
// throw CorrelateFailExcept();
|
||||
throw IDBExcept(logging::ERR_NON_SUPPORT_SUB_QUERY_TYPE);
|
||||
}
|
||||
|
@ -83,32 +83,32 @@ void VirtualTable::addColumn(const SRCP& column)
|
||||
// oss << agc->functionName() << "_" << agc->expressionId();
|
||||
// oss << "Aggregate_" << agc->expressionId();
|
||||
columnName = agc->data();
|
||||
colId = UniqId(agc->expressionId(), "", "", "");
|
||||
colId = UniqId(agc->expressionId(), "", "", "", execplan::Partitions());
|
||||
}
|
||||
else if ((wc = dynamic_cast<WindowFunctionColumn*>(column.get())) != NULL)
|
||||
{
|
||||
// oss << wc->functionName() << "_" << wc->expressionId();
|
||||
// oss << "Window_" << wc->expressionId();
|
||||
columnName = wc->data();
|
||||
colId = UniqId(wc->expressionId(), "", "", "");
|
||||
colId = UniqId(wc->expressionId(), "", "", "", execplan::Partitions());
|
||||
}
|
||||
else if ((arc = dynamic_cast<ArithmeticColumn*>(column.get())) != NULL)
|
||||
{
|
||||
// oss << "Arithmetic_" << arc->expressionId();
|
||||
columnName = arc->data();
|
||||
colId = UniqId(arc->expressionId(), "", "", "");
|
||||
colId = UniqId(arc->expressionId(), "", "", "", execplan::Partitions());
|
||||
}
|
||||
else if ((fc = dynamic_cast<FunctionColumn*>(column.get())) != NULL)
|
||||
{
|
||||
// oss << fc->functionName() << "_" << fc->expressionId();
|
||||
columnName = fc->data();
|
||||
colId = UniqId(fc->expressionId(), "", "", "");
|
||||
colId = UniqId(fc->expressionId(), "", "", "", execplan::Partitions());
|
||||
}
|
||||
else if ((cc = dynamic_cast<ConstantColumn*>(column.get())) != NULL)
|
||||
{
|
||||
// oss << "Constant_" << cc->expressionId();
|
||||
columnName = cc->data();
|
||||
colId = UniqId(cc->expressionId(), cc->alias(), "", fView);
|
||||
colId = UniqId(cc->expressionId(), cc->alias(), "", fView, execplan::Partitions());
|
||||
}
|
||||
else // new column type has added, but this code is not updated.
|
||||
{
|
||||
|
@ -72,6 +72,15 @@ class VirtualTable
|
||||
return fView;
|
||||
}
|
||||
|
||||
void partitions(const execplan::Partitions& p)
|
||||
{
|
||||
fPartitions = p;
|
||||
}
|
||||
const execplan::Partitions& partitions() const
|
||||
{
|
||||
return fPartitions;
|
||||
}
|
||||
|
||||
const std::vector<execplan::SSC>& columns() const
|
||||
{
|
||||
return fColumns;
|
||||
@ -104,6 +113,7 @@ class VirtualTable
|
||||
std::string fName;
|
||||
std::string fAlias;
|
||||
std::string fView;
|
||||
execplan::Partitions fPartitions;
|
||||
|
||||
std::vector<execplan::SSC> fColumns;
|
||||
std::vector<execplan::CalpontSystemCatalog::ColType> fColumnTypes;
|
||||
|
@ -49,6 +49,8 @@ using namespace logging;
|
||||
#define PREFER_MY_CONFIG_H
|
||||
#include <my_config.h>
|
||||
#include "idb_mysql.h"
|
||||
#include "partition_element.h"
|
||||
#include "partition_info.h"
|
||||
|
||||
#include "mcsv1_udaf.h"
|
||||
|
||||
@ -314,13 +316,58 @@ void convertOuterJoinToInnerJoin(List<TABLE_LIST>* join_list, TableOnExprList& t
|
||||
}
|
||||
}
|
||||
|
||||
CalpontSystemCatalog::TableAliasName makeTableAliasName(TABLE_LIST* table)
|
||||
static execplan::Partitions getPartitions(TABLE* table)
|
||||
{
|
||||
execplan::Partitions result;
|
||||
|
||||
if (table->part_info)
|
||||
{
|
||||
List_iterator<partition_element> part_el_it(table->part_info->partitions);
|
||||
|
||||
partition_element* pe;
|
||||
|
||||
while ((pe = part_el_it++)) // this is how server does it.
|
||||
{
|
||||
result.fPartNames.emplace_back(pe->partition_name);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
static execplan::Partitions getPartitions(TABLE_LIST* table)
|
||||
{
|
||||
execplan::Partitions result;
|
||||
|
||||
if (table->partition_names)
|
||||
{
|
||||
List_iterator<String> part_name_it(*(table->partition_names));
|
||||
|
||||
String* n;
|
||||
|
||||
while ((n = part_name_it++)) // this is how server does it.
|
||||
{
|
||||
std::string pn(n->ptr(), n->length());
|
||||
result.fPartNames.push_back(pn);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
CalpontSystemCatalog::TableAliasName makeTableAliasName_(TABLE_LIST* table)
|
||||
{
|
||||
return make_aliasview(
|
||||
(table->db.length ? table->db.str : ""), (table->table_name.length ? table->table_name.str : ""),
|
||||
(table->alias.length ? table->alias.str : ""), getViewName(table), true, lower_case_table_names);
|
||||
}
|
||||
|
||||
CalpontSystemCatalog::TableAliasName makeTableAliasName(TABLE_LIST* table)
|
||||
{
|
||||
CalpontSystemCatalog::TableAliasName result = makeTableAliasName_(table);
|
||||
result.partitions = getPartitions(table);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
//@bug5228. need to escape backtick `
|
||||
string escapeBackTick(const char* str)
|
||||
{
|
||||
@ -2575,6 +2622,7 @@ SimpleColumn* buildSimpleColFromDerivedTable(gp_walk_info& gwi, Item_field* ifp)
|
||||
|
||||
sc->tableAlias(gwi.tbList[i].alias);
|
||||
sc->viewName(viewName, lower_case_table_names);
|
||||
sc->partitions(gwi.tbList[i].partitions);
|
||||
sc->resultType(ct);
|
||||
sc->timeZone(gwi.timeZone);
|
||||
break;
|
||||
@ -2660,6 +2708,7 @@ SimpleColumn* buildSimpleColFromDerivedTable(gp_walk_info& gwi, Item_field* ifp)
|
||||
}
|
||||
sc->resultType(cols[j]->resultType());
|
||||
sc->hasAggregate(cols[j]->hasAggregate());
|
||||
// XXX partitions???
|
||||
|
||||
if (col)
|
||||
sc->isColumnStore(col->isColumnStore());
|
||||
@ -2771,6 +2820,7 @@ void collectAllCols(gp_walk_info& gwi, Item_field* ifp)
|
||||
sc->colPosition(j);
|
||||
sc->tableAlias(csep->derivedTbAlias());
|
||||
sc->viewName(gwi.tbList[i].view);
|
||||
sc->partitions(gwi.tbList[i].partitions);
|
||||
sc->resultType(cols[j]->resultType());
|
||||
sc->timeZone(gwi.timeZone);
|
||||
|
||||
@ -2819,6 +2869,7 @@ void collectAllCols(gp_walk_info& gwi, Item_field* ifp)
|
||||
sc->alias(tcn.column);
|
||||
sc->resultType(ct);
|
||||
sc->tableAlias(gwi.tbList[i].alias, lower_case_table_names);
|
||||
sc->partitions(gwi.tbList[i].partitions);
|
||||
sc->viewName(viewName, lower_case_table_names);
|
||||
sc->timeZone(gwi.timeZone);
|
||||
srcp.reset(sc);
|
||||
@ -3162,6 +3213,7 @@ SimpleColumn* getSmallestColumn(boost::shared_ptr<CalpontSystemCatalog> csc,
|
||||
sc->columnName(rc->alias());
|
||||
sc->sequence(0);
|
||||
sc->tableAlias(tan.alias);
|
||||
sc->partitions(tan.partitions);
|
||||
sc->timeZone(gwi.timeZone);
|
||||
sc->derivedTable(csep->derivedTbAlias());
|
||||
sc->derivedRefCol(rc);
|
||||
@ -3180,6 +3232,7 @@ SimpleColumn* getSmallestColumn(boost::shared_ptr<CalpontSystemCatalog> csc,
|
||||
SimpleColumn* sc = new SimpleColumn(table->s->db.str, table->s->table_name.str, field->field_name.str,
|
||||
tan.fisColumnStore, gwi.sessionid, lower_case_table_names);
|
||||
sc->tableAlias(table->alias.ptr(), lower_case_table_names);
|
||||
sc->partitions(tan.partitions);
|
||||
sc->isColumnStore(false);
|
||||
sc->timeZone(gwi.timeZone);
|
||||
sc->resultType(fieldType_MysqlToIDB(field));
|
||||
@ -3211,6 +3264,7 @@ SimpleColumn* getSmallestColumn(boost::shared_ptr<CalpontSystemCatalog> csc,
|
||||
SimpleColumn* sc = new SimpleColumn(tcn.schema, tcn.table, tcn.column, csc->sessionID());
|
||||
sc->tableAlias(tan.alias);
|
||||
sc->viewName(tan.view);
|
||||
sc->partitions(tan.partitions);
|
||||
sc->timeZone(gwi.timeZone);
|
||||
sc->resultType(csc->colType(oidlist[minWidthColOffset].objnum));
|
||||
sc->charsetNumber(table->field[minWidthColOffset]->charset()->number);
|
||||
@ -5019,6 +5073,7 @@ SimpleColumn* buildSimpleColumn(Item_field* ifp, gp_walk_info& gwi)
|
||||
|
||||
// view name
|
||||
sc->viewName(getViewName(ifp->cached_table), lower_case_table_names);
|
||||
//sc->partitions(...); // XXX how???
|
||||
sc->alias(ifp->name.str);
|
||||
|
||||
sc->isColumnStore(prm.columnStore());
|
||||
@ -5045,6 +5100,11 @@ SimpleColumn* buildSimpleColumn(Item_field* ifp, gp_walk_info& gwi)
|
||||
sc->joinInfo(sc->joinInfo() | JOIN_SCALAR | JOIN_OUTER_SELECT);
|
||||
}
|
||||
|
||||
if (ifp->cached_table)
|
||||
{
|
||||
sc->partitions(getPartitions(ifp->cached_table));
|
||||
}
|
||||
|
||||
return sc;
|
||||
}
|
||||
|
||||
@ -7116,9 +7176,12 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP&
|
||||
CalpontSystemCatalog::TableAliasName tn =
|
||||
make_aliasview(table_ptr->db.str, table_name, table_ptr->alias.str, viewName, columnStore,
|
||||
lower_case_table_names);
|
||||
execplan::Partitions parts = getPartitions(table_ptr);
|
||||
tn.partitions = parts;
|
||||
gwi.tbList.push_back(tn);
|
||||
CalpontSystemCatalog::TableAliasName tan = make_aliastable(
|
||||
table_ptr->db.str, table_name, table_ptr->alias.str, columnStore, lower_case_table_names);
|
||||
tan.partitions = parts;
|
||||
gwi.tableMap[tan] = make_pair(0, table_ptr);
|
||||
#ifdef DEBUG_WALK_COND
|
||||
cerr << tn << endl;
|
||||
@ -9048,6 +9111,7 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
|
||||
sc1->tableName(sc->tableName());
|
||||
sc1->tableAlias(sc->tableAlias());
|
||||
sc1->viewName(sc->viewName());
|
||||
sc1->partitions(sc->partitions());
|
||||
sc1->colPosition(0);
|
||||
sc1->timeZone(gwi.timeZone);
|
||||
minSc.reset(sc1);
|
||||
@ -9151,6 +9215,7 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti, long timeZone)
|
||||
}
|
||||
sc->tableAlias(alias);
|
||||
sc->timeZone(gwi->timeZone);
|
||||
sc->partitions(getPartitions(table));
|
||||
assert(sc);
|
||||
boost::shared_ptr<SimpleColumn> spsc(sc);
|
||||
gwi->returnedCols.push_back(spsc);
|
||||
@ -9213,7 +9278,7 @@ int cp_get_table_plan(THD* thd, SCSEP& csep, cal_table_info& ti, long timeZone)
|
||||
csep->columnMap(gwi->columnMap);
|
||||
CalpontSelectExecutionPlan::TableList tblist;
|
||||
tblist.push_back(make_aliastable(table->s->db.str, table->s->table_name.str, table->alias.c_ptr(), true,
|
||||
lower_case_table_names));
|
||||
lower_case_table_names));
|
||||
csep->tableList(tblist);
|
||||
|
||||
// @bug 3321. Set max number of blocks in a dictionary file to be scanned for filtering
|
||||
|
@ -0,0 +1,15 @@
|
||||
DROP DATABASE IF EXISTS MCOL5886;
|
||||
CREATE DATABASE MCOL5886;
|
||||
USE MCOL5886;
|
||||
CREATE USER IF NOT EXISTS'cejuser'@'localhost' IDENTIFIED BY 'Vagrant1|0000001';
|
||||
GRANT ALL PRIVILEGES ON *.* TO 'cejuser'@'localhost';
|
||||
FLUSH PRIVILEGES;
|
||||
CREATE TABLE t1( a DECIMAL(12, 2), b int ) ENGINE=innodb PARTITION BY KEY(b,a) PARTITIONS 4;
|
||||
INSERT INTO t1 SELECT seq, seq/10 FROM seq_1_to_100;
|
||||
CREATE TABLE IF NOT EXISTS t2 ( a DECIMAL(12, 2), b int ) ENGINE=COLUMNSTORE;
|
||||
SELECT COUNT(*) FROM (SELECT * FROM t1 PARTITION (p0)) tt;
|
||||
COUNT(*)
|
||||
20
|
||||
SELECT COUNT(*) FROM (SELECT * FROM t2 UNION ALL SELECT * FROM t1 PARTITION (p0)) tt;
|
||||
COUNT(*)
|
||||
20
|
@ -0,0 +1,24 @@
|
||||
--disable_warnings
|
||||
DROP DATABASE IF EXISTS MCOL5886;
|
||||
--enable_warnings
|
||||
CREATE DATABASE MCOL5886;
|
||||
USE MCOL5886;
|
||||
|
||||
--exec $MCS_MCSSETCONFIG CrossEngineSupport User 'cejuser'
|
||||
--exec $MCS_MCSSETCONFIG CrossEngineSupport Password 'Vagrant1|0000001'
|
||||
|
||||
--disable_warnings
|
||||
CREATE USER IF NOT EXISTS'cejuser'@'localhost' IDENTIFIED BY 'Vagrant1|0000001';
|
||||
--enable_warnings
|
||||
GRANT ALL PRIVILEGES ON *.* TO 'cejuser'@'localhost';
|
||||
FLUSH PRIVILEGES;
|
||||
|
||||
CREATE TABLE t1( a DECIMAL(12, 2), b int ) ENGINE=innodb PARTITION BY KEY(b,a) PARTITIONS 4;
|
||||
INSERT INTO t1 SELECT seq, seq/10 FROM seq_1_to_100;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS t2 ( a DECIMAL(12, 2), b int ) ENGINE=COLUMNSTORE;
|
||||
|
||||
SELECT COUNT(*) FROM (SELECT * FROM t1 PARTITION (p0)) tt;
|
||||
|
||||
SELECT COUNT(*) FROM (SELECT * FROM t2 UNION ALL SELECT * FROM t1 PARTITION (p0)) tt;
|
||||
|
Loading…
x
Reference in New Issue
Block a user