1
0
mirror of https://github.com/mariadb-corporation/mariadb-columnstore-engine.git synced 2025-07-29 08:21:15 +03:00

MCOL-1205 Support queries with circular joins

This patch adds support for queries with circular joins.
Currently support added for inner joins only.
This commit is contained in:
Denis Khalikov
2021-04-22 10:03:44 +03:00
parent 6dc356ed60
commit 1d5f309b8f
6 changed files with 787 additions and 41 deletions

View File

@ -71,6 +71,7 @@ using namespace dataconvert;
#include "jlf_tuplejoblist.h"
using namespace joblist;
#include "statistics.h"
namespace
{
@ -1607,8 +1608,278 @@ bool addFunctionJoin(vector<uint32_t>& joinedTables, JobStepVector& joinSteps,
return added;
}
void collectCycles(JoinGraph& joinGraph, const JobInfo& jobInfo, TableInfoMap& tableInfoMap,
uint32_t currentTable, uint32_t prevTable, JoinEdges& edgesToTransform,
Cycles& cycles)
{
// Mark as visited.
joinGraph[currentTable].fVisited = true;
joinGraph[currentTable].fParent = prevTable;
void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector joinSteps, JobInfo& jobInfo)
// For each adjacent node.
for (const auto adjNode : joinGraph[currentTable].fAdjacentList)
{
// If visited and not a back edge consider as a cycle.
if (joinGraph[adjNode].fVisited && prevTable != adjNode)
{
Cycle cycle;
const auto edgeForward = make_pair(currentTable, adjNode);
const auto edgeBackward = make_pair(adjNode, currentTable);
if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
{
edgesToTransform.insert(edgeForward);
cycle.push_back(edgeForward);
}
auto nodeIt = currentTable;
auto nextNode = joinGraph[nodeIt].fParent;
// Walk back until we find node `adjNode` we identified before.
while (nextNode != UINT_MAX && nextNode != adjNode)
{
const auto edgeForward = make_pair(nextNode, nodeIt);
const auto edgeBackward = make_pair(nodeIt, nextNode);
if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
{
edgesToTransform.insert(edgeForward);
cycle.push_back(edgeForward);
}
nodeIt = nextNode;
nextNode = joinGraph[nodeIt].fParent;
}
// Add the last edge.
if (nextNode != UINT_MAX)
{
const auto edgeForward = make_pair(nextNode, nodeIt);
const auto edgeBackward = make_pair(nodeIt, nextNode);
if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
{
edgesToTransform.insert(edgeForward);
cycle.push_back(edgeForward);
}
}
if (jobInfo.trace && cycle.size())
{
std::cout << "Collected cycle (while walking join graph): " << std::endl;
for (const auto& edge : cycle)
{
cout << "Edge: " << edge.first << " -> " << edge.second << endl;
auto it = jobInfo.tableJoinMap.find(edge);
cout << "Left keys: " << endl;
for (auto key : it->second.fLeftKeys)
{
cout << "Key: " << key
<< " column oid: " << jobInfo.keyInfo->tupleKeyVec[key].fId << endl;
}
cout << "Right keys: " << endl;
for (auto key : it->second.fRightKeys)
{
cout << "Key: " << key
<< " column oid: " << jobInfo.keyInfo->tupleKeyVec[key].fId << endl;
}
}
}
// Collect the cycle.
if (cycle.size())
cycles.push_back(std::move(cycle));
}
// If not visited - go there.
else if (joinGraph[adjNode].fVisited == false)
{
collectCycles(joinGraph, jobInfo, tableInfoMap, adjNode, currentTable, edgesToTransform,
cycles);
}
}
}
void removeFromList(uint32_t tableId, std::vector<uint32_t>& adjList)
{
auto tableIdIt = std::find(adjList.begin(), adjList.end(), tableId);
if (tableIdIt != adjList.end())
adjList.erase(tableIdIt);
}
bool isForeignKeyForeignKeyLink(TableInfoMap& infoMap, const JobInfo& jobInfo,
const pair<uint32_t, uint32_t>& edge,
statistics::StatisticsManager* statisticsManager)
{
const auto it = jobInfo.tableJoinMap.find(edge);
std::vector<statistics::KeyType> leftKeys, rightKeys;
std::vector<uint32_t> lOid, rOid;
for (auto key : it->second.fLeftKeys)
{
auto oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
if (!statisticsManager->hasKey(oid))
return false;
auto keyType = statisticsManager->getKeyType(oid);
leftKeys.push_back(keyType);
lOid.push_back(oid);
if (jobInfo.trace)
std::cout << "OID " << oid << " with key type " << (uint32_t) keyType << std::endl;
}
for (auto key : it->second.fRightKeys)
{
auto oid = jobInfo.keyInfo->tupleKeyVec[key].fId;
if (!statisticsManager->hasKey(oid))
return false;
auto keyType = statisticsManager->getKeyType(oid);
rightKeys.push_back(keyType);
rOid.push_back(oid);
if (jobInfo.trace)
std::cout << "OID " << oid << " with key type " << (uint32_t) keyType << std::endl;
}
if (rightKeys.size() == 0 || leftKeys.size() == 0)
return false;
statistics::KeyType leftType = statistics::KeyType::PK;
for (auto keyType : leftKeys)
{
if (keyType == statistics::KeyType::FK)
{
leftType = keyType;
break;
}
}
statistics::KeyType rightType = statistics::KeyType::PK;
for (auto keyType : rightKeys)
{
if (keyType == statistics::KeyType::FK)
{
rightType = keyType;
break;
}
}
if (rightType == statistics::KeyType::FK && leftType == statistics::KeyType::FK)
{
if (jobInfo.trace)
{
std::cout << "Found FK <-> FK connection " << lOid.front() << " <-> " << rOid.front()
<< std::endl;
}
return true;
}
return false;
}
void chooseEdgeToTransform(TableInfoMap& infoMap, const JobInfo& jobInfo, Cycle& cycle,
JoinEdges& edgesToTransform, std::pair<uint32_t, uint32_t>& resultEdge)
{
auto* statisticsManager = statistics::StatisticsManager::instance();
for (auto& edgeForward : cycle)
{
if (isForeignKeyForeignKeyLink(infoMap, jobInfo, edgeForward, statisticsManager))
{
const auto edgeBackward = std::make_pair(edgeForward.second, edgeForward.first);
if (!edgesToTransform.count(edgeForward) && !edgesToTransform.count(edgeBackward))
{
edgesToTransform.insert(edgeForward);
resultEdge = edgeForward;
return;
}
}
}
if (jobInfo.trace)
std::cout << "FK FK key not found, removing the first one " << std::endl;
// FIXME: Use size of columns to possible largest cardinality.
// Take just a first.
edgesToTransform.insert(cycle.front());
resultEdge = cycle.front();
}
void breakCyclesAndCollectEdges(TableInfoMap& infoMap, const JobInfo& jobInfo, Cycles& cycles,
JoinEdges& edgesToTransform)
{
for (auto& cycle : cycles)
{
std::pair<uint32_t, uint32_t> edgeForward;
if (cycle.size() == 0)
continue;
chooseEdgeToTransform(infoMap, jobInfo, cycle, edgesToTransform, edgeForward);
if (jobInfo.trace)
{
std::cout << "Remove " << edgeForward.first << " from adjlist of " << edgeForward.second
<< std::endl;
}
// If not present add the edge.
auto tableInfoIt = jobInfo.tableJoinMap.find(edgeForward);
auto& firstExp2 = infoMap[edgeForward.first].fColsInExp2;
firstExp2.insert(firstExp2.end(), tableInfoIt->second.fLeftKeys.begin(),
tableInfoIt->second.fLeftKeys.end());
auto& secondExp2 = infoMap[edgeForward.second].fColsInExp2;
secondExp2.insert(secondExp2.end(), tableInfoIt->second.fRightKeys.begin(),
tableInfoIt->second.fRightKeys.end());
removeFromList(edgeForward.first, infoMap[edgeForward.second].fAdjacentList);
removeFromList(edgeForward.second, infoMap[edgeForward.first].fAdjacentList);
}
}
void initJoinGraph(const TableInfoMap& infoMap, JoinGraph& joinGraph)
{
for (const auto& infoPair : infoMap)
{
JoinTableNode joinTableNode;
// Copy adjacent list.
joinTableNode.fAdjacentList = infoPair.second.fAdjacentList;
joinGraph[infoPair.first] = joinTableNode;
}
}
void collectEdgesAndBreakCycles(TableInfoMap& infoMap, const JobInfo& jobInfo, JoinEdges& edgesToTransform)
{
JoinGraph joinGraph;
initJoinGraph(infoMap, joinGraph);
Cycles cycles;
collectCycles(joinGraph, jobInfo, infoMap,
/*currentTable=*/joinGraph.begin()->first,
/*prevTable=*/UINT_MAX, edgesToTransform, cycles);
if (jobInfo.trace)
{
std::cout << "Collected cycles (after walking join graph): " << std::endl;
for (const auto& cycle : cycles)
{
std::cout << "Collected cycle: " << std::endl;
for (const auto& edge : cycle)
{
std::cout << edge.first << " -> " << edge.second << std::endl;
}
}
}
edgesToTransform.clear();
// Finally break the cycles by removing collected edges from the graph.
breakCyclesAndCollectEdges(infoMap, jobInfo, cycles, edgesToTransform);
}
void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector joinSteps,
JobInfo& jobInfo, JoinEdges& edgesToTransform)
{
bool spanningTree = true;
unsigned errcode = 0;
@ -1863,13 +2134,22 @@ void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector joinSteps, JobI
}
// 2. no cycles
// 2. Cycles.
if (spanningTree && (nodeSet.size() - pathSet.size() / 2 != 1))
{
// 2.1. Inner.
if (jobInfo.outerOnTable.size() == 0)
{
collectEdgesAndBreakCycles(tableInfoMap, jobInfo, edgesToTransform);
}
// 2.2. Outer.
else
{
errcode = ERR_CIRCULAR_JOIN;
spanningTree = false;
}
}
}
if (!spanningTree)
{
@ -1878,7 +2158,6 @@ void spanningTreeCheck(TableInfoMap& tableInfoMap, JobStepVector joinSteps, JobI
}
}
void outjoinPredicateAdjust(TableInfoMap& tableInfoMap, JobInfo& jobInfo)
{
set<uint32_t>::iterator i = jobInfo.outerOnTable.begin();
@ -2120,9 +2399,180 @@ string joinTypeToString(const JoinType& joinType)
return ret;
}
void matchEdgesInRowGroup(const JobInfo& jobInfo, const RowGroup& rg, JoinEdges& edgesToTransform,
PostJoinFilterKeys& postJoinFilterKeys)
{
if (jobInfo.trace)
{
cout << "\nTrying to match the RowGroup to apply a post join "
"filter\n";
}
SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap,
JobInfo& jobInfo, vector<uint32_t>& joinOrder)
std::vector<pair<uint32_t, uint32_t>> takenEdges;
for (const auto& edge : edgesToTransform)
{
auto it = jobInfo.tableJoinMap.find(edge);
std::vector<uint32_t> currentKeys;
// Combine keys.
currentKeys = it->second.fLeftKeys;
currentKeys.insert(currentKeys.end(), it->second.fRightKeys.begin(),
it->second.fRightKeys.end());
// Rowgroup keys.
const auto& rgKeys = rg.getKeys();
uint32_t keyIndex = 0;
uint32_t keySize = currentKeys.size();
// Search for keys in result rowgroup.
while (keyIndex < keySize)
{
auto keyIt = std::find(rgKeys.begin(), rgKeys.end(), currentKeys[keyIndex]);
// We have to match all keys.
if (keyIt == rgKeys.end())
break;
++keyIndex;
}
if (jobInfo.trace)
{
if (keyIndex == keySize)
cout << "\nRowGroup matched\n";
else
cout << "\nRowGroup not matched\n";
cout << rg.toString() << endl;
cout << "For the following keys:\n";
for (auto key : currentKeys)
cout << key << " ";
cout << endl;
}
// All keys matched in current Rowgroup.
if (keyIndex == keySize)
{
// Add macthed keys.
postJoinFilterKeys.push_back(make_pair(edge, currentKeys));
takenEdges.push_back(edge);
}
}
// Erase taken edges.
for (const auto& edge : takenEdges)
{
auto it = edgesToTransform.find(edge);
edgesToTransform.erase(it);
}
}
void createPostJoinFilters(const JobInfo& jobInfo, TableInfoMap& tableInfoMap,
const PostJoinFilterKeys& postJoinFilterKeys,
const std::map<uint32_t, uint32_t>& keyToIndexMap,
std::vector<SimpleFilter*>& postJoinFilters)
{
for (const auto& p : postJoinFilterKeys)
{
const auto& edge = p.first;
const auto& keys = p.second;
if (jobInfo.trace)
cout << "\nRestore a cycle as a post join filter\n";
uint32_t leftKeyIndex = 0;
uint32_t rightKeyIndex = keys.size() / 2;
// Left end is where right starts.
const uint32_t leftSize = rightKeyIndex;
while (leftKeyIndex < leftSize)
{
// Keys.
auto leftKey = keys[leftKeyIndex];
auto rightKey = keys[rightKeyIndex];
// Column oids.
auto leftOid = jobInfo.keyInfo->tupleKeyVec[leftKey].fId;
auto rightOid = jobInfo.keyInfo->tupleKeyVec[rightKey].fId;
// Column types.
auto leftType = jobInfo.keyInfo->colType[keys[leftKeyIndex]];
auto rightType = jobInfo.keyInfo->colType[keys[rightKeyIndex]];
CalpontSystemCatalog::TableColName leftTableColName;
CalpontSystemCatalog::TableColName rightTableColName;
// Check for the dict.
if (joblist::isDictCol(leftType) && joblist::isDictCol(rightType))
{
leftTableColName = jobInfo.csc->dictColName(leftOid);
rightTableColName = jobInfo.csc->dictColName(rightOid);
}
else
{
leftTableColName = jobInfo.csc->colName(leftOid);
rightTableColName = jobInfo.csc->colName(rightOid);
}
// Create columns.
auto* leftColumn = new SimpleColumn(leftTableColName.schema, leftTableColName.table,
leftTableColName.column);
auto* rightColumn = new SimpleColumn(rightTableColName.schema, rightTableColName.table,
rightTableColName.column);
// Set column indices in the result Rowgroup.
auto leftIndexIt = keyToIndexMap.find(leftKey);
if (leftIndexIt != keyToIndexMap.end())
{
leftColumn->inputIndex(leftIndexIt->second);
}
else
{
std::cerr << "Cannot find key: " << leftKey << " in the IndexMap " << std::endl;
throw logic_error("Post join filter: Cannot find key in the index map");
}
auto rightIndexIt = keyToIndexMap.find(rightKey);
if (rightIndexIt != keyToIndexMap.end())
{
rightColumn->inputIndex(rightIndexIt->second);
}
else
{
std::cerr << "Cannot find key: " << rightKey << " in the IndexMap " << std::endl;
throw logic_error("Post join filter: Cannot find key in the index map");
}
// Create an eq operator.
SOP eqPredicateOperator(new PredicateOperator("="));
// Set a type.
eqPredicateOperator->setOpType(leftColumn->resultType(), rightColumn->resultType());
// Create a post join filter.
SimpleFilter* joinFilter =
new SimpleFilter(eqPredicateOperator, leftColumn, rightColumn);
postJoinFilters.push_back(joinFilter);
// Erase keys from fColsInExp2.
auto& firstExp2 = tableInfoMap[edge.first].fColsInExp2;
auto keyItInExp2 = std::find(firstExp2.begin(), firstExp2.end(), leftKey);
if (keyItInExp2 != firstExp2.end())
firstExp2.erase(keyItInExp2);
auto& secondExp2 = tableInfoMap[edge.second].fColsInExp2;
keyItInExp2 = std::find(secondExp2.begin(), secondExp2.end(), rightKey);
if (keyItInExp2 != secondExp2.end())
secondExp2.erase(keyItInExp2);
++leftKeyIndex;
++rightKeyIndex;
}
}
}
SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap, JobInfo& jobInfo,
vector<uint32_t>& joinOrder, JoinEdges& edgesToTransform)
{
vector<SP_JoinInfo> smallSides;
tableInfoMap[large].fVisited = true;
@ -2140,7 +2590,8 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap,
if (tableInfoMap[*i].fVisited == false)
{
cId = *i;
smallSides.push_back(joinToLargeTable(*i, tableInfoMap, jobInfo, joinOrder));
smallSides.push_back(
joinToLargeTable(*i, tableInfoMap, jobInfo, joinOrder, edgesToTransform));
tableSet.insert(tableInfoMap[*i].fJoinedTables.begin(),
tableInfoMap[*i].fJoinedTables.end());
@ -2415,33 +2866,39 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap,
jobInfo.outerJoinExpressions.begin(),
jobInfo.outerJoinExpressions.end());
// check additional compares for semi-join
if (readyExpSteps.size() > 0)
PostJoinFilterKeys postJoinFilterKeys;
if (edgesToTransform.size())
matchEdgesInRowGroup(jobInfo, rg, edgesToTransform, postJoinFilterKeys);
// check additional compares for semi-join.
if (readyExpSteps.size() > 0 || postJoinFilterKeys.size() > 0)
{
map<uint32_t, uint32_t> keyToIndexMap; // map keys to the indices in the RG
for (uint64_t i = 0; i < rg.getKeys().size(); ++i)
keyToIndexMap.insert(make_pair(rg.getKeys()[i], i));
// tables have additional comparisons
map<uint32_t, int> correlateTables; // index in thjs
map<uint32_t, ParseTree*> correlateCompare; // expression
// map keys to the indices in the RG
map<uint32_t, uint32_t> keyToIndexMap;
const auto& rowGroupKeys = rg.getKeys();
for (uint64_t i = 0, e = rowGroupKeys.size(); i < e; ++i)
keyToIndexMap.insert(make_pair(rowGroupKeys[i], i));
if (readyExpSteps.size() > 0)
{
for (size_t i = 0; i != smallSides.size(); i++)
{
if ((jointypes[i] & SEMI) || (jointypes[i] & ANTI) || (jointypes[i] & SCALAR))
{
uint32_t tid = getTableKey(jobInfo,
smallSides[i]->fTableOid,
smallSides[i]->fAlias,
smallSides[i]->fSchema,
smallSides[i]->fView);
uint32_t tid =
getTableKey(jobInfo, smallSides[i]->fTableOid, smallSides[i]->fAlias,
smallSides[i]->fSchema, smallSides[i]->fView);
correlateTables[tid] = i;
correlateCompare[tid] = NULL;
}
}
}
if (correlateTables.size() > 0)
if (readyExpSteps.size() > 0 && correlateTables.size() > 0)
{
// separate additional compare for each table pair
JobStepVector::iterator eit = readyExpSteps.begin();
@ -2517,11 +2974,39 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap,
thjs->setJoinFilterInputRG(rg);
}
// normal expression if any
if (readyExpSteps.size() > 0)
// normal expression if any.
if (readyExpSteps.size() > 0 || postJoinFilterKeys.size() > 0)
{
// add the expression steps in where clause can be solved by this join to bps
ParseTree* pt = NULL;
std::vector<SimpleFilter*> postJoinFilters;
createPostJoinFilters(jobInfo, tableInfoMap, postJoinFilterKeys, keyToIndexMap,
postJoinFilters);
if (jobInfo.trace)
{
cout << "Filters created " << endl;
for (auto* filter : postJoinFilters)
cout << filter->toString() << endl;
}
for (auto* joinFilter : postJoinFilters)
{
if (pt == nullptr)
{
pt = new ParseTree(joinFilter);
}
else
{
ParseTree* left = pt;
ParseTree* right = new ParseTree(joinFilter);
pt = new ParseTree(new LogicOperator("and"));
pt->left(left);
pt->right(right);
}
}
JobStepVector::iterator eit = readyExpSteps.begin();
for (; eit != readyExpSteps.end(); eit++)
@ -2548,9 +3033,12 @@ SP_JoinInfo joinToLargeTable(uint32_t large, TableInfoMap& tableInfoMap,
}
}
if (pt)
{
boost::shared_ptr<ParseTree> sppt(pt);
thjs->addFcnExpGroup2(sppt);
}
}
// update the fColsInExp2 and construct the output RG
updateExp2Cols(readyExpSteps, tableInfoMap, jobInfo);
@ -3108,8 +3596,9 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap&
{
map<uint32_t, uint32_t> keyToIndexMap; // map keys to the indices in the RG
for (uint64_t i = 0; i < rg.getKeys().size(); ++i)
keyToIndexMap.insert(make_pair(rg.getKeys()[i], i));
const auto& rowGroupKeys = rg.getKeys();
for (uint64_t i = 0, e = rowGroupKeys.size(); i < e; ++i)
keyToIndexMap.insert(make_pair(rowGroupKeys[i], i));
// tables have additional comparisons
map<uint32_t, int> correlateTables; // index in thjs
@ -3324,14 +3813,15 @@ void joinTablesInOrder(uint32_t largest, JobStepVector& joinSteps, TableInfoMap&
}
}
inline void joinTables(JobStepVector& joinSteps, TableInfoMap& tableInfoMap, JobInfo& jobInfo,
vector<uint32_t>& joinOrder, const bool overrideLargeSideEstimate)
vector<uint32_t>& joinOrder, JoinEdges& edgesToTransform,
const bool overrideLargeSideEstimate)
{
uint32_t largestTable = getLargestTable(jobInfo, tableInfoMap, overrideLargeSideEstimate);
uint32_t largestTable =
getLargestTable(jobInfo, tableInfoMap, overrideLargeSideEstimate);
if (jobInfo.outerOnTable.size() == 0)
joinToLargeTable(largestTable, tableInfoMap, jobInfo, joinOrder);
joinToLargeTable(largestTable, tableInfoMap, jobInfo, joinOrder, edgesToTransform);
else
joinTablesInOrder(largestTable, joinSteps, tableInfoMap, jobInfo, joinOrder);
}
@ -3346,8 +3836,6 @@ void makeNoTableJobStep(JobStepVector& querySteps, JobStepVector& projectSteps,
querySteps.push_back(TupleConstantStep::addConstantStep(jobInfo));
deliverySteps[CNX_VTABLE_ID] = querySteps.back();
}
}
@ -3968,8 +4456,9 @@ void associateTupleJobSteps(JobStepVector& querySteps, JobStepVector& projectSte
projectSteps.clear();
deliverySteps.clear();
JoinEdges edgesToTransform;
// Check if the tables and joins can be used to construct a spanning tree.
spanningTreeCheck(tableInfoMap, joinSteps, jobInfo);
spanningTreeCheck(tableInfoMap, joinSteps, jobInfo, edgesToTransform);
// 1. combine job steps for each table
TableInfoMap::iterator mit;
@ -3980,7 +4469,8 @@ void associateTupleJobSteps(JobStepVector& querySteps, JobStepVector& projectSte
// 2. join the combined steps together to form the spanning tree
vector<uint32_t> joinOrder;
joinTables(joinSteps, tableInfoMap, jobInfo, joinOrder, overrideLargeSideEstimate);
joinTables(joinSteps, tableInfoMap, jobInfo, joinOrder, edgesToTransform,
overrideLargeSideEstimate);
// 3. put the steps together
for (vector<uint32_t>::iterator i = joinOrder.begin(); i != joinOrder.end(); ++i)

View File

@ -132,6 +132,20 @@ SJSTEP unionQueries(JobStepVector& queries,
uint64_t distinctUnionNum,
JobInfo& jobInfo);
struct JoinTableNode
{
bool fVisited;
uint32_t fParent;
std::vector<uint32_t> fAdjacentList;
JoinTableNode() : fVisited(false), fParent(-1) {}
};
using JoinGraph = std::map<uint32_t, JoinTableNode>;
using JoinEdges = std::set<pair<uint32_t, uint32_t>>;
using Cycles = std::vector<std::vector<std::pair<uint32_t, uint32_t>>>;
using Cycle = std::vector<std::pair<uint32_t, uint32_t>>;
using PostJoinFilterKeys =
std::vector<std::pair<std::pair<uint32_t, uint32_t>, std::vector<uint32_t>>>;
}
#endif // JLF_TUPLEJOBLIST_H

View File

@ -0,0 +1,111 @@
DROP DATABASE IF EXISTS mcol1205;
CREATE DATABASE mcol1205;
USE mcol1205;
/* simple circular inner join `=` operator */
create table t1 (a int, b int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int) engine=columnstore;
insert into t1 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4), (3, 5), (4, 6), (5, 10), (6, 12);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c);
a b b c a c
2 3 3 4 2 4
3 4 4 5 3 5
4 5 5 6 4 6
drop table t1;
drop table t2;
drop table t3;
/* simple circular inner join with `where` filter */
create table t1 (a int, b int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int) engine=columnstore;
insert into t1 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4), (3, 5), (4, 6), (5, 10), (6, 12);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a) where t2.c = t3.c;
a b b c a c
2 3 3 4 2 4
3 4 4 5 3 5
4 5 5 6 4 6
drop table t1;
drop table t2;
drop table t3;
/* circular inner join with filter */
create table t1 (a int, b int, f int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int, f int) engine=columnstore;
insert into t1 values (1, 2, 1), (2, 3, 2), (3, 4, 3), (4, 5, 4), (5, 6, 5), (6, 7, 6);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4, 1), (3, 5, 2), (4, 6, 4), (5, 10, 5), (6, 12, 6);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c) where t1.f > t3.f;
a b f b c a c f
2 3 2 3 4 2 4 1
3 4 3 4 5 3 5 2
drop table t1;
drop table t2;
drop table t3;
/* circular `where` node inner join with filter */
create table t1 (a int, b int, f int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int, f int) engine=columnstore;
insert into t1 values (1, 2, 1), (2, 3, 2), (3, 4, 3), (4, 5, 4), (5, 6, 5), (6, 7, 6);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4, 1), (3, 5, 2), (4, 6, 4), (5, 10, 5), (6, 12, 6);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a) where t2.c = t3.c and t1.f > t3.f;
a b f b c a c f
2 3 2 3 4 2 4 1
3 4 3 4 5 3 5 2
drop table t1;
drop table t2;
drop table t3;
/* circular join more than 2 join keys */
create table t1 (a int, b int, f int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int, f int) engine=columnstore;
insert into t1 values (1, 2, 1), (2, 3, 2), (3, 4, 3), (4, 5, 4), (5, 6, 5), (6, 7, 6);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4, 1), (3, 5, 2), (4, 6, 4), (5, 10, 5), (6, 12, 6);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c and t1.f = t3.f);
a b f b c a c f
4 5 4 5 6 4 6 4
drop table t1;
drop table t2;
drop table t3;
/* circular join with 3 loops */
create table t1 (a int, b int, f int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int, f int) engine=columnstore;
create table t4 (a int, b int) engine=columnstore;
create table t5 (a int, b int) engine=columnstore;
insert into t1 values (1, 2, 1), (2, 3, 2), (3, 4, 3), (4, 5, 4), (5, 6, 5), (6, 7, 6);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4, 1), (3, 5, 2), (4, 6, 4), (5, 10, 5), (6, 12, 6);
insert into t4 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t5 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c) inner join t4 on (t1.a = t4.a and t2.b = t4.b) inner join t5 on (t4.a = t5.a and t3.a = t5.a);
a b f b c a c f a b a b
2 3 2 3 4 2 4 1 2 3 2 3
3 4 3 4 5 3 5 2 3 4 3 4
4 5 4 5 6 4 6 4 4 5 4 5
drop table t1;
drop table t2;
drop table t3;
drop table t4;
drop table t5;
/* circular joins for dict */
create table t1 (a varchar(255), b varchar(255)) engine=columnstore;
create table t2 (b varchar(255), c varchar(255)) engine=columnstore;
create table t3 (a varchar(255), c varchar(255)) engine=columnstore;
insert into t1 values ("1","2"), ("2","3"), ("3","4"), ("4","5"), ("5", "6"), ("6","7");
insert into t2 values ("1","2"), ("2","3"), ("3","4"), ("4","5"), ("5","6"), ("6","7");
insert into t3 values ("2","4"), ("3","5"), ("4","6"), ("5", "10"), ("6", "12");
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c);
a b b c a c
2 3 3 4 2 4
3 4 4 5 3 5
4 5 5 6 4 6
drop table t1;
drop table t2;
drop table t3;
DROP DATABASE mcol1205;

View File

@ -0,0 +1,123 @@
#-- source ../include/have_columnstore.inc
--disable_warnings
DROP DATABASE IF EXISTS mcol1205;
--enable_warnings
CREATE DATABASE mcol1205;
USE mcol1205;
/* simple circular inner join `=` operator */
create table t1 (a int, b int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int) engine=columnstore;
insert into t1 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4), (3, 5), (4, 6), (5, 10), (6, 12);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c);
drop table t1;
drop table t2;
drop table t3;
/* simple circular inner join with `where` filter */
create table t1 (a int, b int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int) engine=columnstore;
insert into t1 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4), (3, 5), (4, 6), (5, 10), (6, 12);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a) where t2.c = t3.c;
drop table t1;
drop table t2;
drop table t3;
/* circular inner join with filter */
create table t1 (a int, b int, f int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int, f int) engine=columnstore;
insert into t1 values (1, 2, 1), (2, 3, 2), (3, 4, 3), (4, 5, 4), (5, 6, 5), (6, 7, 6);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4, 1), (3, 5, 2), (4, 6, 4), (5, 10, 5), (6, 12, 6);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c) where t1.f > t3.f;
drop table t1;
drop table t2;
drop table t3;
/* circular `where` node inner join with filter */
create table t1 (a int, b int, f int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int, f int) engine=columnstore;
insert into t1 values (1, 2, 1), (2, 3, 2), (3, 4, 3), (4, 5, 4), (5, 6, 5), (6, 7, 6);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4, 1), (3, 5, 2), (4, 6, 4), (5, 10, 5), (6, 12, 6);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a) where t2.c = t3.c and t1.f > t3.f;
drop table t1;
drop table t2;
drop table t3;
/* circular join more than 2 join keys */
create table t1 (a int, b int, f int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int, f int) engine=columnstore;
insert into t1 values (1, 2, 1), (2, 3, 2), (3, 4, 3), (4, 5, 4), (5, 6, 5), (6, 7, 6);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4, 1), (3, 5, 2), (4, 6, 4), (5, 10, 5), (6, 12, 6);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c and t1.f = t3.f);
drop table t1;
drop table t2;
drop table t3;
/* circular join with 3 loops */
create table t1 (a int, b int, f int) engine=columnstore;
create table t2 (b int, c int) engine=columnstore;
create table t3 (a int, c int, f int) engine=columnstore;
create table t4 (a int, b int) engine=columnstore;
create table t5 (a int, b int) engine=columnstore;
insert into t1 values (1, 2, 1), (2, 3, 2), (3, 4, 3), (4, 5, 4), (5, 6, 5), (6, 7, 6);
insert into t2 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t3 values (2, 4, 1), (3, 5, 2), (4, 6, 4), (5, 10, 5), (6, 12, 6);
insert into t4 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
insert into t5 values (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7);
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c) inner join t4 on (t1.a = t4.a and t2.b = t4.b) inner join t5 on (t4.a = t5.a and t3.a = t5.a);
drop table t1;
drop table t2;
drop table t3;
drop table t4;
drop table t5;
/* circular joins for dict */
create table t1 (a varchar(255), b varchar(255)) engine=columnstore;
create table t2 (b varchar(255), c varchar(255)) engine=columnstore;
create table t3 (a varchar(255), c varchar(255)) engine=columnstore;
insert into t1 values ("1","2"), ("2","3"), ("3","4"), ("4","5"), ("5", "6"), ("6","7");
insert into t2 values ("1","2"), ("2","3"), ("3","4"), ("4","5"), ("5","6"), ("6","7");
insert into t3 values ("2","4"), ("3","5"), ("4","6"), ("5", "10"), ("6", "12");
select * from t1 inner join t2 on (t1.b = t2.b) inner join t3 on (t1.a = t3.a and t2.c = t3.c);
drop table t1;
drop table t2;
drop table t3;
DROP DATABASE mcol1205;

View File

@ -285,6 +285,10 @@ void StatisticsManager::unserialize(messageqcpp::ByteStream& bs)
}
}
bool StatisticsManager::hasKey(uint32_t oid) { return keyTypes.count(oid) > 0 ? true : false; }
KeyType StatisticsManager::getKeyType(uint32_t oid) { return keyTypes[oid]; }
StatisticsDistributor* StatisticsDistributor::instance()
{
static StatisticsDistributor* sd = new StatisticsDistributor();

View File

@ -88,6 +88,10 @@ class StatisticsManager
void unserialize(messageqcpp::ByteStream& bs);
// Computes hash from the current statistics data.
uint64_t computeHashFromStats();
// Checks whether statistics is available for the given `oid`.
bool hasKey(uint32_t oid);
// Returns a KeyType for the given `oid`.
KeyType getKeyType(uint32_t oid);
private:
std::map<uint32_t, KeyType> keyTypes;